Spring Cloud Stream - 事件路由

工程 | Oleg Zhurakousky | 2019年10月31日 | ...

欢迎阅读本系列文章的另一篇,该系列文章展示了Spring Cloud Stream (SCSt) 的新功能。在之前的文章(可在此处查看查看查看)中,我们试图为我们转向Spring Cloud Stream (SCSt) 中的函数式编程模型提供理由。它减少了代码和配置,并且您的代码与SCSt的内部完全解耦。

今天,我们将讨论使用函数进行路由。在SCSt的上下文中,路由是指能够*a) 将事件路由到特定的事件订阅者*或*b) 将事件订阅者生成的事件路由到特定的目的地*。为了更好地理解上下文,让我们快速了解基于注解的编程模型的工作方式。在本篇文章中,我们将将其称为“路由到”和“路由自”。

对于将事件路由* **到** *事件订阅者,我们使用`StreamListener`注解的`condition`属性,如下所示

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='order'")
public void receiveOrders(Order order) {...}

此处是关于此方法的更多详细信息。

并且,对于将事件路由* **自** *事件订阅者,我们使用动态绑定目标 - 允许框架根据单个事件中提供的某些指令绑定到目标的方法。

使用函数进行事件路由

使用函数式方法,我们可以通过一些附加功能以更简洁明了的方式完成上述所有操作。

路由到

可以通过依赖于Spring Cloud Function (SCF) 中提供的路由函数功能来实现“路由到”功能。您可以通过设置`spring.cloud.stream.function.routing.enabled`属性显式启用路由,或者通过设置`spring.cloud.function.routing-expression`属性并使用Spring表达式语言(SpEL)提供路由指令来隐式启用路由。路由指令应导致要路由到的函数的定义,“路由到”。出于绑定的目的,路由目标的名称为`functionRouter-in-0`(参见`RoutingFunction.FUNCTION_NAME`和此处描述的绑定命名约定)。

当消息发送到此目标时,路由函数会尝试确定哪个实际函数需要处理此事件。它首先尝试访问`spring.cloud.function.routing-expression`消息头,如果提供,则确定要调用的实际函数的名称。这是最动态的方法。第二种最动态的方法是提供一个`spring.cloud.function.definition`头,它应该包含要路由到的函数的定义。“路由到”。这两种方法都需要通过设置`spring.cloud.stream.function.routing.enabled`属性来显式启用路由函数。

至于以前版本中不可用的附加功能,`spring.cloud.function.routing-expression`也可以用作应用程序属性。例如,考虑表达式无论传入事件如何都相同的情况,就像本文前面显示的基于注解的示例中那样(例如,`spring.cloud.function.routing-expression=headers['type']=='order'`)。对于此方法,您无需显式启用路由函数,因为`spring.cloud.function.routing-expression`作为应用程序属性具有相同的效果。

尽管微不足道,但以下是上述方法之一的完整示例

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}

通过将消息发送到绑定器(即RabbitMQ或Kafka)公开的`functionRouter-in-0`目标,此类消息将根据消息处理时的`nanoTime()`值路由到相应的(“偶数”或“奇数”)`Consumer` bean。

路由自

和以前一样,“路由自”依赖于SCSt的动态绑定目标功能。但是,与“路由到”一样,还有许多附加功能。

以下示例显示了基础知识

@Autowired
private BinderAwareChannelResolver resolver;

public Consumer<String> send(Message message) {   
     MessageChannel destination = resolver
        .resolveDestination(message.getHeaders().get("type"))
     Message outgoingMessage = . . . // your code
     destination.send(outgoingMessage);
}

您只需要一个对`BinderAwareChannelResolver`的引用(在上一个示例中自动装配)。然后,您可以使用一些逻辑来确定目标名称(在我们的例子中,我们使用“type”头的值)。一旦确定目标名称,您可以使用`BinderAwareChannelResolver.resolveDestination(..)`操作获取对它的引用,并将消息发送到它。这就是全部。

上述方法的缺点是某些框架特定的抽象泄漏到您的代码中。看看您需要了解`BinderAwareChannelResolver`和`MessageChannel`等等的事实。实际上,前面示例中的大部分代码都是样板代码。

一种更动态、更少泄漏的方法是依赖于`spring.cloud.stream.sendto.destination`属性,它实际上完成了上述所有操作 - 但在幕后。以下示例显示了如何使用此方法

@SpringBootApplication
public class RoutingStreamApplication {

  @Bean
  public Function<Message<String>, Message<String>> process() {
    return message -> {
      // some logic to process incoming message
      Message<String> outgoingMessage = MessageBuilder
		.withPayload("Hello")
		.setHeader("spring.cloud.stream.sendto.destination", "even")
		.build();
       return outgoingMessage;
     };
  }
}

我们不再需要注入`BinderAwareChannelResolver`,执行`MessageChannel`的分辨等。我们只需创建一个新的`Message`,它指定一个由框架用于动态解析目标的头。

路由源

最后但并非最不重要的一点是,让我们看一下“路由自”的另一个流行用例,其中数据源起源于SCSt上下文之外,但需要路由到适当的目标

@Controller
public class SourceWithDynamicDestination {
    @Autowired
    private ObjectMapper jsonMapper;

    private final EmitterProcessor<?> processor = EmitterProcessor.create();

    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body, 
      @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) 
      throws Exception {
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String destination = payload.get("id");
        Message<?> message =
          MessageBuilder.withPayload(payload)
           .setHeader("spring.cloud.stream.sendto.destination", destination)
           .build();
        processor.onNext(message);
    }

    @Bean
    public Supplier<Flux<?>> source() {
        return () -> processor;
    }
}

然后我们可以通过运行以下`curl`命令查看结果

curl -H "Content-Type: application/json" -X POST -d '{"id":"customerId-1","bill-pay":"100"}' https://127.0.0.1:8080

在这里,我们使用函数式方法和少量响应式范式,通过`Supplier<Flux<?>>` bean。我们有一个简单的MVC控制器,我们希望根据内容的“id”属性的值将请求向下游路由。虽然`EmitterProcessor`及其在此处的用法是另一篇文章的主题,但重要的是它演示了一个完全功能的应用程序,其中HTTP请求被动态路由到目标绑定器管理的目标。

注意:在撰写本文时,参考文档正在积极更新以支持即将推出的SCSt 3.0.0.RELEASE版本,但您始终可以使用参考文档的源代码获取最新信息。

在GitHub上查看Spring Cloud Stream

此外,本系列之前的博客

- Spring Cloud Stream - 函数式和响应式

获取Spring新闻通讯

与Spring新闻通讯保持联系

订阅

领先一步

VMware提供培训和认证,以快速提升您的进度。

了解更多

获得支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区所有即将举行的活动。

查看全部