不能只有大数据,还得有快数据:Reactor 1.0 正式发布 (GA)

发布 | Jon Brisbin | 2013 年 11 月 12 日 | ...

我很高兴宣布 Reactor,一个用于在 JVM 上构建响应式、快数据应用的强大基础库已达到正式发布 (GA) 阶段!

Reactor 是什么,我为什么要关心它?

Reactor 提供了必要的抽象来构建高吞吐量、低延迟的应用——我们现在称之为“快数据”应用——这些应用绝对必须能够处理每秒数千、数万甚至数百万的并发请求。

您应该关心 Reactor,因为现代应用(拥有非人类消费者,例如手机及其上的应用)生成的数据量超过了传统的每连接一个线程服务器所能支持的能力,因此 Reactor 为您提供了构建这类高扩展性应用所需的工具和抽象,而不会让您陷入异步应用中管理状态和传递事件的繁琐事务。现代 JVM 应用必须构建在坚实的异步和反应式组件基础上,这些组件可以在极少数系统线程上高效地管理大量任务的执行。Reactor 专门设计用于帮助您构建这类应用,而不会阻碍您或强迫您遵循某种固定的模式。

Reactor 是基础

Reactor 本身深受同名著名设计模式的影响——但它并不仅仅受此模式启发。它也包含 Actor 模型和传统事件驱动回调编程的元素。

尽管它是 Spring IO 平台基础的一部分,但 核心 Reactor 库不依赖于 Spring。Reactor Core 是一个独立的库,其唯一的外部依赖是 SLF4J 和优秀的 LMAX Disruptor RingBuffer 库

在 Reactor Core 的基础上构建了其他可选组件,以便于按照常见模式开发应用。Reactor 的一些内置的一流支持包括:

  • 通过高速的 Processor 抽象支持 LMAX Disruptor,该抽象提供了 RingBuffer 之上的 Reactor API。
  • 通过灵活的 PersistentQueue 抽象支持高性能的 JavaChronicle 持久化消息传递库
  • 支持 Groovy 闭包和 @CompileStatic,并提供全面的环境构建和事件连接 DSL。
  • 基于 Netty 4.0 的高性能 TCP 客户端和服务器支持。
  • 强大的基于注解的 Spring 支持。
  • 启动时有大量内容...

Reactor 速度很快

Reactor 从头开始设计,旨在灵活高效地工作,以便它能够不妨碍您,并帮助您的应用尽可能快地处理数据。在最快的配置下,一个标准的基于 RingBuffer 的 Reactor 可以在一台标准的开发者笔记本上每秒发布超过 1000-1500 万个事件。高性能的 Processor 抽象可以将每秒超过 1 亿个事件传输到您的应用中。您的应用对数据进行的何种处理会降低 Reactor 的速度,这取决于具体的任务。但在最佳的无操作模式下,吞吐量如此之高,应用不会因为等待 Reactor 完成工作而停滞!

Reactor 是函数式的

Reactor core 包含一些基本抽象,这些抽象受到 JDK 8 新函数式抽象(如 Function<T,V>Consumer<T>Supplier<T>Predicate<T>)的启发(在某些情况下甚至直接基于它们)。Reactor 本身不仅构建在这些抽象的基础上,您的应用也可以利用它们。未来某个时候,当 JDK 8 的普及率足够高时,Reactor 就可以直接从库中删除这些抽象,并依赖于 JDK 8 中的对应部分。在此之前,您的 JDK 6 和 7 应用现在就可以受益于这些函数式抽象。

Reactor 是响应式的

.NET 的 Reactive ExtensionsNetflix 的 RxJava、JDK 8 Stream 抽象以及许多其他库(更不用说 20 年的事件驱动计算机科学)的启发,Reactor 提供了一种“响应式”编程模型,使得协调异步任务变得更加容易。像 Stream<T>Promise<T> 这样的抽象使得链接非阻塞操作变得简单而简洁——告别回调函数嵌套!

@Inject
AsyncDataLoader loader;

