抢占先机
VMware 提供培训和认证,助您加速发展。
了解更多本月早些时候,我们发布了 Reactor 2020.0 的第一个里程碑版本。这个代号为 Europium
的周期接续了 Dysprosium 周期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包括 reactor-core 3.4.0
和 reactor-netty 1.0.0
。
在这篇博文中,我们将介绍 reactor-core 里程碑版本的一些重点内容,并简要提及 M2 版本的规划。
关于 reactor-netty,我们会在单独的博文发布后立即在此处提供链接。
另请注意,新的版本控制方案已经到位,并已被 Spring 项目组合采纳:请参阅参考指南和这篇博文。
Processor
的变更core 中的主要变化是针对 Reactor 中的 Processor
实现及其暴露方式进行了姗姗来迟的改进。
这是 reactor-core 3.4.0-M1
的主要关注点,目标是逐步淘汰具体 FluxProcessor
变体(以及某种程度上的 MonoProcessor
)的使用。
Processor
是 Reactive Streams 的一个接口,最初旨在表示反应式管道中可在库之间共享的“步骤”。但如今,操作符大多直接实现为 Publisher/Subscriber
对,因此在 Reactor 中,处理器最终涵盖了不同的用例(通常是将一个 Publisher
多播到多个 Subscriber
)。
因此,用户最常将处理器视为一种“手动创建 Flux
”的方式:他们不是将 Processor
连接到父发布者(即将其用作 Subscriber
),而是直接调用其 onNext
/onComplete
/onError
方法。不幸的是,这是一种有问题的做法,因为这些调用**必须**以符合 Reactive Streams 规范的方式进行,这意味着它们需要外部同步。
从历史上看,通过在 FluxProcessor
上引入 sink()
方法缓解了这个问题。其思路是,如果您想以这种手动方式使用 FluxProcessor
,您需要实例化所需的处理器变体,然后**仅调用一次**其 sink()
方法,并从此以后使用生成的 FluxSink
来触发信号给订阅者。在下游,FluxProcessor
本身被暴露出来(作为可在其上组合操作符的 Flux
)。
从可发现性角度来看,这仍然存在问题,因为满足最常见用例的“正确方法”是最难想到的。
借助 3.4.0,我们打算扭转局面,将 Sink
的使用模式作为一流公民放在首位,并使 Processor
的使用模式更难被意外发现或误用。
第一个里程碑通过以下方式迈出了第一步:
FluxProcessor
实现,这些实现计划在 3.5.0
中移除。Sinks
工具类,其中包含用于手动触发 sink 的工厂方法。在 M1 中,处理器的变体仍然存在,但工厂方法已被复制到 Processors
类中,但这已经在 M2 中进行重构。我们打算在 M2 中将变体的选择移到 Sinks
上。届时,将有一种方法可以将 Sink
转换为 FluxProcessor
,从而消除 M2 中对 Processors
的需求。
在 M1
中,所有具体 xxxProcessor
上的工厂方法(例如 UnicastProcessor.create()
)已移至 Processors
用于基本情况,或移至 Processors.more()
用于允许更精细调整的重载方法。这些方法通过前缀区分变体:
UnicastProcessor
-> Processors.unicast()
和 Processors.more().unicast(...)
EmitterProcessor
-> Processors.multicast()
和 Processors.more().multicast(...)
DirectProcessor
-> Processors.more().multicastNoBackpressure()
ReplayProcessor
-> Processors.replayAll()
/replay(int)
/replayTimeout(Duration)
/replaySizeAndTimeout(int, Duration)
以及 Processors.more()
上的类似方法从概念上讲,所有这些处理器都具有相同的输入和输出类型 <T>
,因此它们是 FluxProcessor<T,T>
。M1 中引入了一个便利接口 FluxIdentityProcessor<T>
,但除了减少泛型数量之外并没有带来太多好处,因此它可能会在 M2 中移除。
但我们说过,相对于使用 Processors
中的 FluxProcessor
,人们应该优先使用 Sinks
。在这种情况下,首先会获得一个 sink,并将其转换为 Flux
或 Mono
,供应用程序的其余部分进行组合,如下面的示例所示:
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
请注意,该类目前提供的变体比 Processors
少,但这正在 M2 中重新考虑。
一些早在 3.3.0 中已被弃用的类已被**移除**:
TopicProcessor
WorkQueueProcessor
Schedulers.boundedElastic()
自 3.3.0 起已可用,我们认为现在可以**弃用**其祖先方法 elastic()
,而不仅仅是建议使用 boundedElastic 而非 elastic。
再往后,在 3.5.0 中,elastic
Scheduler
将被移除。
这里有很多内容需要涵盖,我们将在另一篇博文中进行详细介绍。
请试用 M1 版本!
我们已经在 M2 中对 sink 和 processor 进行了进一步更改,同时还处理了其他主题,如 Context
操作符、避免在 subscribe
中抛出异常以及改进指标方面的内容。
一如既往,非常欢迎对 M1 和当前的 M2 快照版本提出反馈意见。
同时,祝您编码愉快! Reactor 团队。