云事件和 Spring - 第二部分

工程 | Oleg Zhurakousky | 2020年12月23日 | ...

引言

我们先快速总结一下上一篇文章

  • Message 是一个足够完善的结构和抽象,用于在 Spring 的上下文中使用表示云事件的数据。我们希望这一点很清楚。
  • 在 Spring 中,我们致力于将功能性问题与非功能性问题隔离,这使我们能够在框架级别处理非功能性方面(例如发送、接收、重试、连接、转换等),让您(大部分情况下)专注于实际的业务逻辑,并使您的代码保持简单且可插入各种 *执行上下文*(稍后详细介绍)。

业务问题

正如承诺的那样,这篇文章更偏技术性,因为它涵盖了您可以尝试的具体示例。因此,事不宜迟,我们首先描述将要介绍的三个用例。实际上用例相同,但执行上下文不同。

"接收表示待聘人员的数据,生成员工记录。"

三个不同的变体在于执行上下文(典型非功能性问题的示例)。

  • HTTP 请求/响应
  • 从 AMQP 到 Apache Kafka
  • 从 RSocket 到 Apache Kafka。

用例和执行上下文都不是真正的新颖或独特的。在 Spring 中,我们已经处理了数十年,数千个应用程序在生产环境中运行。那么,添加云事件上下文会有什么变化呢?换句话说,如果输入和输出数据表示云事件,会有什么变化呢?这些是我们在这篇文章中试图回答的问题。

这些示例的用户代码是:

@SpringBootApplication
public static class SampleApplication
  public static void main(String[] args) throws Exception {
    SpringApplication.run(SampleApplication.class, args);
  }

  @Bean
  public Function<Person, Employee> hire() {
    return person -> {
	Employee employee = new Employee(person);
	return employee;
    };
  }
}

是的,它有点枯燥,因为它没有显示任何非功能性方面,因为这些方面是由特定于执行上下文的框架处理的。我们还使函数的实现细节相当简单,因为它们与主题无关。框架并不真正关心您做什么。它只关心您期望什么——*输入*——以及您产生的什么——*输出*——而这些信息可从签名中获得。

用例 1(通过 HTTP)

此示例的完整源代码可在Spring Cloud Function 示例中找到。在其中,我们将云事件作为 HTTP 请求发送,并期望接收云事件作为 HTTP 响应。这意味着我们的 `hire()` 函数需要以某种方式成为 HTTP 端点。我们可以使用Spring Cloud Function框架来实现这一点。通过添加其 `spring-cloud-function-web` 依赖项,我们添加了 Spring Boot 自动配置和将我们的函数转换为 HTTP 端点所需的组件。配置选项和默认值不在本文的讨论范围之内,但您可以在Spring Cloud Function 文档的相关部分中找到它们。重要的是,根据这些默认值,函数的名称成为在 `localhost` 端口 `8080` 上运行的 URL 路径的一部分,从而形成 `https://127.0.0.1:8080/hire` 端点。

现在您可以启动应用程序并向其发布请求。应用程序运行后,您可以使用以下命令对其进行 `curl`:

curl -w'\n' localhost:8080/hire \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

您应该收到以下响应:

. . .
{"person":{"firstName":"John","lastName":"Doe"},"id":172,"message":"Employee 172 was hired on 17-12-2020"}

嗯……这与云事件真的没有关系!对吧……?

正确,但是框架将函数公开为 REST 端点、处理类型转换、调用和其他非功能性方面,这很明确,并且与云事件直接相关。继续读下去……

这种支持的核心是Message——一种结构和类型,它允许传入的 HTTP(或任何其他)请求采用规范形式,以便其他框架能够以统一的方式处理其内容,而不管其来源或目标。

但是,云事件呢?

让我们通过添加表示所需云事件属性的 HTTP 标头将此 HTTP 请求转换为云事件。请注意,这些标头以云事件规范的HTTP 协议绑定部分要求的 `ce-` 前缀为前缀。