Promise<Buffer> p = loader.get("U-U-I-D")
    .map(new Function<Buffer, Data>() {
      public Data apply(Buffer buff) {
        // transform data
        Data data = parser.parse(buff);
        return data;
      }
    })
    .filter(new Predicate<Data>() {
      public boolean test(Data data) {
        // check Data for certain conditions being true
        return null != data.getName();
      }
    })
    .consume(new Consumer<Data>() {
      public void accept(Data data) {
        // only Data that passes the Predicate test makes it here...
      }
    });
    
// Promises can also block like a Future
Buffer buff = p.await();

这些操作(mapfilterconsume)中的每一个都是独立执行的(可能)异步操作。在传统的多线程环境中,需要添加大量围绕阻塞 Future 和等待完成的冗余代码。然而,使用 Reactor,您只需以响应式的方式将操作链接起来,这样当上一个操作完成后,下一个操作就会对数据“做出反应”。

Reactor 很 Groovy

Reactor 包含对 Groovy 语言的一流支持。它支持使用闭包作为回调函数,拥有一个强大的 DSL 用于配置 Reactor 环境,并提供了一些非常酷的运算符重载来编写简洁的代码。

Reactor 是可扩展的

Clojurewerkz 有一个名为 Meltdown 的库,它是基于 Reactor 构建的。可以毫不费力地添加对其他 JVM 语言的支持。Reactor 的 API 设计为可扩展的,以便非 Java 语言也能从 Reactor 的工具中受益。

代码长什么样?

Reactor 已经支持 Java 8,所以我们先来看一些使用 JDK 8 强大 Lambda 特性的 Reactor 代码。

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));

r.notify("topic", Event.wrap("Hello World!"));

Reactor 希望实现的目标之一是减少您需要编写的代码量;上面的代码非常简洁。即使在 Java 6 和 7 中,它也非常简洁。

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .on($("topic"), new Consumer<Event<String>>() {
    public void accept(Event<String> ev) {
      System.out.prinltn("greeting: " + ev.getData());
    }
  });

r.notify("topic", Event.wrap("Hello World!"));

在 Groovy 中则更加简洁(正如您所预期的),因为语言支持会处理一些对象的类型转换,并允许使用闭包。

def env = new Environment()

def r = Reactors.reactor(env).on("topic") { String greeting ->
  println "greeting: $greeting"
}

r.notify "topic", "Hello World!"

调度器 (Dispatchers)

一个 Dispatcher 负责在给定的 Thread 上执行任务。Dispatcher 有多种内置实现,可以在调用线程中、线程池中的线程上、使用单线程事件循环式调度,或者最快的调度器:使用 LMAX Disruptor RingBuffer 调度任务的 RingBufferDispatcher

每当您在 Reactor 中创建组件时,通常会指定用于调度事件的 Dispatcher。在高容量应用中,使用线程池可能会对 CPU 和 GC 产生极高的开销,而将事件调度到 RingBuffer 中则极其高效。使用 RingBufferDispatcher 每秒可以调度数千万个事件。

选择器 (Selectors)

一个 Selector 是动作与事件键的动态映射。当您向 Reactor 分配一个动作时,通过注册一个 Selector 来告诉它响应哪些事件键。有几种内置实现,可以匹配诸如 Object.equals() 之类的内容,进行基于字符串的正则表达式匹配,URI 模板匹配(您可以使用熟悉的括号定界占位符表示法来匹配 URI),Class.isAssignableFrom() 匹配(只选择继承自某个共同抽象的键),Predicate 匹配(允许您基于特定范围的谓词创建任意 Predicate<T> 选择器),甚至还有一个可选的 JsonPathSelector,它使用 JsonPath 通过 JsonPath 表达式从键中查询数据。

您可能已经注意到,在示例中使用了 Java 开发者可能有点困惑的东西:用于创建 Selector$ 快捷方法 [1]。如果您使用过 jQuery 进行 Web 开发,那么您会感到非常熟悉,因为 $ 方法只是创建 Selector 的一个快捷方式,就像 jQuery 在编写 $(".css-class") 时创建 CSS Query 一样。如果美元符号对您来说太不寻常,Reactor 总是尝试提供多种方法来完成同一件事;您可以使用 Selectors.object(T)ObjectSelector.objectSelector() 静态创建方法代替(或者直接使用构造函数创建 ObjectSelector 实例)。

