Reactor 调试体验

工程 | Sergei Egorov | 2019 年 3 月 28 日 | ...

Project Reactor 团队中,我们认为您所依赖的库的调试体验与功能集或性能一样重要。

今天,我们很高兴地宣布 Reactor 系列中的两个新的实验项目!

BlockHound - 新手入门

最常见的菜鸟错误之一是在应该仅运行非阻塞代码的 Java 线程中阻塞(例如,Schedulers.parallel())。
这是最有害的问题之一,因为您可能会阻塞无关的处理,甚至可能造成死锁!

考虑以下代码

Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
        .subscribeOn(Schedulers.parallel())
        .map(i -> {
            CountDownLatch latch = new CountDownLatch(1);

            Mono.delay(Duration.ofMillis(i * 100))
                .subscribe(it -> latch.countDown());

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return i;
        })
        .blockLast();

运行此代码需要多长时间?1 秒?10 秒?
如果我告诉你它永远不会退出并会造成死锁呢?原因如下

  1. 我们创建了 N * 2 个信号,其中 N 是我们的 JVM 可以使用的 CPU 数量。
  2. 我们使用 Schedulers.parallel 订阅 - 一个限制为 N 个线程的有界池。
  3. 对于每个信号,我们为并行调度程序安排另一个任务(Mono.delay 隐式地使用 Schedulers.parallel)。
  4. 我们的逻辑假设延迟会很快被处理,并且闩锁最终会解除阻塞。
  5. 但是,所有 N 个线程都将等待其闩锁,并且延迟任务将永远不会执行!

即使您没有阻塞所有线程,只阻塞了一些线程,您也阻止了其他无关任务的推进。最可能的结果是性能会下降。

当您 将旧的阻塞代码迁移到响应式方法时,这个问题尤其明显。即使是最有经验的代码审查员也可能无法发现阻塞调用,因为您的 函数颜色相同

因此,我们创建了 BlockHound - 一个 Java 代理,用于检测来自非阻塞线程的阻塞调用。与其他解决方案不同,它对原始方法(甚至本机方法!)进行检测,并且无法通过反射调用阻塞方法!

现在,如果我们按照 文档中所述将其添加到我们的应用程序中,我们将得到以下异常

java.lang.Error: Blocking call! sun.misc.Unsafe#park
  at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
  at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
  at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
  at sun.misc.Unsafe.park(Unsafe.java)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
  at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)

请注意,await 在内部调用 Unsafe#park 用于等待逻辑。我们不希望我们的线程被停放或阻塞,BlockHound 保护我们免受这种情况!

如果您想了解实现细节,请阅读 工作原理页面。
TL;DR:它包装原始方法并向其中添加仅两个方法调用。

您可以在测试或 QA/登台环境中运行它,而不会损失性能。天哪,鉴于开销很低,您甚至可以尝试在生产环境中运行它!:)

BlockHound 与 Project Reactor 和 RxJava 2 兼容,您可以 编写自己的集成

Reactor Debug 代理 - 生产就绪的程序集回溯

由于其函数式编程方面,调试响应式代码有时可能具有挑战性:您不是精确地指挥要对数据执行什么操作,而是声明数据应如何流经系统。这意味着声明和执行发生在不同的时间点。

您可以在 Simon 的精彩文章中了解更多信息:https://springjava.cn/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription

在 Reactor 中,我们将其称为“组装时间”和“执行时间”。在组装时间,您通过调用 myFlux.map(i -> i * 2).filter(5 % i == 1).single() 和其他运算符来“设计”您的管道。一段时间后,此“管道定义”将用于处理 myFlux 发布的信号。但是,如果发生错误会怎样?

你们中的一些人可能已经知道 Hooks.onOperatorDebug()。这是 reactor-core 中一个非常有用的钩子。它将堆栈跟踪从以下内容转换

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
  at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
  at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
  at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

到以下内容

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
   ...
  at java.lang.Thread.run(Thread.java:748)
  Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
  reactor.core.publisher.Flux.single(Flux.java:7380)
  com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
  |_	Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
  |_	Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)

对于以下代码

9:  public class Example {
10:
11:     public static void run() {
12:        Flux.range(0, 5)
13:            .single() // <-- Aha!
14:            .subscribeOn(Schedulers.parallel())
15:            .block();
16:     }
17: }

如您所见,在启用调试模式后,我们可以清楚地识别发生错误的组装操作。它就像一个堆栈跟踪,但(由于执行与组装分离)是一个回溯。

您可能会想:“太好了,现在我想要在生产环境中使用它!” - 我们也是。但是,当您使用 Hooks.onOperatorDebug() 时,我们必须在组装时进行重量级堆栈遍历以捕获每次调用运算符(如 .map(...))时的调用站点,即使您的代码永远不会抛出错误!这是由于 Java 中缺乏调用站点跟踪,其中唯一的替代方案是 new Exception().getStackTrace()StackWalker(在 Java 9 及更高版本中)。

显然,我们无法在生产环境中使用这种方法,因此我们为此制作了一个工具!

来自 reactor-tools 项目ReactorDebugAgent 是一个 Java 代理,可帮助您调试应用程序中的异常,而无需支付运行时成本(与 Hooks.onOperatorDebug() 不同)。

⚠️ 此项目处于孵化阶段,将来可能成为独立项目或 https://github.com/reactor/reactor-core 的模块,也可能不会。

它(通过字节码转换)转换以下链

Flux.range(0, 5)
       .single()

Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));

要启用它,您需要首先初始化代理

ReactorDebugAgent.init();

ℹ️ 由于实现将在加载类时检测您的类,因此放置它的最佳位置是在 main(String[]) 方法中的其他所有内容之前

public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

结论

我们希望这些工具能够使您作为开发人员的生活更轻松,并让您对 Project Reactor 感到更舒适!

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部