使用 Spring、协程和 Kotlin Flow 走向响应式

工程 | Sébastien Deleuze | 2019年4月12日 | ...

自从我们在2017年1月宣布 Spring Framework 官方支持 Kotlin 以来,发生了很多事情。在 Google I/O 2017 大会上,Kotlin 被宣布成为官方 Android 开发语言,我们继续改进 Spring 生态系统中对 Kotlin 的支持,Kotlin 本身也持续发展,加入了 协程 等关键新特性。

我想借着 Spring Framework 5.2 的第一个里程碑版本 发布的机会,概述一下 Spring 和 Kotlin 的现状。我将尽力关注具体的改进,因为我相信 Spring 和 Kotlin 拥有相同的实用主义思维。

我认为这一切都与选择有关。我们(Spring 团队)提供了多种选择,但同时作为应用开发者,你在启动新的 Spring Boot 应用时也需要做出选择。例如:

  • 我应该使用哪种语言?

  • 注解式 @Controller 还是函数式风格?

  • Spring MVC 还是 WebFlux?

这些问题显然是高度主观的,通常取决于项目背景,但我将分享我的个人观点。

Java 还是 Kotlin?

Java 是显而易见的默认选择,但 Kotlin 是一个越来越受欢迎的替代方案。开发者从 Java 转向 Kotlin 的原因是什么?当人们问我时,我通常会说 Kotlin 让 Java 开发者能够利用现有技能编写更短、更安全、表达力更强的代码。但要做出明智的选择,我们需要确定更具体的要点。

我最喜欢的 Kotlin 特性是它将 null,这个所谓的(多次)“十亿美元的错误”,变成了一项安全特性。Java 的错误不在于 null 本身,而在于其类型系统中没有明确管理 null,导致了类似于动态语言中观察到的问题。Kotlin 在其类型系统中拥抱 null,并用它来处理值的缺失。在 Kotlin 中,像 String 这样的类型是不可空的,因此可以安全地使用,而像 String? 这样的类型是可空的,应该谨慎使用。好消息是 Kotlin 编译器会在编译时报告潜在的错误,你可以使用安全调用Elvis 运算符null 执行块来优雅地处理它们。与 Java 的 Optional 不同,Kotlin 的空安全也适用于输入参数,并且不会强制你使用影响代码性能和可读性的包装器。

DSL(领域特定语言)也是 Kotlin 的另一大亮点。Gradle Kotlin DSL(对 start.spring.io 的支持即将到来)就是一个很好的例子,它利用 Kotlin 的静态类型特性,提供了一个非常丰富灵活的 API,具有出色的可发现性和信心。Spring Framework 为 bean 定义函数式路由甚至 MockMvc 提供了 Kotlin DSL。

我还可以详细介绍很多其他的好理由,比如带默认值的可选参数、与 Java API(如 Spring)的出色互操作性扩展函数、为了避免类型擦除而使用的具体化类型参数数据类或默认鼓励的不可变性,但我认为你应该通过示例学习 Kotlin,最终借助参考文档,并自己做出判断。你也可以按照这个使用 Kotlin 的 Spring Boot 循序渐进教程进行学习。

所以,就说我会在下一个 Spring Boot 项目中选择 Kotlin 吧 ;-)

注解式 @Controller 还是函数式风格?

正如我在引言中所说,选择取决于上下文和个人品味。考虑到 Kotlin 优秀的 DSL 和函数式编程能力,我非常喜欢使用 Kotlin 的函数式路由。我甚至正在探索如何通过实验性的 Kofu DSL for Spring Boot 以函数式方式定义 Spring Boot 应用配置,该项目目前正在 Spring Fu 仓库中孵化。

但今天,假设我的团队由多年来习惯使用 @Controller 编程模型的开发者组成,而且我不想同时改变所有东西,所以让我们保留 @Controller

Spring MVC 还是 WebFlux?

我们在 Web 框架方面提出的选择如下。

你可以继续使用 Spring MVC 和所有我们持续改进的相关成熟技术:Tomcat、JPA 等。你甚至可以通过使用现代的 WebClient API 替代 RestTemplate 来利用一些响应式功能。

但我们也提供了一个响应式栈,其中包括 WebFlux,这是一个基于 Reactive Streams 的 Web 框架,适用于那些需要更高可伸缩性、对延迟不敏感(对微服务架构有用)以及更好的流处理能力的开发者。生态系统的其他部分,如 Spring Data 和 Spring Security 也提供了响应式支持。

