领先一步
VMware 提供培训和认证,助您加速进步。
了解更多欢迎阅读本系列文章的又一篇,本系列文章旨在展示 Spring Cloud Stream (SCSt) 的新特性。在之前的文章(可在此处、此处和此处查看)中,我们试图为 Spring Cloud Stream (SCSt) 转向函数式编程模型提供理由。它代码量更少,配置更少,并且您的代码与 SCSt 的内部实现完全解耦。
今天,我们将讨论函数路由。在 SCSt 的上下文中,路由是指:a) 将事件路由到特定的事件订阅者,或 b) 将事件订阅者产生的事件路由到特定的目标。为了更好地理解上下文,让我们快速回顾一下基于注解的编程模型的工作方式。在这篇文章中,我们将把它们称为“路由到”和“路由自”。
对于路由到事件订阅者,我们使用 StreamListener 注解的 condition 属性,如下所示:
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}
此处提供了有关此方法的更多详细信息。
对于从事件订阅者路由,我们使用了动态绑定目的地 - 这种方法允许框架根据单个事件中提供的某些指令绑定到目的地。
使用函数式方法,我们可以通过一些附加功能,以更简洁明了的方式完成上述所有操作。
“路由到”函数可以通过依赖 Spring Cloud Function (SCF) 中提供的路由函数功能来实现。您可以通过设置 spring.cloud.stream.function.routing.enabled 属性显式启用路由,或者通过设置 spring.cloud.function.routing-expression 属性并提供 Spring Expression Language (SpEL) 路由指令来隐式启用路由。路由指令应导致路由到的函数定义。出于绑定目的,路由目标的名称是 functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME 和此处描述的绑定命名约定)。
当消息被发送到此目标时,路由函数会尝试确定哪个实际函数需要处理此类事件。它首先尝试访问 `spring.cloud.function.routing-expression` 消息头,如果提供,则确定要调用的实际函数的名称。这是最动态的方法。第二动态的方法是提供 `spring.cloud.function.definition` 头,它应该包含要“路由到”的函数的定义。两种方法都需要通过设置 `spring.cloud.stream.function.routing.enabled` 属性来显式启用路由函数。
至于以前版本中未提供的附加功能,`spring.cloud.function.routing-expression` 也可以用作应用程序属性。例如,考虑当表达式与传入事件无关时的情况,如本文前面所示的基于注解的示例(例如,`spring.cloud.function.routing-expression=headers['type']=='order'`)。对于这种方法,您不需要显式启用路由函数,因为 `spring.cloud.function.routing-expression` 作为应用程序属性具有相同的效果。
尽管微不足道,以下是上述方法之一的完整示例
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
通过将消息发送到绑定器(即 rabbit 或 kafka)公开的 `functionRouter-in-0` 目标,该消息将根据消息处理时 `nanoTime()` 的值路由到适当的(“偶数”或“奇数”)`Consumer` bean。
与之前一样,"路由自"依赖于 SCSt 的动态绑定目的地功能。然而,与"路由到"一样,还有一些附加功能。
以下示例展示了基本用法
@Autowired
private BinderAwareChannelResolver resolver;
public Consumer<String> send(Message message) {
MessageChannel destination = resolver
.resolveDestination(message.getHeaders().get("type"))
Message outgoingMessage = . . . // your code
destination.send(outgoingMessage);
}
您只需要引用 `BinderAwareChannelResolver`(在前面的示例中自动装配)。然后,您可以使用一些逻辑来确定目标名称(在我们的示例中,我们使用“type”头的值)。一旦确定了目标名称,您可以通过 `BinderAwareChannelResolver.resolveDestination(..)` 操作获取对其的引用,并向其发送消息。这真的是全部。
上述方法的缺点是某些框架特定的抽象会渗透到您的代码中。例如,您需要了解 `BinderAwareChannelResolver` 和 `MessageChannel` 等等。实际上,前面示例中的大部分代码都是样板代码。
一种更动态且侵入性更小的方法是依赖 `spring.cloud.stream.sendto.destination` 属性,它有效地在幕后完成了上述所有操作。以下示例展示了如何使用这种方法
@SpringBootApplication
public class RoutingStreamApplication {
@Bean
public Function<Message<String>, Message<String>> process() {
return message -> {
// some logic to process incoming message
Message<String> outgoingMessage = MessageBuilder
.withPayload("Hello")
.setHeader("spring.cloud.stream.sendto.destination", "even")
.build();
return outgoingMessage;
};
}
}
我们不再需要注入 `BinderAwareChannelResolver`、执行 `MessageChannel` 的解析等等。我们只需创建一个新的 `Message`,它指定一个由框架用于动态解析目标的消息头。
最后但并非最不重要的是,让我们看看“路由自”的另一个流行用例,即数据源来自 SCSt 上下文之外,但需要路由到适当的目标。
@Controller
public class SourceWithDynamicDestination {
@Autowired
private ObjectMapper jsonMapper;
private final EmitterProcessor<?> processor = EmitterProcessor.create();
@RequestMapping(path = "/", method = POST, consumes = "*/*")
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody String body,
@RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType)
throws Exception {
Map<String, String> payload = jsonMapper.readValue(body, Map.class);
String destination = payload.get("id");
Message<?> message =
MessageBuilder.withPayload(payload)
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
processor.onNext(message);
}
@Bean
public Supplier<Flux<?>> source() {
return () -> processor;
}
}
然后我们可以通过运行以下 `curl` 命令查看结果
curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://:8080
在这里,我们通过 `Supplier
注意:在撰写本文时,参考文档正在积极更新以支持即将发布的 SCSt 3.0.0.RELEASE 版本,但您始终可以使用参考文档的来源来获取最新信息。
请在 GitHub 上查看 Spring Cloud Stream。
此外,本系列之前的博客文章有