curl -w'\n' localhost:8080/hire \
 -H "ce-id: 0001" \
 -H "ce-specversion: 1.0" \
 -H "ce-type: hire" \
 -H "ce-source: spring.io/spring-event" \
 -H "Content-Type: application/json" \
 -d '{"firstName":"John", "lastName":"Doe"}' -i

执行后,您不会看到任何区别。您的函数以相同的方式运行,并且您会收到相同的响应。

当然,直到您查看和分析响应标头,其中现在包含所需的云事件属性(尽管与请求中的属性不同)。

ce-source: https://springjava.cn/cloudevent
ce-specversion: 1.0
ce-type: sample
ce-id: 76208faf-f8e5-4267-9028-bb4392d66765
message-type: cloudevent
timestamp: 1608211771624
Content-Type: application/json
Transfer-Encoding: chunked
Date: Thu, 17 Dec 2020 13:29:31 GMT
{"person":{"firstName":"John","lastName":"Doe"},"id":171,"message":"Employee 171 was hired on 17-12-2020"}

但是怎么做到的呢?

这是我们再次提醒您我们致力于将非功能性方面外包给框架的部分,因为这是其中之一。因此,默认情况下(由框架建立),我们假设如果请求是云事件,则响应也应为云事件。您还可以看到,四个必需的云事件属性具有值,这些值也是根据框架建立的某些默认规则生成的。`specversion` 默认值为 `1.0`,`type` 为返回对象的类型名称,`id` 为生成的 `UUID`(提供相当安全的唯一性预期),`source` 为 `https://springjava.cn/`。

但是我不喜欢默认值。我想要我自己的值,并且想要添加其他属性?

正如我们在上一篇文章中提到的那样:“我们还公开了实用程序、库和配置选项,让您可以影响某些非功能性问题,因为出于各种原因,这可能仍然是必需的。”在这里,您有两个选项。* **第一个选项:**您可以更改函数签名并返回 `Message`,您可以在其中添加其他元数据(即,云事件属性)。一旦框架看到您返回了一个 `Message`,它就不会尝试对用户添加的元数据执行任何额外操作。这实际上适用于大多数(如果不是全部)依赖 Spring Messaging 的框架的规则。虽然此选项很简单,但它确实将非功能性方面泄漏到您的业务逻辑中。毕竟,您需要创建一个 `Message` 实例,您需要添加表示云事件属性的标头(最好使用正确的——规范规定的——属性前缀),等等。但是,此选项的最大缺陷是它需要您更改函数的签名并将功能性方面和非功能性方面混合在一起,这显然违反了*关注点分离*规则。但是,为了论证起见,以下是您将如何操作:

@Bean
public Function<Message<Person>, Message<Employee>> hire() {
  return message -> {
    Person person = message.getPayload();
    Employee employee = new Employee(person);
      return CloudEventMessageBuilder.withData(employee).setId("123456")
	.setSource(URI.create("https://spring.cloudevenets.sample")).build();
  };
}

示例源代码包含其注释版本。

* **第二个选项:**您可以提供名为 `CloudEventHeaderEnricher` 的策略的实现,该策略提供了一个单独的位置,您可以在其中实现为输出生成适当属性和标头的逻辑。此策略由框架在生成输出 `Message` 时调用。以下示例显示了此策略的可能实现(在示例中也被注释掉,因此取消注释它,重新启动应用程序,然后查看区别)。

@Bean
public CloudEventHeaderEnricher cloudEventEnricher() {
  return messageBuilder -> messageBuilder.setSource("https://springjava.cn/cloudevent")
	.setType("sample").setId("987654");
}

在这里,您还可以看到一个可以帮助您构建 Cloud Event 消息的实用程序类:CloudEventMessageBuilder。它的模型是根据标准 Spring MessageBuilder,但具有 Cloud Event 特定的 setter。但是,这种方法的主要优势在于关注点分离。您的业务逻辑(您的功能代码)保持简洁。此外,您仍然需要编写的非功能代码也编写在单独的地方。

