使用 Spring 进行响应式事务

工程 | Mark Paluch | 2019 年 5 月 16 日 | ...

回溯到 2016 年,我们的响应式之旅始于 Spring Framework 5,同时伴随着一些响应式集成。在我们的整个旅程中,其他项目也加入了响应式浪潮。通过 R2DBC,我们现在还为 SQL 数据库提供了响应式集成。随着支持事务的集成的增加,我们不断被问及

Spring Framework 是否支持响应式 @Transaction?

在我们的旅程开始时,我们还没有响应式形式的事务性集成,所以这个问题很容易回答:不需要响应式事务管理。

随着时间的推移,MongoDB 开始支持 MongoDB Server 4.0 中的多文档事务。R2DBC(响应式 SQL 数据库驱动程序的规范)开始兴起,我们决定通过 Spring Data R2DBC 来支持 R2DBC。这两个项目都希望公开事务行为,所以它们最终在其模板 API 上提供了 inTransaction(…) 方法来执行由原生事务保护的工作单元。

虽然使用 inTransaction(…) 方法处理较小的工作块很方便,但这并不反映 Spring 支持事务的方式。在使用命令式编程模型时,Spring Framework 允许两种事务管理方式:@TransactionalTransactionTemplate(分别是声明式和编程式事务管理)。

这两种事务管理方法都建立在 PlatformTransactionManager 之上,它管理事务性资源的事务。PlatformTransactionManager 可以是 Spring 提供的事务管理器实现,也可以是基于 JTA 的 Java EE 实现。

这两种方法都有一个共同点,即将事务状态绑定到 ThreadLocal 存储,这允许在不传递 TransactionStatus 对象的情况下管理事务状态。事务管理应该在后台以非侵入的方式进行。ThreadLocal 在命令式编程安排中之所以有效,是因为底层假设我们不会让线程在事务内部继续工作。

命令式事务管理如何工作

事务管理需要将其事务状态与执行关联起来。在命令式编程中,这通常是 ThreadLocal 存储——事务状态绑定到 Thread。底层假设是事务代码在容器调用它的同一个线程上执行。

响应式编程模型消除了命令式(同步/阻塞)编程模型的这一基本假设。仔细观察响应式执行,我们可以发现代码在不同的线程上执行。在使用进程间通信时,这一点更加明显。我们不再能安全地假设我们的代码完全在同一个线程上执行。

这种假设的变化使得依赖于 ThreadLocal 的事务管理实现失效。

由于集成和优化(例如操作符融合),线程切换可能在任意时间发生。这种变化打破了所有依赖于 ThreadLocal 的代码。结果是我们需要一种不同的安排来反映事务状态,而无需始终传递 TransactionStatus 对象。

在响应式领域,关联带外数据并不是一个新要求。我们在其他领域也面临过这个要求,例如在 Spring Security 中使用 SecurityContext 实现响应式方法安全(仅举一例)。Project Reactor 是 Spring 构建其响应式支持的基础响应式库,自 3.1 版本以来就提供了对订阅者上下文的支持。

Reactor Context 对于响应式编程的作用,就像 ThreadLocal 对于命令式编程的作用一样:Contexts 允许将上下文数据绑定到特定的执行。对于响应式编程,这指的是 Subscription。Reactor 的 Context 让 Spring 可以将事务状态以及所有资源和同步绑定到特定的 Subscription。所有使用 Project Reactor 的响应式代码现在都可以参与响应式事务。返回标量值并希望访问事务细节的代码必须重写以使用响应式类型才能参与事务。否则,Context 不可用。

响应式事务管理

从 Spring Framework 5.2 M2 开始,Spring 通过 ReactiveTransactionManager SPI 支持响应式事务管理。

ReactiveTransactionManager 是一种用于使用事务性资源的响应式和非阻塞集成的事务管理抽象。它是返回 Publisher 类型的响应式 @Transactional 方法以及使用 TransactionalOperator 进行编程式事务管理的基础。

首批两个响应式事务管理器实现是

  • 通过 Spring Data R2DBC 1.0 M2 实现的 R2DBC
  • 通过 Spring Data MongoDB 2.2 M4 实现的 MongoDB

让我们看看响应式事务是什么样的

class TransactionalService {

  final DatabaseClient db

  TransactionalService(DatabaseClient db) {
    this.db = db;
  }

