先人一步
VMware 提供培训和认证,助你加速进步。
了解更多Reactor 团队很高兴地宣布,Reactor 灵活、异步、快速数据框架的一些重大更新现已在 Reactor 的 1.1.0.RELEASE 版本中提供。该版本包含大量错误修复和关键组件的重写,使其速度更快,也许更重要的是,在内存使用方面更高效。Reactor 1.1 现在包含了来自高盛的优秀 gs-collections
库 [1],该库为处理各种 map 和 collection 提供了非常流畅的 API。
以下是 Reactor 1.0 和 1.1 之间更改的不完全列表
相对于 1.0 版本,Stream API 中一些更有用的新增功能包括 Stream.window
和 Stream.timeout
方法。这允许你在给定时间内收集值并将它们传递到处理链下游。例如,要处理每 500ms 收集到的值,可以使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer
的稳健 HashWheelTimer 实现gs-collections
5.0 的新 Consumer Registry 实现 [1]如果你需要以更可预测的方式控制内存使用,Reactor 包含一个分配 API,该 API 可以由你需要的任何特定池化实现提供支持。Reactor 1.1 提供了两种实现:基于 RingBuffer 的 Allocator
和引用计数 Allocator
。
基于 RingBuffer 的 Allocator
可以配置得非常像带有事件处理程序的标准 Disruptor RingBuffer
。但如果你只需要阻塞生产者并使用基于槽的分配策略,那么使用 RingBuffer 进行分配非常简单
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
日志记录对异步应用程序的性能可能非常不利——特别是使用 RingBuffer 等技术的应用程序,RingBuffer 使用单个线程支持许多任务。如果该线程被某个执行 IO 写入日志条目的任务阻塞,那么这可能会在应用程序中级联回来,导致其停滞不前。
Reactor 包含一个高效的 Logback 异步 Appender
实现 [2],它将实际的追加操作转移到专用的日志记录线程。这应该有助于减轻大多数应用程序中由日志记录引起的线程压力。但有时这还不够,需要更高吞吐量的解决方案。这时 Reactor 基于 Java Chronicle 的 Appender
就派上用场了。
Java Chronicle [3] 是一个高速消息库,它使用内存映射文件实现快速高效的数据持久化。Reactor 通过提供一个 Appender
将其与 Logback 集成,该 Appender 记录应用程序的原始事件数据,但无需调用下游 appender。这意味着你的日志事件存储在 Chronicle
中,但处于其原始状态。需要一个额外的工具来后处理“持久化”日志文件,然后将这些事件发送到“真实”appender(例如文件或数据库)或查看 Chronicle
并查找与给定模式匹配的条目。这在生产环境中非常有用,当应用程序正常运行时你不需要关心日志记录,但如果出现问题,你可以轻松地从 Chronicle
中提取数据到标准日志文件中进行事后分析。
要配置 Reactor DurableAsyncAppender
以进行高速日志记录,只需在 Logback 配置中声明它即可。以下是在 logback.xml
配置中使用它的示例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出现问题,你可以使用包含的工具分析 chronicle,将从 chronicle 中提取的事件定向到给定的“真实” Appender
。此示例调用日志工具(reactor-logback.jar
artifact 必须位于 classpath 中),并从 log/
目录读取持久化日志文件,从 logback.xml
读取 Logback 配置,然后将所有 ERROR 消息输出到 logfile
appender,该 appender 在 logback.xml
配置文件中定义。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 刚刚发布,包含 大量新功能和性能改进 以及 lambda 闭包支持和其他酷炫的 JDK 8 功能。Reactor 的 Groovy 支持已准备好在 Groovy 2.3 中使用,同时仍然兼容 JDK 7 上的 Groovy 2.2。
reactor-tcp
重命名为 reactor-net
jeromq
添加了 ZeroMQ 支持reconnect
支持对 TCP 模块进行了改进,集成了对 UDP 的支持以及基于 ZeroMQ 的新实现。[4]
Reactor 中的 ZeroMQ 支持具有 tcp
和 inproc
支持,并提供简洁流畅的 API,可使用 Reactor 高效的编解码器工具快速创建客户端和服务器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
reactor-benchmark
项目artifacts 可以在 Maven Central 和 repo.spring.io/libs-release
中获取。请注意,Spring 支持的坐标在 1.1 版本中已更改为 org.projectreactor.spring:reactor-spring-*
[6]。
参考文档可在 GitHub wiki 中获取。
更新的 API 文档可在 GitHub pages 站点 上获取。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - https://zeromq.cn/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/