领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我们首先快速总结 上一篇文章。
正如承诺的那样,这篇博文更具技术性,因为它涵盖了可供您尝试的具体示例。因此,事不宜迟,我们首先描述我们将涵盖的三个用例。实际上用例是相同的,但执行上下文有所不同。
“接收代表待聘人员的数据,生成员工记录。”
这三种不同的变体在于执行上下文(典型的非功能性关注点的一个例子)
用例和执行上下文都不是真正新的或独一无二的。在 Spring 中,我们已经处理它们几十年了,有数千个应用程序在生产中运行。那么,通过添加 Cloud Event 上下文会有什么变化吗?换句话说,如果传入和传出数据代表一个 Cloud Event,会有什么变化吗?这些是我们在本文中试图回答的问题。
这些示例的用户代码是
@SpringBootApplication
public static class SampleApplication
public static void main(String[] args) throws Exception {
SpringApplication.run(SampleApplication.class, args);
}
@Bean
public Function<Person, Employee> hire() {
return person -> {
Employee employee = new Employee(person);
return employee;
};
}
}
是的,这有点无聊,因为它没有显示任何非功能性方面,因为它们是由特定于执行上下文的框架处理的。我们还将函数的实现细节保持得相当简单,因为它们与主题无关。框架并不真正关心你做什么。它只关心你期望什么——输入——以及你产生什么——输出——这些信息可以从签名中获得。
此示例的完整源代码可在 Spring Cloud Function 示例中找到。在此示例中,我们发送一个 Cloud Event 作为 HTTP 请求,并期望接收一个 Cloud Event 作为 HTTP 响应。这意味着,我们的 hire() 函数需要以某种方式成为一个 HTTP 端点。我们可以通过使用 Spring Cloud Function 框架来实现这一点。通过添加其 spring-cloud-function-web 依赖项,我们添加了将函数转换为 HTTP 端点所需的 Spring Boot 自动配置和组件。配置选项和默认值超出了本文的范围,但您可以从 Spring Cloud Function 文档的相关部分获取它们。重要的是,基于这些默认值,函数名称成为在 localhost 端口 8080 上运行的 URL 路径的一部分,从而生成 https://:8080/hire 端点。
现在您可以启动应用程序并对其进行发布。应用程序运行后,您可以使用以下命令对其进行 curl
curl -w'\n' localhost:8080/hire \
-H "Content-Type: application/json" \
-d '{"firstName":"John", "lastName":"Doe"}' -i
您应该收到以下响应
. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}
嗯。。。。这真的和 Cloud Events 没什么关系!对吧……?
没错,但框架将函数暴露为 REST 端点、处理类型转换、调用和其他非功能性方面的能力是明确的,并且与 Cloud Events 直接相关。请继续阅读……
这种功能的实现核心是 Message——一种结构和类型,它允许传入的 HTTP(或任何其他)请求采用规范形式,以便其他框架能够以统一的方式处理其内容,无论其来源或目的地如何。
但是等等,Cloud Events 呢?
让我们通过添加代表所需 Cloud Event 属性的 HTTP 头,将此 HTTP 请求转换为 Cloud Event。请注意,这些头带有 ce- 前缀,这是 Cloud Event 规范的 HTTP 协议绑定部分所要求的。
curl -w'\n' localhost:8080/hire \
-H "ce-id: 0001" \
-H "ce-specversion: 1.0" \
-H "ce-type: hire" \
-H "ce-source: spring.io/spring-event" \
-H "Content-Type: application/json" \
-d '{"firstName":"John", "lastName":"Doe"}' -i
执行后,您不会看到任何差异。您的函数以相同的方式运行,并且您收到相同的响应。
当然,除非您查看并分析响应头,其中现在包含所需的 Cloud Event 属性(尽管与请求中的属性不同)
ce-source: https://springjava.cn/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}
但是怎么回事?
这是我们再次提醒您我们承诺将非功能性方面外包给框架的部分,因为这也是其中之一。因此,默认情况下(由框架建立),我们假设如果请求是一个 Cloud Event,则响应也应是一个 Cloud Event。您还可以看到,四个必需的 Cloud Event 属性的值也是通过遵循框架建立的某些默认规则生成的。specversion 默认为 1.0,type 默认为返回对象的类型名称,id 默认为生成的 UUID(以提供合理安全的唯一性期望),source 默认为 https://springjava.cn/。
但我不喜欢默认值。我想要自己的,并且我想要添加额外的属性?
正如我们在上一篇文章中提到的:“我们还公开了实用程序、库和配置选项,让您可以影响某些非功能性关注点,因为出于各种原因,这仍然可能是必需的。”在这里,您有两种选择。第一个选项:您可以更改函数签名并返回一个Message<Employee>,您可以在其中添加额外的元数据(即 Cloud Event 属性)。一旦框架看到您返回了一个Message,它就不会尝试对用户添加的元数据做任何额外的事情。这是适用于大多数(如果不是所有)依赖 Spring Messaging 的框架的规则。虽然这个选项很简单,但它确实会将非功能性方面泄露到您的业务逻辑中。毕竟,您需要创建一个Message实例,您需要添加代表 Cloud Event 属性的头(最好带有正确的——规范强制的——属性前缀),等等。但这个选项最大的缺陷是它会要求您更改函数的签名并将功能和非功能性方面混合在一起,这明显违反了关注点分离规则。然而,为了论证,这里是您如何做到这一点的方法
@Bean
public Function<Message<Person>, Message<Employee>> hire() {
return message -> {
Person person = message.getPayload();
Employee employee = new Employee(person);
return CloudEventMessageBuilder.withData(employee).setId("123456")
.setSource(URI.create("https://spring.cloudevenets.sample")).build();
};
}
示例源代码包含其注释版本。
第二个选项:您可以提供一个名为 CloudEventHeaderEnricher 的策略实现,它提供了一个单独的地方,您可以在其中实现生成适当属性和输出头的逻辑。此策略由框架在生成输出 Message 时调用。以下示例显示了此策略的一种可能实现(在示例中也已注释掉,因此取消注释,重新启动应用程序,然后查看差异)。
@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
return messageBuilder -> messageBuilder.setSource("https://springjava.cn/cloudevent")
.setType("sample").setId("987654");
}
在这里,您还可以看到一个可以帮助您构建 Cloud Event 消息的实用类:CloudEventMessageBuilder。它仿照标准的 Spring MessageBuilder,但具有 Cloud Event 特定的设置器。然而,这种方法的主要优点是关注点分离。您的业务逻辑(您的功能代码)保持干净。此外,您仍然需要编写的非功能代码是写在单独的位置。
还有一件事。。。示例代码假定您只对 Cloud Event 的 data 部分感兴趣,并且您希望它以 POJO 的形式出现。但如果情况并非如此呢?如果您想要 Cloud Event 的完整视图呢?或者如果您还想要原始形式(即 byte[])的 Cloud Event 数据呢?如前所述,框架从函数的签名中获取指令。因此,通过将输入和输出类型声明为 Message,您实际上是在指示框架为您提供整个 Cloud Event(而不仅仅是它的 data)。此外,通过指定 Message 的泛型类型,您指示框架将 Cloud Event 的 data 部分作为该 Java 类型提供,实质上是要求它在必要时执行类型转换。所以请尝试以下签名:public Function<Message<byte[]>, Message<Employee>> hire() {...} 或 public Function<byte[], Employee> hire() {...} 或其他。
目前就这些了。README 文件和源代码中的注释也提供了必要的额外说明。
该示例的完整源代码可在 Spring Cloud Function 示例中找到。它假设您对 AMQP 和 Apache Kafka 有一定程度的熟悉。在此示例中,我们使用 RabbitMQ(作为 AMQP 消息代理)和 Apache Kafka。
虽然这个用例可能看起来比前一个更复杂,但本节和后续部分(第三个用例)却出奇地短。那是因为上一节中解释的一切也适用于这里。实际上,我们在这里改变的唯一是执行上下文。我们通过相同的机制来做到这一点:添加相关的基于 Spring Boot 的自动配置。因此,在这种情况下,我们添加了两个自动配置:一个用于 RabbitMQ(AMQP 消息代理)绑定器,一个用于 Spring Cloud Stream 框架中可用的 Apache Kafka 绑定器。还有一些额外的应用程序配置(您可以在 application.properties 文件中看到),用于指示框架如何将 hire 函数的输入端绑定到 RabbitMQ(通过 RabbitMQ 绑定器)以及输出端绑定到 Apache Kafka(通过 Apache Kafka 绑定器)。
假设您已运行 RabbitMQ 和 Kafka,请启动应用程序并向 RabbitMQ 发送一条消息。您可以使用 RabbitMQ 仪表板(如果已安装)向 hire-in-0 交换机发送消息。
为了符合 Cloud Event 规范,您应该提供带有 AMQP 适当前缀(即 cloudEvents:)的属性。请考虑以下示例
cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001
然后考虑以下数据:{"firstName":"John", "lastName":"Doe"}
为了简化这个演示部分,我们包含了一个测试用例,通过向 RabbitMQ 发送一个 Cloud Event 并从 Apache Kafka 接收一个 Cloud Event 来有效地自动化这个演示。
Message<byte[]> messageToAMQP = CloudEventMessageBuilder
.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
.setSource("https://cloudevent.demo")
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);
rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .
请注意,我们如何在这里使用 CloudEventMessageBuilder 仅将 source 设置为 Cloud Event 属性,同时依赖其余必需的 Cloud Event 属性的默认值。我们还使用 build(CloudEventMessageUtils.AMQP_ATTR_PREFIX) 来确保属性以 cloudEvents: 前缀开头(参见 Cloud Events AMQP 协议绑定)。此外,请注意,在接收端,Cloud Events 属性现在以 ce_ 前缀开头(参见 Cloud Events Kafka 协议绑定),因为框架已确定目标目的地是 Apache Kafka。最后一点值得稍微详细说明一下。我们已经确定设置 Cloud Event 属性是一个非功能性方面,因此,我们已经公开了一种机制,让您可以在业务逻辑之外处理它。但是属性前缀呢?请注意,我们正在不同的执行上下文中运行相同的代码。这意味着属性前缀实际上取决于执行上下文。因此,通过了解执行上下文,框架可以确保 Cloud Event 属性前缀的正确性。
在这里,我们依赖于 Spring Cloud Stream 框架及其默认值,例如目的地自动配置(Kafka 和 Rabbit)、绑定名称、连接等。这些默认值和配置选项的详细信息超出了本文的范围,因为它们都与 Cloud Events 无关。有关框架本身及其配置选项的更多详细信息,请参阅 Spring Cloud Stream 文档。
此外,与前面的示例一样,这个示例也包含注释掉的变体,欢迎您进行试验。
该示例的完整源代码可在 Spring Cloud Function 示例中找到。它假定您对 RSocket 和 Apache Kafka 有一定程度的熟悉。本节应该比上一节更短,因为它非常相似。然而,这里有一些有趣的变体值得讨论。显而易见的一个是 RSocket。我们引入了一种不同的交付机制。但真正使其更有趣的是,RSocket 没有定义协议绑定。我们可以选择遵守 Kafka、HTTP 或 AMQP 规范之一,或者我们可以以结构化模式通信 Cloud Event,其中整个事件被编码为某种结构(例如 JSON)。
在此示例中,一些实现细节也与其他用例不同。然而,这些细节与 Cloud Event 没有任何关系。相反,它们是您可以使用的其他机制的演示。例如,我们使用 Consumer 而不是 Function,并使用 Spring Cloud Stream 框架提供的 StreamBridge 组件手动发送输出消息。
因此,事不宜迟,这是我们的应用程序代码
@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
return person -> {
Employee employee = new Employee(person);
streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
.setSource("https://springjava.cn/rsocket")
.setId("1234567890")
.build());
};
}
请注意我们如何使用 CloudEventMessageBuilder 将输出 Message 生成为 Cloud Event。
我们通过 RSocket 将结构化的 Cloud Event 表示(编码为 JSON)发送到 hire() 函数
String payload = "{\n" +
" \"specversion\" : \"1.0\",\n" +
" \"type\" : \"org.springframework\",\n" +
" \"source\" : \"https://springjava.cn/\",\n" +
" \"id\" : \"A234-1234-1234\",\n" +
" \"datacontenttype\" : \"application/json\",\n" +
" \"data\" : {\n" +
" \"firstName\" : \"John\",\n" +
" \"lastName\" : \"Doe\"\n" +
" }\n" +
"}";
rsocketRequesterBuilder.tcp("localhost", 55555)
.route("hire") // target function
.data(payload). // data we're sending
.send()
预期的输出应与之前的用例相似,因为目标目的地相同。
正如你所看到的,在 Spring 的上下文中处理 Cloud Events 时,您有多种选择
Message 处理 Cloud Event 本身,并依赖提供的实用程序来简化对 Cloud Event 特定数据的访问。这些只是与本文相关的少数几个,但还有更多。
成熟且经过验证的模式、实现这些模式的框架以及分层和有主见的 Spring Boot 自动配置使其成为可能。层次非常重要,因为它们允许您将问题分解为可以在存在相同问题的其他项目和集成中重复使用的解决方案。这有效地使当前的 Cloud Event 集成成为一项简单的努力,因为大多数与 Cloud Event 无关的非功能性方面(即连接、发送、接收、转换、重试等)都已经由 Spring Cloud Function 和 Spring Cloud Stream 背后的各个框架解决。
最后但并非最不重要的一点是,还有另一种处理 Cloud Events 和 Spring 的方法,那就是通过 Cloud Events Java SDK,您可以在其中找到一个 示例。