Reactor 2.5:JVM 的第二代响应式基础

发布 | Stephane Maldini | 2016年2月16日 | ...

进入响应式流时代

Reactor 2.0 的开发始于 2014 年底,与 Reactive Streams 大致在同一时间。我们热衷于加入这项工作,并尽早采用 背压协议,以缓解我们主要的消息传递限制:有界容量。我们在 Reactor 2.0 中首次尝试实现了基于 RingBuffer 的调度器的 Reactive Streams 版本,并衍生出一种越来越流行的响应式模式:Reactive Extensions

与此同时,Reactive Streams 开始获得关注,整个库生态系统都在讨论这一转变。常见的问题是什么?实现 Reactive Streams 语义绝非易事。我们发现越来越需要一个响应式基础来解决消息传递问题并实现通用的流式操作符。因此,我们为 Reactor Core 创建了一个专门的项目空间,并与 Spring Framework 团队一起开始了重点工作。

从 2.5 版本开始,Reactor 现在被 组织成多个项目,例如 2.0.x 等维护分支保持不变。这体现在版本发布管理中,例如 Reactor Core 2.5 M1 是唯一可用的里程碑版本,其他项目将随后发布各自独有的版本。

为了支持这个新的项目模型,我们在 http://projectreactor.io 部署了一个新的网站,希望能更加受欢迎。

协作式地重新审视 Reactive Streams

这种新的组织方式大大降低了参与项目活动的门槛。项目受益于 Spring API 设计方面的协作以及来自 Sébastien DeleuzeBrian Clozel 等人的直接贡献。Reactor 也欢迎新的外部贡献者和评审者的帮助

Reactor 2.5 中的依赖和协作
Reactor 2.5

Reactive Streams Commons

Reactive Streams Commons 仓库 是一个开放的研究项目,致力于提高 Reactive Extensions 等在 Reactive Streams 规范 中的效率。它被 Reactor Core 和 Stream 完全内联,作为该项目所关注的众多革新的契约入口。

"RSC" 因此是一个自由形式的项目,类似于 JCTools 在并发队列方面的做法。其最大的进展之一是一种“融合”(Fusion)协议,旨在减少响应式处理链中大多数同步和部分异步阶段的开销。最后,这项工作帮助修复了一百多个流式错误,我们的测试流程现在包括 RSC 单元/集成测试和 JMH 基准测试,并结合 Reactor 自身的集成测试和 基准测试

Reactor Core 2.5.0.M1

今天的 Reactor 博客系列以一个令人高兴的事件开始:Reactor Core 2.5.0.M1 发布!在其新范围和与 Reactive Streams Commons 的紧密联系下,Reactor Core 提供了足够的 Rx 支持,用于构建响应式应用或库,例如 Spring Reactive Web 支持。对于急切的读者,可以看看 github 上已经提供的 快速入门

快速浏览散射-聚集(scatter-gather)场景

Mono.from(userRequestPublisher)
    .then(userRepository::findUserProfile, 
          userRepository::findUserPaymentMethod)
    .log("user.requests")
    .or(Mono.delay(5)
            .then(n -> Mono.error(new TimeoutException()))
    .mergeWith(userRepository::findSimilarUserDetails)
    .map(userDetailsTuple -> userDetailsTuple.t1.username)
    .publishOn(SchedulerGroup.io())
    .subscribe(responseSubscriber);

详细介绍

  • Flux,一个 发布者 (Publisher),拥有精简的 Rx 作用域,用于表示 0 到 N 个数据信号。操作符包括 create(), interval(), merge(), zip(), concat(), switchOnError()switchOnEmpty()

Flux in action

  • Mono,一个 发布者 (Publisher),拥有精简的 Rx 派生作用域,适用于强类型化这种特定数量(0 或 1 个)的信号。操作符包括 delay(), then(), any(), and(), or(), otherwise(), otherwiseIfEmpty(), where() 以及一个阻塞式的 get()

Mono in action

  • 基于简单的 Java 接口(Runnable, Callable)的新调度契约。

-- 包括 SchedulerGroup, TopicProcessorWorkQueueProcessor。-- 取代了之前的 Enviroment/Dispatcher 组合,同时满足相同的需求,并且将很快提供简单的迁移路径。不再有持有 dispatcher 引用的静态状态。-- 相关操作符:publishOn()dispatchOn()

  • 使用 TestSubscriberPublisher 源提供测试支持。
  • Callable, Runnable, Iterable, Java 8 CompletableFuture, Java 9 Flow.Publisher, RxJava 1 ObservableSingle 转换为兼容 Reactive Streams 的 FluxMono,无需额外的桥接依赖。
  • 完全改版和整合的 Javadoc,包括略微调整的弹珠图 (marble diagrams)。
  • 一个用于构建您自己的响应式组件的微型工具包,包含实用工具和基础 Subscriber,可以随意复用。

-- 一个经济高效的 Timer API 和实现(哈希轮定时器)。-- 新的 Fusion API,可以虚拟地合并响应式链中的两个或多个阶段 -- 一个经过调整的 QueueSupplier,将为正确的容量提供正确的队列

  • 基于 状态 表示的新自省 (Introspection) API。

-- Publisher 日志记录,如果可用,可回退到 java.util.logging 或 SLF4J。可以直接在 FluxMono 上使用 log() 操作符。-- 与包括 Reactive Streams 在内的任何其他契约正交,一切都可以是 Backpressurable,一个 Completable 或是一个生成通用对象(可能是 Subscriber)的 Receiver,这反过来允许我们追踪流的完整图并用状态指标对其进行增强:

下一步计划是什么?

我们非常希望能收集您的反馈,您可以前往相应的 issues 仓库或加入我们最近创建的 Gitter 频道。请继续关注下一篇关于 Reactor Stream 2.5.0.M1 的文章,它是 Reactive Streams 之上的完整 Rx 实现。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

先行一步

VMware 提供培训和认证,助力您的进步。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

即将到来的活动

查看 Spring 社区中所有即将到来的活动。

查看全部