Reactor 2020.0 (代号 Europium) 的第一个里程碑

工程 | Simon Baslé | July 10, 2020 | ...

本月早些时候,我们发布了 Reactor 2020.0 的第一个里程碑版本。这个代号为 Europium 的周期接续了 Dysprosium 周期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。

它包括 reactor-core 3.4.0 和 reactor-netty 1.0.0

在这篇博文中,我们将介绍 reactor-core 里程碑版本的一些重点内容,并简要提及 M2 版本的规划。

关于 reactor-netty,我们会在单独的博文发布后立即在此处提供链接。

另请注意,新的版本控制方案已经到位,并已被 Spring 项目组合采纳:请参阅参考指南这篇博文

关于 Processor 的变更

core 中的主要变化是针对 Reactor 中的 Processor 实现及其暴露方式进行了姗姗来迟的改进。

这是 reactor-core 3.4.0-M1 的主要关注点,目标是逐步淘汰具体 FluxProcessor 变体(以及某种程度上的 MonoProcessor)的使用。

Processor 是 Reactive Streams 的一个接口,最初旨在表示反应式管道中可在库之间共享的“步骤”。但如今,操作符大多直接实现为 Publisher/Subscriber 对,因此在 Reactor 中,处理器最终涵盖了不同的用例(通常是将一个 Publisher 多播到多个 Subscriber)。

因此,用户最常将处理器视为一种“手动创建 Flux”的方式:他们不是将 Processor 连接到父发布者(即将其用作 Subscriber),而是直接调用其 onNext/onComplete/onError 方法。不幸的是,这是一种有问题的做法,因为这些调用**必须**以符合 Reactive Streams 规范的方式进行,这意味着它们需要外部同步。

从历史上看,通过在 FluxProcessor 上引入 sink() 方法缓解了这个问题。其思路是,如果您想以这种手动方式使用 FluxProcessor,您需要实例化所需的处理器变体,然后**仅调用一次**其 sink() 方法,并从此以后使用生成的 FluxSink 来触发信号给订阅者。在下游,FluxProcessor 本身被暴露出来(作为可在其上组合操作符的 Flux)。

从可发现性角度来看,这仍然存在问题,因为满足最常见用例的“正确方法”是最难想到的。

借助 3.4.0,我们打算扭转局面,将 Sink 的使用模式作为一流公民放在首位,并使 Processor 的使用模式更难被意外发现或误用。

第一个里程碑通过以下方式迈出了第一步:

  • 弃用所有具体的 FluxProcessor 实现,这些实现计划在 3.5.0 中移除。
  • 暴露一个 Sinks 工具类,其中包含用于手动触发 sink 的工厂方法。

在 M1 中,处理器的变体仍然存在,但工厂方法已被复制到 Processors 类中,但这已经在 M2 中进行重构。我们打算在 M2 中将变体的选择移到 Sinks 上。届时,将有一种方法可以将 Sink 转换为 FluxProcessor,从而消除 M2 中对 Processors 的需求。

在 M1 中从具体的 Processor 迁移

M1 中,所有具体 xxxProcessor 上的工厂方法(例如 UnicastProcessor.create())已移至 Processors 用于基本情况,或移至 Processors.more() 用于允许更精细调整的重载方法。这些方法通过前缀区分变体:

  • UnicastProcessor -> Processors.unicast()Processors.more().unicast(...)
  • EmitterProcessor -> Processors.multicast()Processors.more().multicast(...)
  • DirectProcessor -> Processors.more().multicastNoBackpressure()
  • ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及 Processors.more() 上的类似方法

从概念上讲,所有这些处理器都具有相同的输入和输出类型 <T>,因此它们是 FluxProcessor<T,T>。M1 中引入了一个便利接口 FluxIdentityProcessor<T>,但除了减少泛型数量之外并没有带来太多好处,因此它可能会在 M2 中移除。

但我们说过,相对于使用 Processors 中的 FluxProcessor,人们应该优先使用 Sinks。在这种情况下,首先会获得一个 sink,并将其转换为 FluxMono,供应用程序的其余部分进行组合,如下面的示例所示:

//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();

//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();

//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();

请注意,该类目前提供的变体比 Processors 少,但这正在 M2 中重新考虑。

弃用和移除

一些早在 3.3.0 中已被弃用的类已被**移除**:

  • TopicProcessor
  • WorkQueueProcessor

Schedulers.boundedElastic() 自 3.3.0 起已可用,我们认为现在可以**弃用**其祖先方法 elastic(),而不仅仅是建议使用 boundedElastic 而非 elastic。

再往后,在 3.5.0 中,elastic Scheduler 将被移除。

Reactor-Netty 达到 1.0 版本

这里有很多内容需要涵盖,我们将在另一篇博文中进行详细介绍。

总结

请试用 M1 版本!

我们已经在 M2 中对 sink 和 processor 进行了进一步更改,同时还处理了其他主题,如 Context 操作符、避免在 subscribe 中抛出异常以及改进指标方面的内容。

一如既往,非常欢迎对 M1 和当前的 M2 快照版本提出反馈意见。

同时,祝您编码愉快! Reactor 团队。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

抢占先机

VMware 提供培训和认证,助您加速发展。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部