领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多我非常兴奋地宣布 Project Reactor 的第一个里程碑版本! Project Reactor 是一个基础框架,用于在 JVM 上构建异步快速数据应用程序。 Reactor 1.0.0.M1 中的一些优点包括:响应式组合助手 Stream 和 Promise、TcpServer 和 TcpClient,以及 Groovy 和 Spring 支持。 受 Reactive Extenstions、RxJava、新的 JDK 8 Stream API(以及 Scala 和其他)的启发,这些 Composables 使协调异步任务变得非常简单。 它们使用 Consumers 支持传统的回调式编程,而且还提供简洁的组合 API,其中包含诸如map(Function fn)
、filter(Predicate
、batch(int size)
等方法。
Reactor 从一开始就被设计成一个高性能、高扩展的平台,用于构建下一代大数据应用程序。 在将应用程序扩展到数百、数千甚至数百万用户时,异步架构明显优于每个请求一个线程的架构。 Reactor 的异步基础为大数据应用程序提供了坚实的基础,这些应用程序每秒处理成千上万甚至数百万个事件。 它提供了将异步任务链接在一起的简单工具,并使执行这些任务就像调用单个方法一样容易。
Streams 是一种简单的方式,用于异步处理数据流经应用程序。 在 Reactor 中,Stream 实际上有两个部分:Deferred(发布者)和实际的 Stream(消费者)。 您可以在 Stream 上分配处理程序,以使用组合方法和简单的回调来处理数据。
在使用 Stream 和 JDK Lambdas 进一步处理之前,转换和过滤进入应用程序的数据,然后将其放入队列中,看起来如下所示
// Create Environment in which Reactors operate
Environment env = new Environment();
// Create a Stream using the high-speed LMAX Disruptor RingBuffer
Deferred<Trade, Stream<Trade>> incoming = Streams.<Trade>defer()
.env(env)
.dispatcher(Environment.RING_BUFFER)
.get();
// Work with the incoming trades
Stream<Trade> trades = incoming.compose();
Stream<Order> orders = trades.map(trade -> tradeService.placeTrade(trade));
// Filter out large orders from small
Stream<Order> highPriority = orders.filter(order -> order.getSize() >= 1000);
Stream<Order> lowPriority = orders.filter(order -> order.getSize() < 1000);
// Consume the orders in different ways
highPriority.consume(order -> orderService.executeNow(order));
lowPriority.consume(order -> orderService.executeLater(order));
M1 还包括一个易于使用的 TCP 客户端和服务器。 在快速的 Netty 网络库的支持下,Reactor 驱动的 syslog 服务器可以在服务器级硬件上每秒接收大约 100 万条消息。 Reactor TCP 支持包括一个简单的 Codec 工具,该工具很容易扩展到核心中提供的默认编解码器集之外,并且设计为轻量级的,使用了 Reactor 的 Buffer 类,该类提供了诸如对数据进行极其有效的视图,以及一系列用于处理标准 Java NIO ByteBuffers 的辅助方法 - 但避免了直接处理 ByteBuffer 的痛苦。
Reactor 的 TCP 支持开箱即用,支持 JSON。 要创建一个使用 JSON 作为协议的基于 TCP 的 RPC 服务器,就像这样简单
TcpServer<Pojo, Pojo> server = new TcpServerSpec<Pojo, Pojo>(NettyTcpServer.class)
.env(env)
.codec(new JsonCodec<>(Pojo.class))
.consume(conn -> {
conn.consume(data -> {
// handle incoming data
});
})
.get()
.start();
Reactor M1 还提供了强大的 Groovy 支持。 它提供了助手来使使用闭包消耗事件非常简洁。 不用说,用 Groovy 编写 Reactor 事件处理非常容易。 使用闭包处理 Reactors 使得异步代码实际上可读!
def env = new Environment()
// Create Reactor using default RingBuffer Dispatcher
def reactor = Reactors.reactor().env(env).get()
reactor.on('topic') { String s ->
// handle data
}
// Publish an event to a topic
r1.notify 'topic', 'Hello World!'
Reactor M1 还包括 Spring 支持,使得编写事件驱动的 POJO 就像 MVC 控制器一样简单。 通过使用 @On
注释方法,可以通过组件扫描获取的 bean 可以自动连接到 Reactor 并被通知事件。
一个简单的基于 JavaConfig 的 Spring 配置可能如下所示
public class HandlerBean {
@On(reactor = "@rootReactor", selector = '$("test")')
public void handleTest() {
// event 'test' was fired
}
}
@Configuration
public class AnnotatedHandlerConfig {
@Bean
public Environment env() {
return new Environment();
}
@Bean
public Reactor rootReactor() {
return env().getRootReactor();
}
}
只需将 Reactor 注入到服务层,当事件准备就绪时,使用 notify()
方法在 Reactor 上发布它们。
Maven 工件可在 SpringSource Artifactory 存储库中找到。 在 Gradle 项目中,您可以这样拉取 Reactor
ext {
reactorVersion = '1.0.0.M1'
}
repositories {
maven { url 'http://repo.springsource.org/libs-milestone' }
mavenCentral()
}
dependencies {
// Reactor Core
compile 'org.projectreactor:reactor-core:$reactorVersion'
}
源代码可在 GitHub 上找到:https://github.com/reactor/reactor
加入 Reactor Google+ 社区 以了解 Reactor 的最新动态,或 在 Twitter 上关注我们 @ProjectReactor。
文档可在 GitHub Wiki 和 API Javadoc 上找到。
您还可以在 GitHub Issues 上提交问题并跟踪开发进度。
我们将在今年的 SpringOne 上进行 关于 Reactor 的完整课程。 如果您还没有计划参加,那么您真的应该参加! 议程中充满了关于 Spring 社区正在做的令人兴奋的事情的精彩课程。 快来加入我们吧!
SpringOne2GX 2013,9 月 9-12 日,加利福尼亚州圣克拉拉
我迫不及待地想投入到下一个 sprint 中,以实现 1.0 GA。 我们很乐意邀请您一起加入!