领先一步
VMware 提供培训和认证,助您加速前进。
了解更多命令式代码以接收请求的速度消耗线程,而 软件正在吞噬世界。本文讨论了 JVM 上响应式编程的假设以及这对集成——特别是关系型数据库——意味着什么。
撰写本文的动机在于 响应式编程采用率持续增长,而一些主要构建块尚未就绪——特别是这个问题:关系型数据库怎么办?
关于什么是响应式编程以及它与响应式系统如何比较,有很多答案。我认为响应式编程是一种编程模型,通过创建事件驱动的非阻塞函数式管道来促进可伸缩性和稳定性,这些管道对资源的可用性和可处理性作出反应。延迟执行、并发和异步性只是底层编程模型的必然结果。
只有当整个技术栈都是响应式的,并且所有参与的组件(应用程序代码、运行时容器、集成)都遵循延迟执行、非阻塞 API 和数据流的流式特性——本质上遵循了底层假设——响应式编程的全部优势才能显现出来。
虽然可以将非响应式组件引入以函数式响应式风格编写的应用程序中,但最终结果是可伸缩性和稳定性效果(实际预期的收益)会降低。在最坏的情况下,运行时行为几乎没有或根本没有区别。然而,响应式编程有助于提高代码的可读性。
如果我们回顾响应式生态系统,我们会发现几个框架、库和集成。它们各自有其特定的优势。许多功能领域都得到了很好的覆盖,无论是通过通用方法还是在特定响应式框架的背景下。因此,让我们讨论关系型数据库集成。
关系型数据库很受欢迎早已不是秘密,而且,大多数企业项目可能都严重依赖关系型数据库的使用。无论如何,最常被问到的问题是:什么时候会有用于响应式关系型数据库集成的 API?
Java 使用 JDBC 作为与关系型数据库集成的主要技术。JDBC 具有阻塞性——没有什么合理的方法可以缓解 JDBC 的阻塞性质。使调用变为非阻塞的第一个想法是将 JDBC 调用卸载到一个 Executor
(通常是 Thread
池)。虽然这种方法在某种程度上可行,但它带来了一些缺点,这些缺点忽略了响应式编程模型的优点。
线程池需要——毫不意外——线程来运行。响应式运行时通常使用与 CPU 核心数匹配的有限数量的线程。额外的线程会引入开销并降低线程限制的效果。此外,JDBC 调用通常会在队列中堆积,一旦线程被请求饱和,线程池将再次阻塞。因此,JDBC 目前不是一个选择。
有一些独立的驱动程序,例如 Reactiverse 的 reactive-pg-client。这些驱动程序带有供应商特定的 API,并不真正适合更广泛的采用。客户端集成者需要提供额外的层来公开通用 API。新的驱动程序无法轻松插入客户端库。相比之下,拥有标准 API 将允许即插即用,同时将客户端与特定于数据库的解决方案解耦——这对所有人来说都是巨大的价值。
Oracle 发布了 ADBA,这是一项旨在通过使用 futures 在 Java 中提供异步数据库访问标准化 API 的倡议。ADBA 中的一切都在进行中,ADBA 团队乐于接收反馈。一群 Postgres 开发者正在开发一个 Postgres ADBA 驱动程序,可用于初步实验。PgNio 是另一个用于 Postgres 的异步驱动程序,它开始试验 ADBA。
ADBA 的可用性未知。它肯定不会随 Java 12 一起发布。坦率地说,ADBA 计划首次亮相的 Java 版本目前尚不清楚。
以下代码片段展示了使用 INSERT
和 SELECT
语句的 ADBA 用法
DataSource ds = dataSource();
CompletableFuture<Long> t;
try (Session session = ds.getSession()) {
Submission<Long> submit = session
.<Long>rowCountOperation(
"INSERT INTO legoset (id, name, manual) " +
"VALUES($1, $2, $3)")
.set("$1", 42055, AdbaType.INTEGER)
.set("$2", "Description", AdbaType.VARCHAR)
.set("$3", null, AdbaType.INTEGER)
.apply(Result.RowCount::getCount)
.submit();
t = submit.getCompletionStage().toCompletableFuture();
}
t.join();
CompletableFuture<List<Map<String, Object>>> t;
try (Session session = ds.getSession()) {
Submission<List<Map<String, Object>>> submit = session
.<List<Map<String, Object>>> rowOperation(
"SELECT id, name, manual FROM legoset")
.collect(collectToMap()) // custom collector
.submit();
t = submit.getCompletionStage().toCompletableFuture();
}
t.join();
请注意,collectToMap(…)
是一个由应用程序提供的函数的示例,该函数将结果提取到所需的返回类型中。
TL;DR(太长不看):目前没有可用的响应式 API 来访问关系型数据库。
由于缺乏标准 API 和可用驱动程序,Pivotal 的一个团队开始研究响应式关系型 API 的想法,该 API 将非常适合响应式编程目的。他们提出了 R2DBC,它代表响应式关系型数据库连接(Reactive Relational Database Connectivity)。目前,R2DBC 是一个孵化器项目,用于评估可行性并开始讨论驱动程序供应商是否对支持响应式/非阻塞/异步驱动程序感兴趣。
目前,有三个驱动程序实现:
R2DBC 带有一个 API 规范(r2dbc-spi
)和一个客户端(r2dbc-client
),使得 SPI 可供应用程序使用。
以下代码片段展示了使用 INSERT
和 SELECT
语句的 R2DBC SPI 用法
ConnectionFactory connectionFactory = null;
Mono<Integer> count = Mono.from(connectionFactory.create())
.flatMapMany(it ->
it.createStatement(
"INSERT INTO legoset (id, name, manual) " +
"VALUES($1, $2, $3)")
.bind("$1", 42055)
.bind("$2", "Description")
.bindNull("$3", Integer.class)
.execute())
.flatMap(io.r2dbc.spi.Result::getRowsUpdated)
.next();
Flux<Map<String, Object>> rows = Mono.from(connectionFactory.create())
.flatMapMany(it -> it.createStatement(
"SELECT id, name, manual FROM legoset").execute())
.flatMap(it -> it.map((row, rowMetadata) -> collectToMap(row, rowMetadata)));
虽然上面的代码有些冗长,但 R2DBC 也提供了一个客户端库项目,用于更人性化的用户 API。R2DBC SPI 不旨在直接使用,而是通过客户端库来使用。
使用 R2DBC Client 重写的相同代码如下:
R2dbc r2dbc = new R2dbc(connectionFactory);
Flux<Integer> count = r2dbc.inTransaction(handle ->
handle.createQuery(
"INSERT INTO legoset (id, name, manual) " +
"VALUES($1, $2, $3)")
.bind("$1", 42055)
.bind("$2", "Description")
.bindNull("$3", Integer.class)
.mapResult(io.r2dbc.spi.Result::getRowsUpdated));
Flux<Map<String, Object>> rows = r2dbc
.inTransaction(handle -> handle.select(
"SELECT id, name, manual FROM legoset")
.mapRow((row, rowMetadata) -> collectToMap(row, rowMetadata));
请注意,collectToMap(…)
是一个由应用程序提供的函数的示例,该函数将结果提取到所需的返回类型中。
Spring Data 团队启动了 Spring Data R2DBC 作为孵化器项目,旨在通过数据库客户端提供响应式 API 并支持响应式仓库。使用 Spring Data R2DBC 重写的示例代码如下:
DatabaseClient databaseClient = DatabaseClient.create(connectionFactory);
Mono<Integer> count = databaseClient.execute()
.sql(
"INSERT INTO legoset (id, name, manual) " +
"VALUES($1, $2, $3)")
.bind("$1", 42055)
.bind("$2", "Description")
.bindNull("$3", Integer.class)
.fetch()
.rowsUpdated();
Flux<Map<String, Object>> rows = databaseClient.execute()
.sql("SELECT id, name, manual FROM legoset")
.fetch()
.all();
R2DBC 及其生态系统尚处于早期阶段,需要实验和反馈来收集用例,并判断响应式关系型数据库集成是否有意义。
让我们谈谈技术的结合。虽然 JDBC 和其他技术暴露了阻塞 API(主要是由于等待 I/O),但 Project Loom 正在开发中。Loom 引入了 Fibers
作为一种轻量级抽象,它将把阻塞 API 变成非阻塞的。这可以通过在调用遇到阻塞 API 时进行堆栈切换来实现。因此,底层的 Fiber
试图在使用了阻塞 API 的先前流程上继续执行。
Fiber
执行模型显著减少了所需的原生线程数。其结果是更好的可伸缩性和非阻塞行为——通过将阻塞调用卸载到由 Fiber
支持的 Executor
。我们这里只需要一个适当的 API,它允许使用基于 Fiber
的非阻塞 JDBC 实现。
响应式编程和关系型数据库的未来如何?老实说,我不知道。如果我尝试一个有根据的猜测,我认为 Project Loom 和基于 Fiber 的 Executor 与成熟的 JDBC 驱动程序相结合,可能是行业中的一个潜在的颠覆者。随着 Java 加速的发布节奏,这可能不会太遥远。
ADBA 的目标是包含在 Java 标准运行时中,我预计最早也要到 Java 17,根据当前的时间表,那将在 2021 年的某个时候。
与此形成对比的是 R2DBC,它现在已经可用。它带有驱动程序和客户端,并允许实验性使用。R2DBC 的一个显著优点是它暴露了一个完全响应式的 API,同时独立于底层的数据库引擎。随着版本已经发布,无需猜测 Project Loom,也无需等待可能长达三年的时间来试用 API。今天使用 R2DBC 就可以实现。