还有一件事……示例代码假设您只对 Cloud Event 的data部分感兴趣,并且希望它以 POJO 的形式出现。但如果不是这种情况呢?如果您想要 Cloud Event 中的整个视图怎么办?或者如果您还想以原始形式(即byte[])获取 Cloud Event 数据怎么办?如前所述,框架从函数的签名中获取其指令。因此,通过将输入和输出类型声明为Message,您实际上是在指示框架为您提供整个 Cloud Event(而不仅仅是其data)。此外,通过指定Message的泛型类型,您可以指示框架将 Cloud Event 的data部分作为该 Java 类型提供服务,从本质上要求它在必要时执行类型转换。因此,您可以尝试以下签名:public Function<Message<byte[]>, Message<Employee>> hire() {...}public Function<byte[], Employee> hire() {...} 或其他。

目前就这些了。README 文件和源代码中的注释在需要时也会提供其他说明。

用例 2(从 AMQP 到 Kafka)

示例的完整源代码可在Spring Cloud Function 示例中找到。它假设您已具备一定的 AMQP 和 Apache Kafka 知识。在此示例中,我们使用 RabbitMQ(作为 AMQP 消息代理)和 Apache Kafka。

虽然此用例似乎比前一个用例更复杂,但本节和后续部分(第三个用例)出奇地简短。这是因为上一节中解释的所有内容也适用于此处。事实上,我们在这里唯一更改的是执行上下文。我们通过相同的机制来做到这一点:添加基于 Spring Boot 的相关自动配置。因此,在这种情况下,我们添加了两种自动配置:一种用于 RabbitMQ(AMQP 消息代理)绑定器,另一种用于 Spring Cloud Stream 框架中提供的 Apache Kafka 绑定器。还有一些额外的应用程序配置(您可以在application.properties文件中看到),用于指示框架如何将hire函数的输入端绑定到 RabbitMQ(通过 RabbitMQ 绑定器),并将输出端绑定到 Apache Kafka(通过 Apache Kafka 绑定器)。

假设您已运行 RabbitMQ 和 Kafka,请启动应用程序并将消息发送到 RabbitMQ。您可以使用RabbitMQ 仪表板(如果您已安装)并将消息发送到hire-in-0交换机。
为了符合 Cloud Event 规范,您应该使用 AMQP 适当的前缀(即cloudEvents:)提供属性。

cloudEvents:specversion=1.0
cloudEvents:type=hire
cloudEvents:source:spring.io/spring-event
cloudEvents:id=0001

然后考虑以下数据:{"firstName":"John", "lastName":"Doe"}

为了简化此演示部分,我们包含了一个测试用例,通过将 Cloud Event 发送到 RabbitMQ 并从 Apache Kafka 接收 Cloud Event 来有效地自动化此演示。

Message<byte[]> messageToAMQP = CloudEventMessageBuilder
	.withData("{\"firstName\":\"John\", \"lastName\":\"Doe\"}".getBytes())
	.setSource("https://cloudevent.demo")
	.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
	.build(CloudEventMessageUtils.AMQP_ATTR_PREFIX);

rabbitTemplate.send("hire-in-0", "#", messageToAMQP);
Message<String> resultFromKafka = queue.poll(2000, TimeUnit.MILLISECONDS);
System.out.println("Result Message: " + resultFromKafka);
. . .

请注意,我们在这里如何使用CloudEventMessageBuilder仅将source设置为 Cloud Event 属性,同时依赖于其余必需 Cloud Event 属性的默认值。我们还使用build(CloudEventMessageUtils.AMQP_ATTR_PREFIX)来确保属性以cloudEvents:前缀为前缀(请参阅Cloud Events AMQP 协议绑定)。另外,请注意,在接收端,Cloud Events 属性现在以ce_前缀为前缀(请参阅Cloud Events Kafka 协议绑定),因为它是由框架确定的目标目的地是 Apache Kafka。最后一点值得详细说明一下。我们已经确定设置 Cloud Event 属性是非功能性方面,并且由于它,我们公开了一种机制,让您可以在业务逻辑之外处理它。但是属性前缀呢?请注意,我们在不同的执行上下文中运行相同的代码。这意味着属性前缀实际上取决于执行上下文。因此,通过了解执行上下文,框架确保了 Cloud Event 属性前缀的正确性。

