提升自我
VMware 提供培训和认证,助您加速进步。
了解更多上周发布的 Spring Data Kay M1 是首个提供响应式数据访问支持的版本。它初步支持的存储——MongoDB、Apache Cassandra 和 Redis——都已提供了响应式驱动,这使得它们成为此类原型的理想候选。让我们更详细地了解构成该支持的新编程模型和 API。
仓库编程模型是 Spring Data 用户通常使用的最高级抽象。它通常由 Spring Data 提供的接口中定义的一组 CRUD 方法和领域特定查询方法组成。以下是一个响应式 Spring Data 仓库定义的示例
public interface ReactivePersonRepository
extends ReactiveCrudRepository<Person, String> {
Flux<Person> findByLastname(Mono<String> lastname);
@Query("{ 'firstname': ?0, 'lastname': ?1}")
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname);
}
正如您所见,它与您习惯的传统仓库接口差异不大。然而,与传统仓库接口相比,响应式仓库使用响应式类型作为返回类型,也可以用于参数类型。新引入的 `ReactiveCrudRepository` 中的 CRUD 方法自然也使用了这些类型。
默认情况下,响应式仓库使用 Project Reactor 类型,但也可以使用其他响应式库。我们为这些库提供了自定义仓库基础接口(例如 `RxJava2CrudRepository`),并根据查询方法需要自动调整类型,例如 RxJava 的 `Observable` 和 `Single`。其他方面基本保持不变。但请注意,当前里程碑版本尚不支持分页,并且您需要在类路径中包含必要的响应式库才能激活对特定库的支持。
与阻塞世界中的情况类似,响应式 Spring Data 的支持通过 `@Enable…` 注解以及一些基础设施设置来激活
@EnableReactiveMongoRepositories
public class AppConfig extends AbstractReactiveMongoConfiguration {
@Bean
public MongoClient mongoClient() {
return MongoClients.create();
}
@Override
protected String getDatabaseName() {
return "reactive";
}
}
请注意我们如何为基础设施配置使用不同的基类,因为我们需要利用 MongoDB 异步驱动。
现在可以像使用阻塞式仓库一样使用该仓库,不同之处在于结果的处理可以以响应式方式进行
@RestController
class PersonController {
private final PersonRepository people;
public PersonController(PersonRepository people) {
this.people = people;
}
@GetMapping("/people")
Flux<String> namesByLastname(@RequestParam Mono<String> lastname) {
Flux<Person> result = repository.findByLastname(lastname);
return result.map(it -> it.getFullName());
}
}
请看我们如何转发 Spring Web Reactive 提供的响应式参数,将它们传输到仓库中,然后获取一个 `Flux`,并以响应式方式处理执行结果。通常,响应式查询方法遵循与现有仓库相同的查询创建思路。传递给查询方法的参数可以是普通类型(例如 `String`)、包装类型(`Optional
正如传统仓库基于传统模板实现一样,响应式仓库构建在响应式模板之上。阻塞模板 API 中可用的大多数操作在响应式模板中都有对应的部分。我们将把阻塞世界中的更多功能移植到响应式模板 API 中,但有些操作目前(或尚未)无法通过响应式驱动程序实现,或者在响应式世界中没有意义。
以下是 Spring Data MongoDB 中 `ReactiveMongoOperations` 的一个片段。它由 `ReactiveMongoTemplate` 实现,并使用 Project Reactor 的响应式类型(如 `Mono` 和 `Flux`)来包装响应。有些方法也接受响应式类型,以便将数据流式传输到您的数据存储中。
public interface ReactiveMongoOperations {
// …
/**
* Map the results of an ad-hoc query on the specified collection to a
* single instance of an object of the specified type.
*/
<T> Mono<T> findOne(Query query, Class<T> entityClass);
/**
* Map the results of an ad-hoc query on the collection for the entity
* class to a List of the specified type.
*/
<T> Flux<T> find(Query query, Class<T> entityClass);
/**
* Insert the object into the specified collection.
*/
<T> Mono<T> insert(T objectToSave, String collectionName);
/**
* Insert the object into the collection for the entity type of the object
* to save.
*/
<T> Mono<T> insert(Mono<? extends T> objectToSave);
// …
}
请注意,所有方法都遵循响应式执行模型,即在调用时不会执行任何包含 I/O 的操作,只有在订阅返回值时才会执行。
让我们通过模板插入一些数据
Flux<Person> flux = Flux.just(new Person("Walter", "White"),
new Person("Skyler", "White"),
new Person("Saul", "Goodman"),
new Person("Jesse", "Pinkman"));
template.insertAll(flux).subscribe();
有些方法(例如 `insertAll(…)`)接受响应式类型,以便将传入数据异步流式传输到您的 MongoDB 数据库中,例如来自您在 Spring Web Reactive 控制器中接收到的 `Flux`,该 Flux 将通过 Jackson 异步映射一个 JSON 数组
@PostMapping("/people")
Flux<People> namesByLastname(@RequestBody Flux<Person> people) {
return template.insertAll(people);
}
正如您所见,仓库和模板 API 都允许您以响应式、非阻塞的方式描述请求处理。接下来,让我们更深入地了解 Redis 对响应式数据访问的支持。
Spring Data Redis 在连接层提供了初步的响应式支持,目前仅支持 Lettuce,因为它是唯一支持响应式数据访问的 Redis 驱动。由于 Redis 通常在较低的抽象级别使用,Kay M1 版本从该较低级别的响应式抽象开始。`LettuceConnectionFactory` 允许访问 `ReactiveRedisConnection`,后者又提供了 Redis 命令的响应式版本
使用操作符进行函数式链式调用创建访问 Redis 数据的响应式链。同样,所有 I/O 都是异步的。
ReactiveKeyCommands keyCommands = connection.keyCommands();
keyCommands.randomKey()
.flatMap(keyCommands::type)
.flatMap(System.out::println)
.subscribe();
这段代码获取一个随机键并打印其数据类型。一个不存在的随机键将以空的 `Mono` 完成。
响应式 Redis 命令有两种风格:接受普通参数和接受命令发布者。命令发布者发出特定的 Redis 命令,将数据直接流式传输到 Redis。每当命令执行后,每个发出的命令都会发出一个命令响应。
public interface ReactiveStringCommands {
// …
Mono<Boolean> set(ByteBuffer key, ByteBuffer value);
Flux<BooleanResponse<SetCommand>> set(Publisher<SetCommand> commands);
// …
}
传统的 Spring Data Redis 在其阻塞 API 上使用 `byte[]` 来交换数据。如果数据已存在于缓冲区中(如 `ByteBuffer` 或 Netty 的 `ByteBuf`),`byte[]` 会强制进行数据复制。响应式支持很大程度上是为了提高资源使用效率,因此我们决定公开接受和返回 `ByteBuffer` 的方法。
希望这篇博文为您介绍了 Kay 版本在不同抽象级别提供的响应式特性。您可以在我们的示例仓库中找到所有这些的可执行示例。
我们期待在 2017 年 1 月发布另一个里程碑版本,然后迈向发布候选版本。