  @Transactional
  Mono<Void> insertRows() {

    return db.execute()
      .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
      .fetch().rowsUpdated()
      .then(db.execute().sql("INSERT INTO contacts (name) VALUES('Joe')")
      .then();
  }
}

在注解驱动的安排中,响应式事务看起来与命令式事务非常相似。但主要区别在于我们使用的是 DatabaseClient,这是一个响应式资源抽象。所有事务管理都在幕后发生,利用了 Spring 的事务拦截器和 ReactiveTransactionManager

Spring (根据方法返回类型) 区分应应用哪种事务管理类型

  • 方法返回 Publisher 类型:响应式事务管理
  • 所有其他返回类型:命令式事务管理

这种区分很重要,因为您仍然可以使用命令式组件,例如 JPA 或 JDBC 查询。将这些结果包装到 Publisher 类型中会向 Spring 发出信号,指示它应用响应式而非命令式事务管理。话虽如此,响应式事务安排并不会开启一个 ThreadLocal 绑定的事务,而这对于 JPA 或 JDBC 是必需的。

TransactionalOperator

接下来,让我们看看如何使用 TransactionalOperator 进行编程式事务管理

ConnectionFactory factory = …
ReactiveTransactionManager tm = new R2dbcTransactionManager(factory);
DatabaseClient db = DatabaseClient.create(factory);

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomicOperation = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('joe', 'Joe')")
  .fetch().rowsUpdated()
  .then(db.execute()
    .sql("INSERT INTO contacts (name) VALUES('Joe')")
    .then())
  .as(rxtx::transactional);

上面的代码包含了一些值得注意的组件

  • R2dbcTransactionManager: 这是用于 R2DBC ConnectionFactory 的响应式事务管理器。
  • DatabaseClient: 该客户端使用 R2DBC 驱动程序提供对 SQL 数据库的访问。
  • TransactionalOperator: 此操作符将所有上游 R2DBC 发布者与事务上下文关联起来。您可以使用操作符风格 as(…::transactional) 或回调风格 execute(txStatus -> …) 来使用它。

响应式事务在订阅时惰性启动。该操作符启动一个事务,设置适当的隔离级别,并将数据库连接与其订阅者上下文关联。所有参与(上游)的 Publisher 实例都使用一个绑定到 Context 的事务连接。

响应式函数式操作符链可以是线性的(通过使用单个 Publisher)或非线性的(通过合并多个流)。使用操作符风格时,响应式事务会影响所有上游的 Publisher。要将事务范围限制在特定的 Publisher 集合,请如下使用回调风格

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> outsideTransaction = db.execute()
  .sql("INSERT INTO person (name, age) VALUES('Jack', 31)")
  .then();

Mono<Void> insideTransaction = rxtx.execute(txStatus -> {
  return db.execute()
    .sql("INSERT INTO person (name, age) VALUES('Joe', 34)")
    .fetch().rowsUpdated()
    .then(db.execute()
      .sql("INSERT INTO contacts (name) VALUES('Joe Black')")
      .then());
  }).then();

Mono<Void> completion = outsideTransaction.then(insideTransaction);

在上面的示例中,事务管理仅限于在 execute(…) 内部订阅的 Publisher 实例。换句话说,事务是作用域化的。execute(…) 内部的 Publisher 实例参与事务,而名为 outsideTransactionPublisher 则在事务外部执行其工作。

R2DBC 是 Spring 与响应式事务集成的其中之一。另一个集成是通过 Spring Data MongoDB 实现的 MongoDB,您可以使用它通过响应式编程参与多文档事务。

Spring Data MongoDB 附带 ReactiveMongoTransactionManager 作为 ReactiveTransactionManager 实现。它创建一个会话并管理事务,以便在受管事务中执行的代码参与多文档事务。

以下示例展示了使用 MongoDB 进行编程式事务管理

ReactiveTransactionManager tm 
  = new ReactiveMongoTransactionManager(databaseFactory);
ReactiveMongoTemplate template = …
template.setSessionSynchronization(ALWAYS);                                          

TransactionalOperator rxtx = TransactionalOperator.create(tm);

Mono<Void> atomic = template.update(Step.class)
  .apply(Update.set("state", …))
  .then(template.insert(EventLog.class).one(new EventLog(…))
  .as(rxtx::transactional)
  .then();

上面的代码设置了一个 ReactiveTransactionManager 并使用 TransactionalOperator 在单个事务中执行多个写操作。ReactiveMongoTemplate 被配置为参与响应式事务。

下一步

响应式事务管理随 Spring Framework 5.2 M2、Spring Data MongoDB 2.2 M4 和 Spring Data R2DBC 1.0 M2 里程碑版本一同发布。您可以获取这些版本,并开始在您的代码中集成响应式事务管理。我们期待社区的反馈,以便在六月初发布候选版本之前磨平所有尖角。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 通过简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部