使用 Spring Cloud Stream 构建简单的事件驱动微服务

工程 | Ben Wilcock | 2019 年 10 月 15 日 | ...

事件驱动架构很棒。但是,如果没有框架,编写与流行的事件消息平台协作所需的脚手架可能会很混乱。在这篇文章中,我们将看看如何使用 Spring Cloud Stream 来简化你的代码。

问题

你只想为你的事件驱动应用程序编写业务逻辑,但那些样板式的消息代码可能会碍事。将你的应用程序连接到消息服务很棘手,如果你是一名企业开发者,你可能需要使用多种消息技术(无论是在本地还是在云端)。

解决方案

让灵活的消息抽象来处理复杂的消息平台集成,这样你就可以专注于编写简单清晰的业务逻辑。Spring Cloud Stream 是一个很好的选择。它将许多流行的消息平台统一在一个易于使用的 API 之后,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs 和 Apache RocketMQ。它甚至消除了这些平台之间在方法和特性上的任何细微差异(例如分区或交换机),让你自由地创建创新的事件驱动解决方案。

在接下来的演示中,你将看到 Spring Cloud Stream 巧妙的抽象如何帮助使事件流代码更清晰、更容易使用。你还将看到使用 Spring Cloud Stream 的 binding 库在两种不同的消息平台(RabbitMQKafka)之间切换是多么容易。

开始之前

这些事件驱动的微服务需要在你的电脑上安装最新版本的以下应用程序1

  1. Java 8
  2. Docker(我们将在本地运行 RabbitMQ 和 Kafka)
  3. Git(可选)
  4. Bash(假设,虽然其他替代方案也可能奏效)

运行演示

首先,从 GitHub 克隆代码仓库。要执行此操作(如果你安装了 Git),请打开新的终端窗口并执行以下命令。如果你没有安装 Git,请下载并解压 此 zip 文件

git clone https://github.com/benwilcock/spring-cloud-stream-demo.git

检查代码后,你会注意到这个仓库包含两个微服务。

  1. Loansource 微服务(位于 /loansource 文件夹中)。此微服务充当事件消息的来源。这些事件是 Loan 应用程序,类似于你在银行和金融领域中会看到的。每笔贷款都有“名称”、“金额”和“状态”(最初设置为 PENDING,即待处理)。

  2. Loancheck 微服务(位于 /loancheck 文件夹中)。此微服务充当 Loan 处理器。它检查哪些贷款是适合发放的,并将它们分类到 APPROVED(已批准)或 DECLINED(已拒绝)状态。

要运行演示,请按照以下说明操作。

步骤 1:启动消息服务器

在一个新的终端窗口中,进入项目的根文件夹并执行以下命令。

你需要安装并运行 "Docker",以便此脚本正常工作,因为它需要 docker-compose

./start-servers.sh

此脚本将启动 KafkaRabbitMQ,并将两者的日志输出流式传输到终端窗口(除非你按 Ctrl-C 退出)。当你按 Ctrl-C 时,服务器不会停止 - 它们会继续在后台运行。启动后,这些服务器都将可用于在你的计算机上运行的应用程序。

步骤 2:选择 Kafka 或 RabbitMQ 模式

在接下来的步骤 3 和 4 中,我们必须将 -P<profile-choice> 替换为我们想要使用的消息平台的名称。

  • 对于 Kafka,使用:-Pkafka
  • 对于 RabbitMQ,使用:-Prabbit

如果你完全省略 -P<profile-choice> 设置,则默认使用 Kafka。

注意:此演示并非旨在在 Kafka 和 RabbitMQ 之间“桥接”消息,因此在编译和运行这两个应用程序时,请确保在每个应用程序中选择相同的配置文件名。如果你的目标是桥接消息系统,请参阅此处的文档

步骤 3:生成一些贷款事件

在一个新的终端窗口中,使用 cd 命令将当前目录切换到 /loansource 目录,然后执行以下命令,将 <profile-choice> 替换为你想要运行的模式(如上面步骤 2 中讨论的 kafkarabbit 模式)。

./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>

一旦 loansource 应用程序启动,在终端窗口中,你应该每秒看到一条消息,告诉你一个新的 Loan 事件已以 PENDING(待处理)状态发布到消息平台。让此微服务保持运行,然后进入下一步。

步骤 4:处理贷款事件

在另一个新的终端窗口中,将当前目录切换到 /loancheck 目录,然后执行以下命令,同样将 <profile-choice> 替换为你想要运行的模式。

./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>

一旦 loancheck 应用程序启动,在终端窗口中,你应该每秒看到一条消息,告诉你已从消息平台读取了一个新的 PENDING(待处理)Loan 应用程序,并且它被 APPROVED(已批准)或 DECLINED(已拒绝)。如果你想了解这些应用程序是如何构建的,请跳到“工作原理”部分。

步骤 5:停止演示

完成微服务后,在 /loansource/loancheck 微服务的每个终端窗口中按 Ctrl-C。应用程序将停止,事件处理也将停止。

如果你要在 Kafka 和 Rabbit 之间切换模式,只需返回到步骤 2 并重复该过程即可。

如果你已完全完成演示,并希望停止 Kafka 和 RabbitMQ 服务器,请在项目根文件夹的终端窗口中运行 ./stop-servers.sh 脚本。如果你只是在模式之间切换,则无需执行此操作。

工作原理

Maven Profiles(在每个项目的 pom.xml 中)控制你在构建时将哪些 Spring Cloud Stream 绑定器添加为依赖项。当你选择 -Pkafka 时,[spring-cloud-stream-binder-kafka][kafka] 依赖项会添加到项目中。当你选择 -Prabbit 时,[spring-cloud-stream-binder-rabbit][rabbit] 依赖项会添加进来。

