Reactor 1.0.0.M1 - JVM 上异步快速数据应用的基础

工程 | Jon Brisbin | 2013年7月18日 | ...

我非常兴奋地宣布 Project Reactor 的第一个里程碑版本! Project Reactor 是一个基础框架,用于在 JVM 上构建异步快速数据应用程序。 Reactor 1.0.0.M1 中的一些优点包括:响应式组合助手 Stream 和 Promise、TcpServer 和 TcpClient,以及 Groovy 和 Spring 支持。 受 Reactive Extenstions、RxJava、新的 JDK 8 Stream API(以及 Scala 和其他)的启发,这些 Composables 使协调异步任务变得非常简单。 它们使用 Consumers 支持传统的回调式编程,而且还提供简洁的组合 API,其中包含诸如map(Function fn)filter(Predicate p)batch(int size) 等方法。

Reactor 解决了什么问题?

Reactor 从一开始就被设计成一个高性能、高扩展的平台,用于构建下一代大数据应用程序。 在将应用程序扩展到数百、数千甚至数百万用户时,异步架构明显优于每个请求一个线程的架构。 Reactor 的异步基础为大数据应用程序提供了坚实的基础,这些应用程序每秒处理成千上万甚至数百万个事件。 它提供了将异步任务链接在一起的简单工具,并使执行这些任务就像调用单个方法一样容易。

使用 Reactor 进行组合

Streams 是一种简单的方式,用于异步处理数据流经应用程序。 在 Reactor 中,Stream 实际上有两个部分:Deferred(发布者)和实际的 Stream(消费者)。 您可以在 Stream 上分配处理程序,以使用组合方法和简单的回调来处理数据。

在使用 Stream 和 JDK Lambdas 进一步处理之前,转换和过滤进入应用程序的数据,然后将其放入队列中,看起来如下所示


// Create Environment in which Reactors operate
Environment env = new Environment();

// Create a Stream using the high-speed LMAX Disruptor RingBuffer
Deferred<Trade, Stream<Trade>> incoming = Streams.<Trade>defer()
		.env(env)
		.dispatcher(Environment.RING_BUFFER)
		.get();

// Work with the incoming trades
Stream<Trade> trades = incoming.compose();
Stream<Order> orders = trades.map(trade -> tradeService.placeTrade(trade));

// Filter out large orders from small
Stream<Order> highPriority = orders.filter(order -> order.getSize() >= 1000);
Stream<Order> lowPriority = orders.filter(order -> order.getSize() < 1000);

// Consume the orders in different ways
highPriority.consume(order -> orderService.executeNow(order));
lowPriority.consume(order -> orderService.executeLater(order));

TCP 支持

M1 还包括一个易于使用的 TCP 客户端和服务器。 在快速的 Netty 网络库的支持下,Reactor 驱动的 syslog 服务器可以在服务器级硬件上每秒接收大约 100 万条消息。 Reactor TCP 支持包括一个简单的 Codec 工具,该工具很容易扩展到核心中提供的默认编解码器集之外,并且设计为轻量级的,使用了 Reactor 的 Buffer 类,该类提供了诸如对数据进行极其有效的视图,以及一系列用于处理标准 Java NIO ByteBuffers 的辅助方法 - 但避免了直接处理 ByteBuffer 的痛苦。

Reactor 的 TCP 支持开箱即用,支持 JSON。 要创建一个使用 JSON 作为协议的基于 TCP 的 RPC 服务器,就像这样简单


TcpServer<Pojo, Pojo> server = new TcpServerSpec<Pojo, Pojo>(NettyTcpServer.class)
		.env(env)
		.codec(new JsonCodec<>(Pojo.class))
		.consume(conn -> {
			conn.consume(data -> {
				// handle incoming data
			});
		})
		.get()
		.start();

Groovy 和 Spring 支持

Reactor M1 还提供了强大的 Groovy 支持。 它提供了助手来使使用闭包消耗事件非常简洁。 不用说,用 Groovy 编写 Reactor 事件处理非常容易。 使用闭包处理 Reactors 使得异步代码实际上可读!


def env = new Environment()

// Create Reactor using default RingBuffer Dispatcher
def reactor = Reactors.reactor().env(env).get()

reactor.on('topic') { String s ->
	// handle data
}

// Publish an event to a topic
r1.notify 'topic', 'Hello World!'

Reactor M1 还包括 Spring 支持,使得编写事件驱动的 POJO 就像 MVC 控制器一样简单。 通过使用 @On 注释方法,可以通过组件扫描获取的 bean 可以自动连接到 Reactor 并被通知事件。

一个简单的基于 JavaConfig 的 Spring 配置可能如下所示


public class HandlerBean {
	@On(reactor = "@rootReactor", selector = '$("test")')
	public void handleTest() {
		// event 'test' was fired
	}
}

@Configuration
public class AnnotatedHandlerConfig {

	@Bean
	public Environment env() {
		return new Environment();
	}

	@Bean
	public Reactor rootReactor() {
		return env().getRootReactor();
	}
}

只需将 Reactor 注入到服务层,当事件准备就绪时,使用 notify() 方法在 Reactor 上发布它们。

工件、源码和文档

Maven 工件可在 SpringSource Artifactory 存储库中找到。 在 Gradle 项目中,您可以这样拉取 Reactor


ext {
  reactorVersion = '1.0.0.M1'
}

repositories {
	maven { url 'http://repo.springsource.org/libs-milestone' }
  mavenCentral()
}

dependencies {
  // Reactor Core
  compile 'org.projectreactor:reactor-core:$reactorVersion'
}

源代码可在 GitHub 上找到:https://github.com/reactor/reactor

加入 Reactor Google+ 社区 以了解 Reactor 的最新动态,或 在 Twitter 上关注我们 @ProjectReactor。

文档可在 GitHub WikiAPI Javadoc 上找到。

您还可以在 GitHub Issues 上提交问题并跟踪开发进度。

来参加 SpringOne!

我们将在今年的 SpringOne 上进行 关于 Reactor 的完整课程。 如果您还没有计划参加,那么您真的应该参加! 议程中充满了关于 Spring 社区正在做的令人兴奋的事情的精彩课程。 快来加入我们吧!

SpringOne2GX 2013,9 月 9-12 日,加利福尼亚州圣克拉拉

我迫不及待地想投入到下一个 sprint 中,以实现 1.0 GA。 我们很乐意邀请您一起加入!

获取 Spring 新闻简报

随时关注 Spring 新闻简报

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部