在 Java 中使用 Reactor API 的 WebFlux

到目前为止,使用基于 WebFlux 的 Spring 响应式栈需要一个相当大的转变,即通过使用 Reactor 的 MonoFlux 或 RxJava 类似类型等 API,将 IO 相关功能(Web、持久化)从命令式风格切换到声明式/函数式风格。这种颠覆性的方法与命令式编程相比提供了实际优势,但它也非常不同,需要不低的学习曲线。

让我们通过具体的代码看看这意味着什么,并借此机会向你展示如何使用 R2DBC(基于 Reactive Streams 的 JDBC 替代方案)和 Spring Data R2DBC 以响应式方式访问 SQL 数据库。

如果选择 Java,我们会编写如下所示的 UserRepository 类,它使用 Spring Data R2DBC 提供的 DatabaseClient API 暴露一个响应式 API 来访问 SQL 数据库。

class UserRepository {

	private final DatabaseClient client;

	public UserRepository(DatabaseClient client) {
		this.client = client;
	}

	public Mono<Long> count() {
		return client.execute().sql("SELECT COUNT(*) FROM users")
			.as(Long.class).fetch().one();
	}

	public Flux<User> findAll() {
		return client.select().from("users").as(User.class).fetch().all();
	}

	public Mono<User> findOne(String id) {
		return client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).as(User.class).fetch().one();
	}

	public Mono<Void> deleteAll() {
		return client.execute().sql("DELETE FROM users").then();
	}

	public Mono<Void> save(User user) {
		return client.insert().into(User.class).table("users")
			.using(user).then();
	}

	public Mono<Void> init() {
		return client.execute().sql("CREATE TABLE ...").then()
			.then(deleteAll())
			.then(save(new User("smaldini", "Stéphane", "Maldini")))
			.then(save(new User("sdeleuze", "Sébastien", "Deleuze")))
			.then(save(new User("bclozel", "Brian", "Clozel")));
	}
}

注意

保存用户操作可以采用 fork-join 方式完成,因为这些操作彼此不依赖,但为了比较,我使用了通过 then() 串联的顺序操作。

你可以看到,在这种 API 中,void 变成了 Mono<Void>User 变成了 Mono<User>。这允许以非阻塞方式使用它们,并提供了丰富的操作符。但这同时也强制要求使用 Mono 包装器,并显著改变了这些 API 的使用方式。例如,如果某些操作需要按顺序执行,就像在 init() 方法中那样(命令式代码很简单),在这里我们必须使用 then 操作符构建一个声明式管道。

Flux<User> 提供了更多的附加价值,因为它允许将传入的用户作为流进行处理,而阻塞式栈中常用的 List<User> 的使用意味着在处理之前将所有数据加载到内存中。注意,这里我们也可以使用 Mono<List<User>>

在控制器端,你可以看到 Spring WebFlux 原生地支持这些响应式类型,你还可以看到基于 Reactive Streams 的 API 的另一个特点,即异常主要被用作由响应式类型携带的错误信号,而不是像常规命令式代码那样被抛出。

@RestController
public class UserController {

	private final UserRepository userRepository;

	public UserController(UserRepository userRepository) {
		this.userRepository = userRepository;
	}

	@GetMapping("/")
	public Flux<User> findAll() {
		return userRepository.findAll();
	}

