先人一步
VMware 提供培训和认证,助您加速前进。
了解更多在本文中,我们将继续探讨 Reactive 编程系列,并着重通过实际代码示例来解释一些概念。最终目标是让您更好地理解 Reactive 的不同之处以及它为何是函数式的。这里的示例相当抽象,但它们为您提供了一种思考 API 和编程风格的方式,并开始感受它的不同之处。我们将看到 Reactive 的组成元素,并学习如何控制数据流,以及如何在必要时在后台线程中进行处理。
我们将使用 Reactor 库来阐述我们需要说明的要点。代码使用其他工具编写同样容易。如果您想自己动手尝试代码并查看其运行情况,而无需复制粘贴任何内容,可以在 Github 中找到带有测试的示例代码。
要开始,请从 https://start.spring.io 获取一个空白项目并添加 Reactor Core 依赖。使用 Maven
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.0.RC2</version>
</dependency>
使用 Gradle 也非常相似
compile 'io.projectreactor:reactor-core:3.0.0.RC2'
现在我们来编写一些代码。
Reactive 的基本构建块是一个事件序列,以及两个主角:事件的发布者(publisher)和订阅者(subscriber)。将序列称为“流”(stream)也可以,因为它就是流。如果需要,我们将使用小写字母开头的“stream”,但 Java 8 有一个 java.util.Stream
,它有所不同,所以尽量不要混淆。无论如何,我们将尝试把重点放在发布者和订阅者上(Reactive Streams 就是这样做的)。
Reactor 是我们将在示例中使用的库,因此我们将遵循其表示法,并将发布者称为 Flux
(它实现了 Reactive Streams 的 Publisher
接口)。RxJava 库非常相似,并且有很多并行功能,因此在这种情况下,我们将讨论 Observable
,但代码会非常相似。(Reactor 2.0 将其称为 Stream
,如果还需要讨论 Java 8 Streams
会引起混淆,所以我们将只使用 Reactor 3.0 中的新代码。)
Flux
是特定 POJO 类型事件序列的发布者,因此它是泛型的,即 Flux<T>
是 T
的发布者。Flux
提供了一些静态便利方法,可以从多种源创建其自身实例。例如,从数组创建 Flux
Flux<String> flux = Flux.just("red", "white", "blue");
我们刚刚生成了一个 Flux
,现在可以用它做一些事情。实际上,您只能用它做两件事:对其进行操作(转换它或与其他序列组合),以及订阅它(它是一个发布者)。
您经常会遇到已知仅包含一个或零个元素的序列,例如按 ID 查找实体的 repository 方法。Reactor 有一个 Mono
类型,表示单值或空的 Flux
。Mono
的 API 与 Flux
非常相似,但更具针对性,因为并非所有操作符都适用于单值序列。RxJava 也有一个附加组件(在 1.x 版本中)称为 Single
,还有一个用于空序列的 Completable
。Reactor 中的空序列是 Mono<Void>
。
Flux
上有很多方法,几乎所有方法都是操作符。我们不会在这里全部介绍,因为有更好的地方(例如 Javadoc)可以查阅。我们只需要了解操作符是什么以及它能为您做什么。
例如,要请求将 Flux
内部事件记录到标准输出,您可以调用 .log()
方法。或者您可以使用 map()
进行转换
Flux<String> flux = Flux.just("red", "white", "blue");
Flux<String> upper = flux
.log()
.map(String::toUpperCase);
在这段代码中,我们通过将字符串转换为大写来转换输入中的字符串。到目前为止,这很简单。
这个小示例有趣的地方在于——如果您不习惯,甚至会感到震撼——就是目前还没有处理任何数据。甚至没有记录任何内容,因为实际上什么也没发生(试试看就知道了)。在 Flux
上调用操作符相当于构建一个稍后执行的计划。它是完全声明式的,这就是为什么人们称其为“函数式”的原因。操作符中实现的逻辑只在数据开始流动时执行,而这只有当有人订阅了 Flux
(或等效地订阅了 Publisher
)时才会发生。
所有 Reactive 库以及 Java 8 Streams
中都存在这种声明式、函数式处理数据序列的方法。考虑这段看起来相似的代码,使用与 Flux
内容相同的 Stream
Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
System.out.println(value);
return value.toUpperCase();
});
我们对 Flux
的观察在这里也适用:没有数据被处理,它只是一个执行计划。然而,Flux
和 Stream
之间有一些重要的区别,这使得 Stream
不适合 Reactive 用例。Flux
有更多的操作符,其中很多只是为了方便,但真正的区别在于您想要消费数据时,所以这就是我们接下来需要探讨的内容。
提示
Sebastien Deleuze 写了一篇关于 Reactive 类型的有用博客,他在其中通过查看各种流式和响应式 API 定义的类型以及如何使用它们来描述它们之间的差异。其中更详细地强调了 Flux
和 Stream
之间的区别。
要使数据流动,您必须使用其中一个 subscribe()
方法订阅 Flux
。只有这些方法才能使数据流动。它们会沿着您在序列上声明的操作符链(如果存在)回溯,并请求发布者开始创建数据。在我们一直在使用的示例中,这意味着遍历底层字符串集合。在更复杂的用例中,它可能会触发从文件系统读取文件,或者从数据库拉取数据,或者调用 HTTP 服务。
下面是一个 subscribe()
的实际调用
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe();
输出如下
09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()
因此我们可以从中看到,不带参数的 subscribe()
的作用是请求发布者发送所有数据——只记录了一个 request()
并且它是“无限制的”(unbounded)。我们还可以看到发布每个项目时的回调(onNext()
),序列结束时的回调(onComplete()
),以及原始订阅时的回调(onSubscribe()
)。如果需要,您可以使用 Flux
中的 doOn*()
方法自己监听这些事件,这些方法本身是操作符,而不是订阅者,所以它们本身不会导致数据流动。
subscribe()
方法是重载的,其他变体为您提供了不同的选项来控制行为。一种重要且方便的形式是带回调函数作为参数的 subscribe()
。第一个参数是一个 Consumer
,它为每个项目提供一个回调;您还可以选择添加一个用于处理错误的 Consumer
(如果发生错误),以及一个在序列完成时执行的普通 Runnable
。例如,只使用逐项回调
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(System.out::println);
输出如下
09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()
我们可以通过多种方式控制数据流,使其“有界”(bounded)。原始控制 API 是您从 Subscriber
获得的 Subscription
。上面简短的 subscribe()
调用的等效完整形式是
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
要控制流,例如一次最多消费 2 个项目,您可以更智能地使用 Subscription
.subscribe(new Subscriber<String>() {
private long count = 0;
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(2);
}
@Override
public void onNext(String t) {
count++;
if (count>=2) {
count = 0;
subscription.request(2);
}
}
...
这个 Subscriber
正在“批量”处理项目,每次 2 个。这是一个常见用例,因此您可以考虑将实现提取到一个便利类中,这样也会使代码更具可读性。输出如下
09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()
事实上,批量处理订阅者是一个如此常见的用例,以至于 Flux
中已经提供了便利方法。上面的批量处理示例可以这样实现
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(null, 2);
(注意调用 subscribe()
时带有请求限制)。输出如下
10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()
提示
像 Spring Reactive Web 这样会为您处理序列的库可以处理订阅。将这些关注点下推到更底层是一个好主意,因为这可以避免用非业务逻辑代码混淆您的代码,使其更具可读性,更容易测试和维护。因此,通常来说,如果可以避免订阅序列,或者至少将该代码推到处理层,使其脱离业务逻辑,这是件好事。
上面所有日志的一个有趣的特点是它们都在“主”线程上,也就是调用 subscribe()
的线程。这突出了一个重要点:Reactor 在线程使用上极其节俭,因为这为您提供了获得最佳性能的最大机会。如果您在过去 5 年里一直在与线程、线程池和异步执行打交道,试图从服务中榨取更多性能,这可能是一个令人惊讶的说法。但这是真的:在没有明确指令要求切换线程的情况下,即使 JVM 经过优化可以非常高效地处理线程,在单个线程上进行计算总是更快。Reactor 已经把控制所有异步处理的钥匙交给了您,并假设您知道自己在做什么。
Flux
提供了一些配置方法来控制线程边界。例如,您可以使用 Flux.subscribeOn()
将订阅配置为在后台线程中处理
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.subscribe(null, 2);
结果可以在输出中看到
13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
提示
如果您自己编写或复制粘贴这段代码,请记住在 JVM 退出之前等待处理停止。
注意,订阅以及所有处理都在一个后台线程“parallel-1-1”上进行——这是因为我们要求主 Flux
的订阅者在后台运行。如果项目处理是 CPU 密集型的,这样做是没问题的(但实际上在后台线程中是无意义的,因为您付出了上下文切换的成本,但并不能更快地获得结果)。您可能还希望执行 I/O 密集型且可能阻塞的项目处理。在这种情况下,您会希望尽快完成它而不会阻塞调用者。线程池仍然是您的朋友,这就是您从 Schedulers.parallel()
获得的。要将单个项目的处理切换到单独的线程(最多达到线程池的限制),我们需要将它们分解成单独的发布者,并为每个发布者请求在后台线程中处理结果。一种方法是使用名为 flatMap()
的操作符,它将项目映射到一个 Publisher
(可能类型不同),然后映射回一个新类型的序列
Flux.just("red", "white", "blue")
.log()
.flatMap(value ->
Mono.just(value.toUpperCase())
.subscribeOn(Schedulers.parallel()),
2)
.subscribe(value -> {
log.info("Consumed: " + value);
})
这里要注意使用 flatMap()
将项目下推到“子”发布者中,我们可以在其中控制每个项目的订阅,而不是整个序列的订阅。Reactor 内置的默认行为是尽可能地保持在单个线程上,因此如果希望它在后台线程中处理特定项目或项目组,我们需要明确指定。实际上,这是强制并行处理的少数公认技巧之一(详情请参阅 Reactive Gems 问题)。
输出如下
15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
请注意,现在有多个线程正在消费这些项目,并且 flatMap()
中的并发提示确保在任何给定时间都有 2 个项目正在处理,只要它们可用。我们经常看到 request(1)
,因为系统试图在 pipeline 中保持 2 个项目,并且通常它们不会同时完成处理。实际上,Reactor 在这方面非常智能,它会从上游 Publisher
预取项目,以尽量消除订阅者的等待时间(这里看不到这一点,因为数量很小——我们只处理了 3 个项目)。
提示
三个项目(“red”,“white”,“blue”)可能太少,无法令人信服地看到多个后台线程,因此最好生成更多数据。例如,您可以使用随机数生成器来实现。
Flux
还有一个 publishOn()
方法,作用类似,但用于监听器(即 onNext()
或消费者回调),而不是订阅者本身
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
log.info("Consumed: " + value);
});
输出如下
15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
注意,消费者回调(日志记录“Consumed: …”)位于发布者线程 pub-1-1
上。如果您移除 subscribeOn()
调用,您可能会看到第二批数据的所有处理也在 pub-1-1
线程上进行。这再次体现了 Reactor 在线程使用上的节俭——如果没有明确请求切换线程,它会在下一个调用中保持在同一线程上,无论该调用是什么。
注意
我们在此示例中将代码从 subscribe(null, 2)
改为在 publishOn()
中添加 prefetch=2
。在这种情况下,subscribe()
中的抓取大小提示会被忽略。
还有另一种订阅序列的方式,即调用 Mono.block()
或 Mono.toFuture()
或 Flux.toStream()
(这些是“提取器”方法——它们将您从 Reactive 类型中带出,进入一个不太灵活的阻塞抽象)。Flux
还有转换器 collectList()
和 collectMap()
,可以将 Flux
转换为 Mono
。它们实际上并不订阅序列,但它们会丢弃您可能对单个项目的订阅拥有的任何控制权。
警告
一个好的经验法则是“永远不要调用提取器”。也有一些例外情况(否则这些方法就不会存在)。一个值得注意的例外是在测试中,因为能够阻塞以允许结果累积是有用的。
这些方法是作为从 Reactive 到阻塞的桥梁或逃生舱而存在的;例如,如果您需要适配遗留 API,比如 Spring MVC。当您调用 Mono.block()
时,您就丢弃了 Reactive Streams 的所有好处。这是 Reactive Streams 和 Java 8 Streams
之间的关键区别——原生的 Java Stream
只有“全部或无”的订阅模型,相当于 Mono.block()
。当然,subscribe()
也可以阻塞调用线程,所以它与转换方法一样危险,但您有更多的控制权——您可以使用 subscribeOn()
阻止它阻塞,并且可以通过应用背压并定期决定是否继续来逐项推送项目。
在本文中,我们介绍了 Reactive Streams 和 Reactor API 的基础知识。如果您需要了解更多信息,有很多地方可以查找,但没有比动手编程更好的替代方法了,因此请使用 GitHub 中的代码(本文的测试位于名为“flux”的项目中),或前往 Lite RX Hands On 研讨会。到目前为止,这真的只是开销,我们并没有学到用非 Reactive 工具无法以更明显的方式完成的很多东西。该系列的下一篇文章将更深入地探讨 Reactive 模型的阻塞、调度和异步方面,并向您展示如何获得真正的好处的机会。