超越
VMware 提供培训和认证,助力您的飞速发展。
了解更多在本文中,我们将继续关于响应式编程的系列文章。本篇的重点不再是学习基本 API,而是更具体的用例和编写实际有用的代码。我们将看到响应式是如何成为并发编程的有用抽象的,同时也会了解到它有一些非常底层的特性,我们需要谨慎对待。如果我们开始充分利用这些特性,就可以控制应用程序中以前由容器、平台和框架隐藏的层。
响应式迫使您以不同的方式看待世界。不再是请求某物并获得它(或未获得它),而是所有东西都作为序列(Publisher
)交付,您必须订阅它。不再等待答案,而是需要注册一个回调。一旦习惯了,这并不难,但除非整个世界都发生了翻天覆地的变化并变得响应式,否则您会发现需要与旧式的阻塞式 API 交互。
假设我们有一个返回 HttpStatus
的阻塞方法
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
并且我们想用不同的参数重复调用它并聚合结果。这是一个经典的“散列-聚集”用例,例如,如果您有一个分页的后端需要汇总跨多个页面的“前 N”项,就会遇到这种情况。由于非响应式(阻塞式)操作的细节与散列-聚集模式无关,我们可以将它们下推到名为 block()
的方法中,并在稍后实现。这是一个(糟糕的)示例,它调用后端并将结果聚合成一个 Result
类型的对象
Flux.range(1, 10) // (1)
.log()
.map(this::block) // (2)
.collect(Result::new, Result::add) // (3)
.doOnSuccess(Result::stop) // (4)
调用 10 次
此处是阻塞式代码
收集结果并聚合成一个对象
最后停止计时(结果是 Mono<Result>
)
请勿在家中尝试。这是一个“糟糕的”示例,因为虽然 API 在技术上使用正确,但我们知道它会阻塞调用线程;这段代码或多或少等同于一个 for 循环,其中包含 block()
方法的调用。更好的实现是将 block()
调用推送到后台线程。我们可以通过将其包装在返回 Mono<HttpStatus>
的方法中来实现
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) // (1)
.subscribeOn(this.scheduler); // (2)
}
此处是 Callable 内部的阻塞式代码,用于延迟执行
在后台线程上订阅慢速的 Publisher
scheduler
被单独声明为一个共享字段:Scheduler scheduler = Schedulers.parallel()
。然后我们可以声明我们想要使用 flatMap()
代替 map()
来处理序列
Flux.range(1, 10)
.log()
.flatMap( // (1)
this::fetch, 4) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
进入新的 Publisher 以并行处理
flatMap 中的并发提示
如果想在非响应式环境中运行上面的散列-聚集代码,可以使用 Spring MVC,如下所示
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
如果你阅读 @RequestMapping
的 Javadoc 文档,你会发现一个方法可以返回一个 CompletableFuture
,“应用程序使用它在由其自行选择的单独线程中生成返回值”。在这种情况下,这个单独的线程由“scheduler”提供,它是一个线程池,因此处理正在多个线程上进行,每次4个,因为 flatMap()
被调用的方式。
使用后台线程的散列-聚集是一种有用的模式,但它并非完美无缺——它没有阻塞调用者,但它阻塞了某些东西,所以只是将问题转移了。这有一些实际意义。我们有一个 HTTP 服务器,它有(可能)非阻塞的 IO 处理器,将工作传递回一个线程池,每个 HTTP 请求一个线程——所有这些都发生在 Servlet 容器(例如 Tomcat)内部。请求是异步处理的,因此 Tomcat 中的工作线程没有被阻塞,而我们在“scheduler”中创建的线程池最多可以在 4 个并发线程上进行处理。我们正在处理 10 个后端请求(对 block()
的调用),因此使用 scheduler 理论上可以带来最多 4 倍的延迟降低。换句话说,如果在一个线程中按顺序处理所有 10 个请求需要 1000 毫秒,那么对于我们 HTTP 服务接收到的单个传入请求,处理时间可能会降至 250 毫秒。但是我们应该强调“可能”:只有在处理线程(在两个阶段,Tomcat 工作线程和应用 scheduler)没有争用的情况下,它才会这么快。如果您的服务器有大量核心,并发性非常低,即连接到您应用程序的客户端数量很少,并且几乎没有两个客户端同时发出请求的可能性,那么您可能会看到接近理论上的改进。一旦有多个客户端试图连接,它们都会竞争相同的 4 个线程,延迟会上升,甚至可能比单个客户端没有后台处理时更差。我们可以通过创建一个更大的线程池来改善并发客户端的延迟,例如
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
(16 个线程。)现在我们为线程及其堆栈使用了更多内存,并且在低并发情况下可以预期延迟会降低,但在高并发情况下则不一定,如果我们的硬件核心数少于 16 个。我们也不期望在高负载下获得更高的吞吐量:如果线程存在竞争,管理这些资源的成本很高,这必须体现在某个重要的指标中。如果您对这种权衡的更详细分析感兴趣,可以参考 Rob Harrop 的系列博客中关于负载下性能指标如何扩展的详细分析。
提示
Tomcat 默认分配 100 个线程来处理 HTTP 请求。如果我们知道所有处理都将在我们的 scheduler 线程池上进行,那么这数量就过多了。存在阻抗不匹配:scheduler 线程池可能成为瓶颈,因为它比上游的 Tomcat 线程池线程少。这突出了性能调优非常困难,并且虽然您可以控制所有配置,但这是一种微妙的平衡。
如果我们使用一个根据需求调整容量的 scheduler,我们可以做得比固定线程池更好。Reactor 为此提供了便利,所以如果您使用 Schedulers.elastic()
尝试相同的代码(您可以在任何地方调用它;只有一个实例),您会发现在负载下会创建更多线程。
从阻塞式到响应式的桥接是一种有用的模式,并且使用 Spring MVC 中的现有技术很容易实现(如上所示)。响应式旅程的下一阶段是完全摆脱应用程序线程中的阻塞,这需要新的 API 和新工具。最终,我们必须从上到下都是响应式的,包括服务器和客户端。这就是 Spring Reactive 的目标,它是一个新的框架,与 Spring MVC 正交,但满足同样的需求,并使用类似的编程模型。
注意
Spring Reactive 最初是一个独立项目,但在 Spring Framework 5.0 版本中被整合进去(第一个里程碑于 2016 年 6 月发布)。
在我们的散列-聚集示例中,完全实现响应式的第一步是在 classpath 中用 spring-boot-starter-webflux
替换 spring-boot-starter-web
。对于 Maven:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
或者对于 Gradle:
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
...
}
然后在控制器中,我们可以简单地去掉与 CompletableFuture
的桥接,直接返回一个 Mono
类型的对象
@RequestMapping("/parallel")
public Mono<Result> parallel() {
return Flux.range(1, 10)
.log()
.flatMap(this::fetch, 4)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
将这段代码放入 Spring Boot 应用程序中,它将在 Tomcat、Jetty 或 Netty 中运行,具体取决于 classpath 中找到了什么。Tomcat 是该 starter 中的默认服务器,因此如果您想切换,必须排除它并提供不同的服务器。就启动时间、内存使用和运行时资源使用而言,这三个服务器特性非常相似。
我们仍然在 block()
中有阻塞的后端调用,所以我们仍然需要在线程池上订阅,以避免阻塞调用者。如果我们有一个非阻塞客户端,就可以改变这一点,例如,代替使用 RestTemplate
,我们使用新的 WebClient
,然后可以使用以下方式来使用非阻塞客户端:
private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
private Mono<HttpStatus> fetch(int value) {
return this.client.perform(HttpRequestBuilders.get("http://example.com"))
.extract(WebResponseExtractors.response(String.class))
.map(response -> response.getStatusCode());
}
请注意,WebClient.perform()
(或者更确切地说,WebResponseExtractor
)有一个响应式返回类型,我们已将其转换为 Mono<HttpStatus>
,但我们尚未订阅它。我们希望框架完成所有订阅,所以现在我们是响应式贯穿始终的。
警告
Spring Reactive 中返回 Publisher
的方法**是**非阻塞的,但一般来说,返回 Publisher
(或 Flux
、Mono
或 Observable
)的方法仅仅是一个提示,表明它**可能**是非阻塞的。如果您正在编写此类方法,务必分析(最好进行测试)它们是否阻塞,并明确告知调用者它们可能阻塞。
注意
我们刚才使用的非阻塞客户端来简化 HTTP 栈的技巧,在常规 Spring MVC 中也适用。上面 fetch()
方法的结果可以转换为 CompletableFuture
并从常规的 @RequestMapping
方法中返回(例如在 Spring Boot 1.3 中)。
现在我们可以移除 HTTP 请求处理器中调用 fetch()
后的并发提示了
@RequestMapping("/netty")
public Mono<Result> netty() {
return Flux.range(1, 10) // (1)
.log() //
.flatMap(this::fetch) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
调用 10 次
进入新的 Publisher 以并行处理
考虑到我们完全不需要额外的 callable 和订阅线程,这段代码比我们必须桥接到阻塞客户端时干净得多,这归功于代码完全响应式。响应式 WebClient
返回一个 Mono
,这立即促使我们在转换链中选择 flatMap()
,所需的代码也就自然生成了。编写起来体验更好,可读性也更高,因此更容易维护。此外,由于没有线程池和并发提示,我们的性能预期中不再有那个神奇的 4 倍因子需要考虑。某个地方仍然存在限制,但它不再是我们应用层面的选择所施加的,也不再受服务器“容器”中任何东西的限制。这并非魔法,物理定律依然存在,所以所有后端调用仍将花费大约 100 毫秒,但在低争用情况下,我们甚至可能看到所有 10 个请求在大致与单个请求相同的时间内完成。随着服务器负载的增加,延迟和吞吐量自然会劣化,但这种劣化受限于缓冲区竞争和内核网络,而不是应用线程管理。这是一种控制反转,控制下移到了应用代码之下的栈层。
请记住,相同的应用程序代码可以在 Tomcat、Jetty 或 Netty 上运行。目前,Tomcat 和 Jetty 的支持是建立在 Servlet 3.1 异步处理之上的,因此不再局限于每个请求一个线程。它是建立在一个响应式桥接器之上,该桥接器将 Servlet 3.1 的概念适配到响应式范式。对于 Reactor Netty,背压和响应式支持是内置的。根据您选择的 HTTP 客户端库,服务器和客户端可能会共享相同的 HTTP 资源并进一步优化。我们将在本系列的另一篇文章中再讨论这一点。
提示
在示例代码中,“reactive”示例有 Maven 配置文件 "tomcat"、"tomcatNext"(用于 Tomcat 8.5)、"jetty" 和 "netty",因此您无需更改一行代码即可轻松尝试所有不同的服务器选项。
注意
许多应用程序中的阻塞代码并非 HTTP 后端调用,而是数据库交互。目前支持非阻塞客户端的数据库非常少(MongoDB 和 Couchbase 是明显的例外,但即使是它们也不如 HTTP 客户端成熟)。在所有数据库厂商都在客户端侧跟上之前,线程池和阻塞到响应式模式将长期存在。
我们已经将基本的散列-聚集用例简化到代码非常干净,并且对它运行的硬件非常友好。我们编写了一些简单的代码,并且使用 Spring 非常好地堆叠和编排成了一个工作的 HTTP 服务。在晴天,每个人都对结果非常满意。但一旦出现错误,例如行为不端的网络连接,或者后端服务延迟很高,我们就会遭殃。
首先,最明显的麻烦是,我们编写的代码是声明式的,因此很难调试。发生错误时,诊断信息可能非常不透明。使用原始的低级 API,例如没有 Spring 的 Reactor,甚至没有 Reactor 的 Netty,情况可能会更糟,因为那时我们必须自己构建大量的错误处理,每次与网络交互时都重复样板代码。至少有了 Spring 和 Reactor 的组合,我们可以期望看到未捕获的异常被记录堆栈跟踪。尽管如此,它们可能不容易理解,因为它们发生在不受我们控制的线程上,并且有时表现为相当低级的问题,来自栈中不熟悉的部分。
另一个痛点是,如果我们在某个响应式回调中犯了错误并阻塞了,我们将导致同一线程上的**所有**请求都被阻塞。对于基于 Servlet 的容器,每个请求都被隔离到一个线程中,阻塞不会阻塞其他请求,因为它们在不同的线程上处理。阻塞所有请求仍然是麻烦的根源,但在这种情况下,它只会表现为每个请求延迟增加一个大致恒定的因子。在响应式世界中,阻塞单个请求可能导致所有请求延迟增加,而阻塞所有请求可能导致服务器瘫痪,因为没有额外的缓冲层和线程来弥补不足。
能够控制异步处理中的所有活动组件是件好事:每一层都有线程池大小和队列。我们可以使其中一些层具有弹性,并尝试根据它们的工作量进行调整。但在某个时候,这会成为一种负担,我们开始寻找更简单或更精简的东西。对可伸缩性的分析得出结论,通常最好丢弃额外的线程,并根据物理硬件施加的限制工作。这是“机械同情”(mechanical sympathy)的一个例子,LMAX 在Disruptor 模式中成功地利用了这一点,并取得了巨大成效。
我们已经开始看到响应式方法的强大之处,但请记住,能力越大,责任越大。它是激进的,它是基础性的。它属于“推倒重来”的领域。所以您也会希望理解响应式并非解决所有问题的方案。事实上,它并非解决任何特定问题的方案,它只是 تسهیل (facilitates) 解决某一类问题。使用它所带来的好处可能被学习它、修改您的 API 使其完全响应式以及后续维护代码的成本所抵消,因此请小心行事。