领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多事件驱动架构非常棒。但是,如果没有框架,编写与流行事件消息平台交互所需的脚手架代码会很混乱。在这篇文章中,我们将了解如何使用Spring Cloud Stream来简化您的代码。
您只想编写事件驱动应用程序的逻辑,但是样板消息代码可能会妨碍您。将您的应用程序连接到消息服务很棘手,如果您是企业开发人员,则可能需要使用多种消息技术(本地或云端)。
让灵活的消息传递抽象来处理复杂的消息平台集成,以便您可以专注于编写简单干净的业务逻辑。Spring Cloud Stream 是一个很好的选择。它将许多流行的消息传递平台统一到一个易于使用的 API 后面,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs 和 Apache RocketMQ。它甚至消除了这些平台之间在方法和功能上的细微差异(例如分区或交换),让您可以自由地创建创新的事件驱动解决方案。
在接下来的演示中,您将确切地看到 Spring Cloud Stream 的巧妙抽象如何帮助使事件流代码更简洁、更易于使用。您还将看到使用 Spring Cloud Stream 的binding
库在两个不同的消息传递平台(RabbitMQ 或 Kafka)之间切换是多么容易。
这些事件驱动微服务需要在您的电脑上安装以下应用程序的最新版本1
首先,从 GitHub 克隆代码仓库。为此(如果您已安装 Git),打开一个新的终端窗口并执行以下命令。如果您没有安装 Git,请下载并解压缩此 zip 文件。
git clone https://github.com/benwilcock/spring-cloud-stream-demo.git
检查代码后,您会注意到此仓库包含两个微服务。
Loansource
微服务(位于/loansource
文件夹中)。此微服务充当事件消息的来源。这些事件是Loan
申请,类似于您在银行和金融领域看到的。每个贷款都有一个“名称”、一个“金额”和一个“状态”(最初设置为PENDING
)。
Loancheck
微服务(位于/loancheck
文件夹中)。此微服务充当Loan
处理器。它检查哪些贷款是好的贷款,并将它们分类为APPROVED
或DECLINED
状态。
要运行演示,请按照以下说明操作。
在一个新的终端窗口中,转到项目的根文件夹并执行以下命令。
您需要安装并运行系统上的"Docker"才能使此脚本正常工作,因为它需要
docker-compose
。
./start-servers.sh
此脚本将启动Kafka和RabbitMQ,并将这两个服务器的日志输出流式传输到终端窗口(除非您使用Ctrl-C
退出)。当您按下Ctrl-C
时,服务器不会停止 - 它们将在后台继续运行。启动后,这些服务器将对计算机上运行的应用程序可用。
在接下来的步骤 3 和步骤 4 中,我们必须将-P<profile-choice>
替换为我们想要使用的消息传递平台的名称。
-Pkafka
-Prabbit
如果您完全省略-P<profile-choice>
设置,则将使用 Kafka。
注意:此演示并非旨在在 Kafka 和 RabbitMQ 之间“桥接”消息,因此请确保在编译和运行这两个应用程序时在每个应用程序中选择相同的配置文件名称。如果您的目标是桥接消息系统,请参阅此处的文档。
在一个新的终端窗口中,使用cd
将/loansource
目录设置为当前目录,然后执行以下命令,将<profile-choice>
替换为您想要运行的模式(如上面步骤 2 中所述的kafka
或rabbit
模式)。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
loansource
应用程序启动后,在终端窗口中,您应该每秒钟看到一条消息,告诉您一个新的 Loan 事件已发布到PENDING
状态的消息平台中。保持此微服务运行并转到下一步。
在另一个新的终端窗口中,将/loancheck
目录设置为您的当前目录,然后执行以下命令,再次将<profile-choice>
替换为您想要运行的模式。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
loancheck
应用程序启动后,在终端窗口中,您应该每秒钟看到一条消息,告诉您一个新的PENDING
贷款申请已从消息平台读取并已APPROVED
或DECLINED
。如果您想了解这些应用程序是如何构建的,请跳至“工作原理”。
完成微服务操作后,在 /loansource
和 /loancheck
微服务的每个终端窗口中按下 Ctrl-C
。应用程序将停止运行,事件处理也将停止。
如果您要在 Kafka 和 Rabbit 之间切换模式,只需返回到**步骤 2**并重复该过程。
如果您已完全完成演示,并且还想停止 Kafka 和 RabbitMQ 服务器,请在项目根文件夹的终端窗口中运行
./stop-servers.sh
脚本。如果您只是在模式之间切换,则无需执行此操作。
Maven 配置文件(在每个项目的 pom.xml
中)控制构建时添加哪些 Spring Cloud Stream 绑定作为依赖项。当您选择 -Pkafka
时,[spring-cloud-stream-binder-kafka][kafka]
依赖项将添加到项目中。当您选择 -Prabbit
时,[spring-cloud-stream-binder-rabbit][rabbit]
依赖项将被添加。
<profiles>
<profile>
<id>kafka</id>
<properties>
<spring.profile.activated>kafka</spring.profile.activated>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</profile>
...
<profiles>
您选择的 Maven 配置文件还会影响 src/main/resources/application.properties
文件中的 spring.profiles.active
属性,该属性会切换您在启动时看到的横幅。
对于 Loansource 微服务,我们正在使用 Spring Cloud Stream v2.1 的一项新功能 - Spring Cloud Function 支持。借助此新功能,要使 LoansourceApplication
微服务充当 Loan
消息的来源,只需声明一个生成并返回 Supplier<>
的 @Bean
方法即可。在本例中,它是类型为 Loan
的 Supplier
。函数方法代码如下所示...
@Bean
public Supplier<Loan> supplyLoan() {
return () -> {
Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
return loan;
};
}
Supplier<>
是一种 Java 函数数据类型。由于只有一个返回此类型的 @Bean
方法,因此 Spring Cloud Stream 完全知道接下来该做什么。默认情况下,它将每秒触发一次此函数,并将结果发送到名为“output”的默认 MessageChannel
。此函数方法的优点在于它仅包含业务逻辑,因此您可以使用常规单元测试对其进行测试。
我们可以在
application.properties
文件中使用spring.cloud.function.definition
属性来明确声明要绑定到绑定目标的函数 bean - 但在您仅定义单个@Bean
的情况下,则无需执行此操作。
如果我们想使用不同的轮询间隔,可以在
application.properties
文件中使用spring.integration.poller.fixed-delay
属性。
loancheck
微服务需要稍微多一些代码,但不多。它的作用是将 Loan
事件分类到不同的通道中。为了做到这一点,它订阅来自源的 output
主题的事件,然后根据贷款的值(类似于欺诈检查设施)将它们发送到 approved
或 declined
主题。
因为我们使用 3 个消息通道(一个入站和两个出站),所以一个简单的 LoanProcessor
接口用于阐明输入和输出。目前,它看起来像这样
@Component
public interface LoanProcessor {
String APPLICATIONS_IN = "output"; // Topic where the new loans appear
String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
String DECLINED_OUT = "declined"; // Topic where the declined loans are sent
@Input(APPLICATIONS_IN)
SubscribableChannel sourceOfLoanApplications();
@Output(APPROVED_OUT)
MessageChannel approved();
@Output(DECLINED_OUT)
MessageChannel declined();
}
此 LoanProcessor
接口首先在 @SpringBootApplication
类(LoanCheckApplication.java
)中作为 @EnableBinding()
注解的参数被引用,如下所示。
@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {
public static void main(String[] args) {
SpringApplication.run(LoanCheckApplication.class, args);
}
}
此外,在运行时使用此 LoanProcessor
构造名为 LoanChecker.java
的 Spring @Component
。此外,每当新的 Loan
事件到达时,都会自动调用此组件的 checkAndSortLoans(Loan)
方法,因为它已使用 @StreamListener()
注解到 LoanProcessor.APPLICATIONS_IN
通道。您可以在以下代码示例中看到此注解的使用。
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
if (loan.getAmount() > MAX_AMOUNT) {
loan.setStatus(Statuses.DECLINED.name());
processor.declined().send(message(loan));
} else {
loan.setStatus(Statuses.APPROVED.name());
processor.approved().send(message(loan));
}
}
然后,此方法使用简单的业务逻辑对 Loan
对象进行排序。根据排序结果,它将它们转发到 processor.approved()
通道或 processor.declined()
通道(在相应地设置其贷款状态后)。
如您所见,使用 Spring Cloud Streams 获得的关注点分离确实非常有效。在任何微服务中都没有绝对的 Kafka 或 RabbitMQ 特定代码。这使我们能够专注于业务逻辑,而不管消息传递平台如何,并且您可以简单地通过更改项目 pom.xml
中的“绑定器”依赖项来轻松地交换消息传递平台。
您可以按如下方式查看流经消息传递平台的事件
对于**Kafka**,可以使用 KafDrop 工具(位于 localhost:9000
)来观察主题和事件消息。无需登录。
对于**RabbitMQ**,可以在 localhost:15672
找到 Rabbit Management Console 来观察交换和事件消息。要登录,用户名为 guest
,密码也为 guest
。要观察实际的消息内容,您可能需要手动创建一个队列,并使用 #
作为您的 路由键
将其绑定到所需的主题。
要了解 Spring Cloud Stream 的最新信息,请访问 Spring 网站上项目的专用 项目页面。
要从头开始创建自己的 Spring 项目,请使用 start.spring.io 上的项目配置器。
如果您想深入了解 Spring 和纯 Kafka,请查看以下优秀的博文