在这里,我们依赖于Spring Cloud Stream框架及其默认值,例如目标自动配置(Kafka 和 Rabbit)、绑定名称、连接性等等。这些默认值和配置选项的详细信息不在本文讨论范围之内,因为它们与 Cloud Events 无关。有关框架本身及其配置选项的更多详细信息,请参阅Spring Cloud Stream 文档

此外,与之前的示例一样,此示例还包含带注释的变体,欢迎您尝试。

用例 3(从 RSocket 到 Kafka)

示例的完整源代码可在Spring Cloud Function 示例中找到。它假设您已具备一定的 RSocket 和 Apache Kafka 知识。本节应该比上一节更短,因为它非常相似。但是,这里有一些有趣的变体值得讨论。显而易见的是RSocket。我们正在引入一种不同的交付机制。但真正让它更有趣的是,RSocket 没有定义协议绑定。我们可以选择遵守 Kafka、HTTP 或 AMQP 规范之一,或者我们可以以结构化模式通信 Cloud Event,其中整个事件被编码到某种结构(例如 JSON)中。

此示例中的一些实现细节也与其他用例不同。但是,这些细节与 Cloud Event 无关。相反,它们是您可以使用的其他机制的演示。例如,我们使用Consumer而不是Function,并使用 Spring Cloud Stream 框架提供的StreamBridge组件手动发送输出消息。

因此,事不宜迟,这是我们的应用程序代码。

@Bean
public Consumer<Person> hire(StreamBridge streamBridge) {
  return person -> {
    Employee employee = new Employee(person);
    streamBridge.send("hire-out-0", CloudEventMessageBuilder.withData(employee)
	.setSource("https://springjava.cn/rsocket")
	.setId("1234567890")
	.build());
  };
}

请注意,我们如何使用CloudEventMessageBuilder生成输出Message作为 Cloud Event。

我们将 Cloud Event 的结构化表示形式(编码为 JSON)通过 RSocket 发送到hire()函数。

String payload = "{\n" +
	"    \"specversion\" : \"1.0\",\n" +
	"    \"type\" : \"org.springframework\",\n" +
	"    \"source\" : \"https://springjava.cn/\",\n" +
	"    \"id\" : \"A234-1234-1234\",\n" +
	"    \"datacontenttype\" : \"application/json\",\n" +
	"    \"data\" : {\n" +
	"        \"firstName\" : \"John\",\n" +
	"        \"lastName\" : \"Doe\"\n" +
	"    }\n" +
	"}";

rsocketRequesterBuilder.tcp("localhost", 55555)
	.route("hire")        // target function
	.data(payload).       // data we're sending
	.send()

由于目标目的地相同,因此预期输出应类似于之前的用例。

结论

如您所见,在 Spring 的上下文中处理 Cloud Events 时,您可以选择多种方式。

  • 您可以选择只关心 Cloud Event 的内容,同时保持对出站 Cloud Event 外观的完全控制。
  • 您可以选择通过Message处理 Cloud Event 本身,并依赖提供的实用程序来简化对 Cloud Event 特定数据的访问。
  • 您可以选择执行上下文,而不会影响您的业务逻辑(用户代码),同时委托给框架以确保某些 Cloud Event 特定内容(例如属性前缀)的正确性。

这些只是与本文上下文相关的少数几种方式,但还有更多。

已建立和验证的模式、实现这些模式的框架以及分层和有见地的 Spring Boot 自动配置使这一切成为可能。层很重要,因为它们允许您将问题分解成一个可以在其他项目和存在相同问题的集成中重复使用的解决方案。这有效地使当前的 Cloud Event 集成成为一项简单的努力,因为与 Cloud Event 无关的大多数非功能性方面(即连接、发送、接收、转换、重试等)已经被 Spring Cloud Function 和 Spring Cloud Stream 背后的各个框架解决了。

最后但并非最不重要的是,还有一种处理 Cloud Events 和 Spring 的替代方法,即通过Cloud Events Java SDK,您也可以在那里找到一个示例

获取 Spring 新闻通讯

关注 Spring 最新资讯

订阅

抢先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举办的活动

查看 Spring 社区所有即将举办的活动。

查看全部