	@GetMapping("/{id}")
	public Mono<User> findOne(@PathVariable String id) {
		return userRepository
			.findOne(id)
			.switchIfEmpty(Mono.error(
				new CustomException("This user does not exist");
	}

	@PostMapping("/")
	public Mono<Void> save(User user) {
		return userRepository.save(user);
	}
}

在 Kotlin 中使用协程 API 的 WebFlux

重要的是要理解,Spring 的响应式支持是构建在 Reactive Streams 之上的,并且考虑了互操作性,Reactor 被用于两个不同的目的

  • 它是我们在 Spring 响应式基础设施中处处使用的 Reactive Streams 实现

  • 它也是默认暴露的响应式公共 API

但 Spring 的响应式支持从一开始就被设计为易于适应其他异步或响应式 API,如 CompletableFuture、RxJava 2,以及现在支持的协程。在这种情况下,我们内部仍然利用 Reactor,但在公共 API 层面适应不同的终端用户响应式 API。

当然,如果你喜欢这种方法,在 Kotlin 中继续使用 FluxMono 是完全可以的,但 Spring Framework 5.2 引入了一个新的重要特性:我们现在可以使用 Kotlin 协程 以更命令式的方式利用 Spring 响应式栈。

协程是 Kotlin 的轻量级线程,允许以命令式方式编写非阻塞代码。在语言层面,由 suspend 关键字标识的挂起函数为异步操作提供了抽象,而在库层面,kotlinx.coroutines 提供了 async { } 等函数以及 Flow 等类型,Flow 是协程世界中的 Flux 等价物。

当类路径中包含 kotlinx-coroutines-corekotlinx-coroutines-reactor 依赖时,协程支持将被启用

build.gradle.kts

dependencies {
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

那么,用 Kotlin 代替 Java 编写,并使用协程和 Flow 代替 MonoFluxUserRepositoryUserController 会是什么样子呢?

class UserRepository(private val client: DatabaseClient) {

	suspend fun count(): Long =
		client.execute().sql("SELECT COUNT(*) FROM users")
			.asType<Long>().fetch().awaitOne()

	fun findAll(): Flow<User> =
		client.select().from("users").asType<User>().fetch().flow()

	suspend fun findOne(id: String): User? =
		client.execute()
			.sql("SELECT * FROM users WHERE login = :login")
			.bind("login", id).asType<User>()
			.fetch()
			.awaitOneOrNull()

	suspend fun deleteAll() =
		client.execute().sql("DELETE FROM users").await()

	suspend fun save(user: User) =
		client.insert().into<User>().table("users").using(user).await()

	suspend fun init() {
		client.execute().sql("CREATE TABLE IF NOT EXISTS users (login varchar PRIMARY KEY, firstname varchar, lastname varchar);").await()
		deleteAll()
		save(User("smaldini", "Stéphane", "Maldini"))
		save(User("sdeleuze", "Sébastien", "Deleuze"))
		save(User("bclozel", "Brian", "Clozel"))
	}
}

你可以在这里看到,我们不是返回例如 Mono<User>,而是在一个挂起函数中返回 User(或者更准确地说是它的可空变体 User?),这个挂起函数可以以命令式方式使用。init() 方法实现中的差异很好地说明了这一点,因为我们现在使用的是常规的命令式代码,而不是链式调用 then

但是等等,我怎么能直接在 DatabaseClient 类型上使用协程呢?它是一个基于 MonoFlux 的响应式 API。这是可能的,因为 Spring Data R2DBC 也提供了 Kotlin 扩展(例如请参阅此处),一旦导入,这些扩展允许你在 DatabaseClient 上添加基于协程的方法。按照约定,挂起方法以 await 为前缀或以 AndAwait 为后缀,并且名称与其基于 Mono 的对应方法相似。

现在让我们更深入地了解一下 Flow<User> 返回类型。首先,请注意我们指的是 kotlinx.coroutines.flow.Flow,而不是 Java 9+ 提供的 Reactive Streams 容器类型 java.util.concurrent.Flow

你会像使用 Java 8+ 的 Stream 或其 Kotlin 等价物 Sequence 一样使用 Flow API,但巨大的区别在于它适用于异步操作并管理背压。所以它是协程世界中的 Flux 等价物,适用于热流或冷流,有限流或无限流,主要区别如下:

  • Flow 是基于推送的,而 Flux 是推拉混合的

  • 背压通过挂起函数实现

  • Flow 只有一个挂起方法 collect,操作符实现为扩展函数

  • 借助协程,操作符易于实现

  • 扩展函数允许向 Flow 添加自定义操作符

  • 收集操作是挂起函数

  • map 操作符支持异步操作(无需 flatMap),因为它接受一个挂起函数参数

现在让我们看一下控制器的协程版本

@RestController
class UserController(private val userRepository: UserRepository) {

	@GetMapping("/")
	fun findAll(): Flow<User> =
		userRepository.findAll()

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): User? =
		userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")

	@PostMapping("/")
	suspend fun save(user: User) =
		userRepository.save(user)
}

同样,你可以看到代码与我们在 Spring MVC 中使用的常规命令式代码非常接近。

除了为基于 FluxMono 的 API(如 WebClientServerRequestServerResponse)提供协程扩展之外,Spring WebFlux 现在还原生支持注解式 @Controller 类中的挂起函数和 Flow 返回类型。

使用命令式代码进行异步操作

让我们利用 WebClient 协程扩展来看看如何串联异步调用。我们将请求一个远程 HTTP 端点来获取额外的 UserDetail1UserDetail2

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails {
		val userDetail1 = client.get().uri("/userdetail1/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail1>()
		val userDetail2 = client.get().uri("/userdetail2/${user.login}")
			.accept(APPLICATION_JSON)
			.awaitExchange().awaitBody<UserDetail2>()
		return UserWithDetails(user, userDetail1, userDetail2)
	}
}

这里我们使用 WebClient 协程扩展,如 awaitExchange()awaitBody(),以纯粹命令式的方式执行异步和非阻塞操作。而且由于 Flowmap 操作符接受一个挂起函数参数,我们可以在其中执行这样的操作,不像在 Java 中使用响应式 API 那样需要 flatMap

并行分解

如前所述,协程默认是顺序执行的,但它们也可以用于并行执行操作。让我们重构前面的示例,使其并发执行两个远程调用。

@RestController
class UserWithDetailsController(
		private val userRepository: UserRepository,
		private val client: WebClient) {

	@GetMapping("/")
	fun findAll(): Flow<UserWithDetails> =
		userRepository.findAll().map(this::withDetails)

	@GetMapping("/{id}")
	suspend fun findOne(@PathVariable id: String): UserWithDetails {
		val user: User = userRepository.findOne(id) ?:
			throw CustomException("This user does not exist")
		return withDetails(user)
	}

	private suspend fun withDetails(user: User): UserWithDetails = coroutineScope {
		val asyncDetail1 = async {
			client.get().uri("/userdetail1/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail1>()
		}
		val asyncDetail2 = async {
			client.get().uri("/userdetail2/${user.login}")
				.accept(APPLICATION_JSON)
				.awaitExchange().awaitBody<UserDetail2>()
		}
		UserWithDetails(user, asyncDetail1.await(), asyncDetail2.await())
	}
}

这里我们利用结构化并发,通过 async {} 构建器创建 Deferred<UserDetail1>Deferred<UserDetail2> 实例来触发两个用户详情的并行检索,然后通过调用两个 await() 方法等待它们完成,这些方法将在可用时返回 UserDetail1UserDetail2 实例。

结论

我认为将 Spring 响应式栈与协程和 Kotlin Flow API 结合使用,在命令式和声明式方法之间提供了一个有趣的权衡。它以一种非常易于理解的方式,让你能够利用 WebFlux 和 Spring Data 响应式的可伸缩性和特性。

Spring WebFlux 和 Spring Data 中的协程支持将作为即将发布的 Spring Boot 2.2 版本的一部分提供。你可以阅读参考文档,并期待进一步的改进,例如对 RSocket @MessageMapping 端点和 RSocketRequester 扩展的协程支持。Spring Data Moore 也将在 Spring Data MongoDB、Cassandra 和 Redis 上提供类似的协程扩展。并且 Spring Data 可能会在某个时候提供对协程仓库的支持。我们还将使 Reactor 和协程上下文互操作,以支持安全和响应式事务。

最后,我想感谢许多才华横溢的工程师,没有他们,这一切都不可能实现

  • 来自 Kotlin 团队的 Roman Elizarov 和 Vsevolod Tolstopyatov,感谢他们在协程和 Flow 方面令人难以置信的工作

  • Konrad Kaminski,感谢他最初由社区驱动的 Spring 协程支持工作

  • Jake Wharton,感谢他早期围绕统一 Rx 和协程进行的原型设计

  • Stéphane Maldini 和 David Karnok,感谢他们鼓舞人心的工作

  • Juergen Hoeller, Rossen Stoyanchev 和 Brian Dussault,感谢他们的信任

  • Mark Paluch 和 Oliver Drotbohm,感谢他们在持久化方面的支持

像往常一样,我期待收到反馈,以及 Kotlin 团队关于 Flow API 的反馈,因为它仍处于预览模式。来参加我在 Devoxx FranceJAXSpring I/OSunny Tech 的即将到来的演讲,了解更多信息。

干杯!

获取 Spring 通讯

订阅 Spring 通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助你加速前进。

了解更多

获得支持

Tanzu Spring 通过一个简单的订阅,提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部