<profiles>
    <profile>
        <id>kafka</id>
        <properties>
            <spring.profile.activated>kafka</spring.profile.activated>
        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                <version>${spring-cloud-stream.version}</version>
            </dependency>
        </dependencies>
    </profile>
    ...
<profiles>

你选择的 Maven Profile 也会影响 src/main/resources/application.properties 文件中的 spring.profiles.active 属性,该属性会切换你在启动时看到的横幅。

Loansource 微服务

对于 Loansource 微服务,我们使用了 Spring Cloud Stream v2.1 的一个新特性 - Spring Cloud Function 支持。借助这个新特性,让 LoansourceApplication 微服务充当 Loan 消息源所需的只是声明一个生成并返回 Supplier<>@Bean 方法。在这种情况下,它是一个类型为 LoanSupplier。函数方法代码如下所示...

@Bean
public Supplier<Loan> supplyLoan() {
  return () -> {
    Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
    LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    return loan;
  };
}

Supplier<> 是一种 Java 函数数据类型。由于只有一个返回此类型的 @Bean 方法,Spring Cloud Stream 知道下一步该怎么做。默认情况下,它会每秒触发一次此函数,并将结果发送到名为“output”的默认 MessageChannel。这个函数方法的优点在于它只包含业务逻辑,因此你可以使用常规的单元测试来测试它。

我们可以使用 application.properties 文件中的 spring.cloud.function.definition 属性来显式声明我们希望绑定到绑定目标(binding destinations)的函数 bean - 但对于只定义了一个 @Bean 的情况,这并非必要。

如果我们想使用不同的轮询间隔,可以使用 application.properties 文件中的 spring.integration.poller.fixed-delay 属性。

Loancheck 微服务

loancheck 微服务需要多一点代码,但不多。它的工作是将 Loan 事件分类到不同的通道。为此,它订阅来自源的 output 主题的事件,然后根据贷款的价值将它们发送到 approveddeclined 主题,类似于欺诈检查功能。

因为我们使用了 3 个消息通道(一个入站,两个出站),所以使用一个简单的 LoanProcessor 接口来明确输入和输出。目前,它看起来像这样

@Component
public interface LoanProcessor {

  String APPLICATIONS_IN = "output"; // Topic where the new loans appear
  String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
  String DECLINED_OUT = "declined"; // Topic where the declined loans are sent

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel approved();

  @Output(DECLINED_OUT)
  MessageChannel declined();
}

这个 LoanProcessor 接口首先在 @SpringBootApplication 类(LoanCheckApplication.java)中作为 @EnableBinding() 注解的参数被引用,如下所示。

@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {

  public static void main(String[] args) {
    SpringApplication.run(LoanCheckApplication.class, args);
  }
}

此外,一个名为 LoanChecker.java 的 Spring @Component 在运行时用这个 LoanProcessor 构建。而且,只要有新的 Loan 事件到达,这个组件的 checkAndSortLoans(Loan) 方法就会自动调用,因为它被标记为 LoanProcessor.APPLICATIONS_IN 通道的 @StreamListener()。你可以在以下代码示例中看到这个注解的使用。

  @StreamListener(LoanProcessor.APPLICATIONS_IN)
  public void checkAndSortLoans(Loan loan) {

    if (loan.getAmount() > MAX_AMOUNT) {
      loan.setStatus(Statuses.DECLINED.name());
      processor.declined().send(message(loan));
    } else {
      loan.setStatus(Statuses.APPROVED.name());
      processor.approved().send(message(loan));
    }
  }

然后,这个方法使用简单的业务逻辑对 Loan 对象进行分类。根据分类结果,它将它们发送到 processor.approved() 通道或 processor.declined() 通道(相应地设置其贷款状态之后)。

总结

正如你所看到的,使用 Spring Cloud Stream 时获得关注点分离确实非常有利。这两个微服务中完全没有任何 Kafka 或 RabbitMQ 特定的代码。这使我们可以专注于业务逻辑,而无需关心消息平台,你只需更改项目 pom.xml 中的“binder”依赖项,就可以轻松切换消息平台。

更多内容...

你可以按如下方式查看事件流经消息平台的情况

  • 对于 Kafka,可以使用 KafDrop 工具在 localhost:9000 上观察主题和事件消息。无需登录。

  • 对于 RabbitMQ,可以在 localhost:15672 上找到 Rabbit Management Console,用于观察交换机和事件消息。登录时用户名为 guest,密码也为 guest。要查看实际的消息内容,你可能需要手动创建一个队列,并使用 # 作为你的 routing key 将其绑定到所需的主题。

要了解 Spring Cloud Stream 的最新信息,请访问 Spring 网站上该项目的专属项目页面

要从头开始创建自己的 Spring 项目,请使用 start.spring.io 上的项目配置器。

如果你想深入了解 Spring 和纯粹的 Kafka,请查看这些精彩的博客文章

  1. Gary Russell:深入探讨 Spring for Apache Kafka:错误处理、消息转换和事务支持

  2. Soby Chacko:深入探讨 Spring for Apache Kafka:Apache Kafka 和 Spring Cloud Stream


脚注


  1. 此仓库中的微服务代码使用 MavenSpring BootSpring Cloud Stream 编写和打包。运行时,代码依赖于 KafkaZookeeperRabbitMQ 以及 KafDrop(一个由 Obsidian Dynamics 提供的 Docker 镜像)。此列表中的所有内容均已为你提供 - 你无需安装它们。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯以保持联系

订阅

领先一步

VMware 提供培训和认证,助你加速前进。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持及二进制文件。

了解更多

即将到来的活动

查看 Spring 社区的所有即将到来的活动。

查看全部