领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在 Project Reactor 团队中,我们认为您所依赖的库的调试体验与功能集或性能一样重要。
今天,我们很高兴地宣布 Reactor 系列中的两个新的实验项目!
最常见的菜鸟错误之一是在应该仅运行非阻塞代码的 Java 线程中阻塞(例如,Schedulers.parallel()
)。
这是最有害的问题之一,因为您可能会阻塞无关的处理,甚至可能造成死锁!
考虑以下代码
Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
.subscribeOn(Schedulers.parallel())
.map(i -> {
CountDownLatch latch = new CountDownLatch(1);
Mono.delay(Duration.ofMillis(i * 100))
.subscribe(it -> latch.countDown());
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return i;
})
.blockLast();
运行此代码需要多长时间?1 秒?10 秒?
如果我告诉你它永远不会退出并会造成死锁呢?原因如下
N * 2
个信号,其中 N
是我们的 JVM 可以使用的 CPU 数量。Schedulers.parallel
订阅 - 一个限制为 N
个线程的有界池。Mono.delay
隐式地使用 Schedulers.parallel
)。即使您没有阻塞所有线程,只阻塞了一些线程,您也阻止了其他无关任务的推进。最可能的结果是性能会下降。
当您 将旧的阻塞代码迁移到响应式方法时,这个问题尤其明显。即使是最有经验的代码审查员也可能无法发现阻塞调用,因为您的 函数颜色相同!
因此,我们创建了 BlockHound - 一个 Java 代理,用于检测来自非阻塞线程的阻塞调用。与其他解决方案不同,它对原始方法(甚至本机方法!)进行检测,并且无法通过反射调用阻塞方法!
现在,如果我们按照 文档中所述将其添加到我们的应用程序中,我们将得到以下异常
java.lang.Error: Blocking call! sun.misc.Unsafe#park
at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
at sun.misc.Unsafe.park(Unsafe.java)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)
请注意,await
在内部调用 Unsafe#park
用于等待逻辑。我们不希望我们的线程被停放或阻塞,BlockHound 保护我们免受这种情况!
如果您想了解实现细节,请阅读 工作原理页面。
TL;DR:它包装原始方法并向其中添加仅两个方法调用。
您可以在测试或 QA/登台环境中运行它,而不会损失性能。天哪,鉴于开销很低,您甚至可以尝试在生产环境中运行它!:)
BlockHound 与 Project Reactor 和 RxJava 2 兼容,您可以 编写自己的集成。
由于其函数式编程方面,调试响应式代码有时可能具有挑战性:您不是精确地指挥要对数据执行什么操作,而是声明数据应如何流经系统。这意味着声明和执行发生在不同的时间点。
您可以在 Simon 的精彩文章中了解更多信息:https://springjava.cn/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription
在 Reactor 中,我们将其称为“组装时间”和“执行时间”。在组装时间,您通过调用 myFlux.map(i -> i * 2).filter(5 % i == 1).single()
和其他运算符来“设计”您的管道。一段时间后,此“管道定义”将用于处理 myFlux
发布的信号。但是,如果发生错误会怎样?
你们中的一些人可能已经知道 Hooks.onOperatorDebug()
。这是 reactor-core
中一个非常有用的钩子。它将堆栈跟踪从以下内容转换
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
到以下内容
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
...
at java.lang.Thread.run(Thread.java:748)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7380)
com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
|_ Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
|_ Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)
对于以下代码
9: public class Example {
10:
11: public static void run() {
12: Flux.range(0, 5)
13: .single() // <-- Aha!
14: .subscribeOn(Schedulers.parallel())
15: .block();
16: }
17: }
如您所见,在启用调试模式后,我们可以清楚地识别发生错误的组装操作。它就像一个堆栈跟踪,但(由于执行与组装分离)是一个回溯。
您可能会想:“太好了,现在我想要在生产环境中使用它!” - 我们也是。但是,当您使用 Hooks.onOperatorDebug()
时,我们必须在组装时进行重量级堆栈遍历以捕获每次调用运算符(如 .map(...)
)时的调用站点,即使您的代码永远不会抛出错误!这是由于 Java 中缺乏调用站点跟踪,其中唯一的替代方案是 new Exception().getStackTrace()
或 StackWalker
(在 Java 9 及更高版本中)。
显然,我们无法在生产环境中使用这种方法,因此我们为此制作了一个工具!
来自 reactor-tools 项目的 ReactorDebugAgent
是一个 Java 代理,可帮助您调试应用程序中的异常,而无需支付运行时成本(与 Hooks.onOperatorDebug()
不同)。
⚠️ 此项目处于孵化阶段,将来可能成为独立项目或 https://github.com/reactor/reactor-core 的模块,也可能不会。
它(通过字节码转换)转换以下链
Flux.range(0, 5)
.single()
到
Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));
要启用它,您需要首先初始化代理
ReactorDebugAgent.init();
ℹ️ 由于实现将在加载类时检测您的类,因此放置它的最佳位置是在 main(String[])
方法中的其他所有内容之前
public static void main(String[] args) {
ReactorDebugAgent.init();
SpringApplication.run(Application.class, args);
}
我们希望这些工具能够使您作为开发人员的生活更轻松,并让您对 Project Reactor 感到更舒适!