[1]: 除了 $(T),还有其他创建 Selectors 的快捷辅助方法。例如,R(String) 用于创建 RegexSelectors,T(Class<?>) 用于创建 ClassSelectors,U(String) 用于创建 UriTemplateSelectors。

Promise 和 Stream

Reactor 的 PromiseStream 提供了一种响应式的、可组合的方式来协调多个异步任务,而不会产生过多的回调函数嵌套。Promise 是一个有状态组件,可以在应用中传递,代表一个将从另一个线程填充的值。像传统的 Future 一样,Promise 可以阻塞调用线程。但更重要的是,Promise 使得转换值和执行整个处理链变得容易。

一个 StreamPromise 类似,因为它提供了一个组合 API 来响应未来的值。但是 StreamPromise 的不同之处在于,它被设计用来处理通过的多个值。

要在 PromiseStream 中填充值,您需要创建一个 Deferred,它是一个 Consumer<T>。您可以将此 Deferred 传递到您的服务层,以便将最终值传回给调用者。

// Only create one of these per JVM
static Environment env = new Environment();

public class DataLoader {

  public Promise<Buffer> load(String key) {  
    Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);

    // submit work to be done in another thread
    // like reading data from a datastore
    datastore.load(key, deferred);
    
    return deferred.compose();
  }
  
}

// Your service layer uses this API
@Inject
DataLoader loader;

loader.load("obj-key")
  .onSuccess(new Consumer<Buffer>() {
    public void accept(Buffer b) {
      // handle eventual data
    }
  })
  .onError(new Consumer<Throwable>() {
    public void accept(Throwable t) {
      // handle errors
    }
  });

元组 (Tuples)

Scala 的 Tuple 类是一种类型安全的方式,用于传递一个封装了其他值的单一对象,而无需创建特定于应用的、一次性的“holder”bean。Reactor 将此功能融入到其自己的 Tuple 类实现中。

元组使用起来非常简单。您可以使用 Tuple.from(T1, T2, …) 方法创建一个元组,然后可以使用 Tuple.getT1()Tuple.getTN() 方法获取其中的值。

reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
  public void accept(Event<Tuple2<URI, Buffer>> ev) {
    URI uri = tup.getT1();
    Buffer buff = tup.getT2();  
    
    // deal with request from uri.getPath()
  }
});

// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));

查看 Tuple API 文档以了解所有可能性。

TcpClient 和 TcpServer

Reactor 提供功能齐全的 TCP 客户端和服务器抽象。它们提供了一种简单的方式来构建可以支持大量客户端的基于 TCP 的应用。Reactor TCP 支持中的基本抽象是通用的,可以创建多种实现来利用不同的 TCP 技术。然而,内置的实现利用了优秀的 Netty 库来进行异步 IO。

Apache 许可,社区友好

Reactor 是开源的,采用 Apache 许可。开发者和用户社区是一群普通人,他们希望一起工作,为在 JVM 上构建响应式、快数据应用创建一个奇妙的基础。加入我们的社区,了解更多关于 Reactor 的信息,或者通过您希望看到的任何改进来贡献力量。

要快速开始使用 Reactor 并查看不同上下文中的一些代码,请查看快速入门指南:

https://github.com/reactor/reactor-quickstart

或示例:

https://github.com/reactor/reactor-samples

要 fork 源代码、阅读 wiki 或提交问题,请访问我们在 GitHub 上的页面:

https://github.com/reactor/reactor

您可以加入 Google Group 来提问或参与围绕 Reactor 的讨论:

https://groups.google.com/forum/#!forum/reactor-framework

获取 Maven Artifacts 以包含到您的项目中

<dependencies>

	<!-- core components -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>
	
	<!-- groovy support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-groovy</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- tcp client/server -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-tcp</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- spring support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-spring</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

</dependencies>

获取 Spring 电子报

通过 Spring 电子报保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速前进。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单订阅。

了解更多

即将举办的活动

查看 Spring 社区的所有即将举办的活动。

查看全部