美妙的 GCP:与 Google Cloud Pub/Sub 集成 (4/8)

工程 | Josh Long | 2018 年 8 月 30 日 | ...

嗨,Spring 粉丝们!在这个简短的 8 部分系列文章中,我们将探讨用于 Google Cloud Platform 的 Spring Cloud 集成,称为 Spring Cloud GCP。Spring Cloud GCP 代表了 Google 和 Pivotal 之间的联合努力,旨在为在使用 Google Cloud Platform 时提供 Spring Cloud 开发人员一流的体验。Pivotal Cloud Foundry 用户将享受更加轻松与 GCP 服务代理集成。我编写了这些部分,并得到了 Google Cloud 开发者布道师兼我的朋友Ray Tsang的意见。您还可以在我们的 Google Next 2018 会议中观看 Spring Cloud GCP 的演练,Bootiful Google Cloud Platform。感谢我的朋友!一如既往,如果您有任何反馈,我很乐意听取

该系列共有八篇文章。它们都在这里

](https://springjava.cn/blog/2018/09/06/bootiful-gcp-supporting-observability-with-spring-cloud-gcp-stackdriver-trace-6-8)

  • [美妙的 GCP:使用 Spring Cloud GCP 连接到其他 GCP 服务 (7/8)

](https://springjava.cn/blog/2018/09/10/bootiful-gcp-use-spring-cloud-gcp-to-connect-to-other-gcp-services-7-8)

让我们看看与 Google Cloud Pub/Sub 的应用程序集成。Google Cloud Pub/Sub 支持 Google 规模下许多经典的企业应用程序集成用例。Pub/Sub 的 Google Cloud 网站 列出了一些

  • 在网络集群中平衡工作负载。例如,可以将大量任务队列有效地分布到多个工作程序中,例如 Google Compute Engine 实例。

  • 实现异步工作流。例如,订单处理应用程序可以在主题上放置订单,然后一个或多个工作程序可以处理该订单。

  • 分发事件通知。例如,接受用户注册的服务可以在每次新用户注册时发送通知,下游服务可以订阅以接收事件通知。

  • 刷新分布式缓存。例如,应用程序可以发布失效事件以更新已更改对象的 ID。

  • 记录到多个系统。例如,Google Compute Engine 实例可以将日志写入监控系统,写入数据库以供以后查询,等等。

  • 来自各种流程或设备的数据流。例如,住宅传感器可以将数据流传输到云中托管的后端服务器。

  • 提高可靠性。例如,单区域 Compute Engine 服务可以通过订阅公共主题在其他区域运行,以从区域或区域中的故障中恢复。

使用 Google Cloud Pub/Sub 的流程完全符合您的预期:消息发送到 Pub/Sub 代理(由 GCP 在云中托管)中的主题,然后代理为您持久保存该消息。订阅者可以通过(通过 Webhook)将消息推送到其自身,或者可以轮询代理以获取消息。订阅者从代理接收消息并确认每条消息。当订阅者确认一条消息时,该消息将从订阅者的订阅队列中删除。任何可以与 HTTPS 通信的客户端都可以使用此服务。不需要其他 API。

如果您曾经使用过任何其他消息传递系统(JMS、AMQP、Apache Kafka、Kestrel),则该域模型非常简单:主题是发布消息的对象。订阅表示要传递到特定客户端应用程序的来自特定主题的消息流。一个主题可以有多个订阅。一个订阅可以有多个订阅者。如果您想将不同的消息分发到不同的订阅者,那么所有订阅者都必须订阅相同的订阅。如果您想将相同的消息发布到所有订阅者,那么每个订阅者都需要订阅其自己的订阅。

Pub/Sub 传递至少一次。因此,如果您无法多次处理相同的消息,则必须处理幂等性和/或重复数据删除消息。

消息存储数据和(可选)属性的组合,这些数据和属性由 Google Cloud Pub/Sub 从发布者传送到订阅者。消息属性(您可能更了解为标头)是消息中的键值对。您可能有一个标头描述有效负载的语言。您可能有一个标头描述内容类型。

让我们将 Google Cloud Pub/Sub 添加到应用程序中并将它们绑定在一起。

和以前一样,我们需要启用 Google Cloud Pub/Sub API 以供使用。

gcloud services enable pubsub.googleapis.com

然后,您需要创建一个新主题,reservations

gcloud pubsub topics create reservations

该主题表示我们将发送消息的位置。我们仍然需要创建一个订阅来使用该主题中的消息。以下命令创建一个订阅,reservations-subscription,以连接到reservations主题。

gcloud pubsub subscriptions create reservations-subscription --topic=reservations

这些部分到位后,我们就可以从应用程序中使用它们了。将 Spring Cloud GCP Pub/Sub 启动器,org.springframework.cloudspring-cloud-gcp-starter-pubsub,添加到您的构建中。这引入了 Google Cloud PubSubTemplate 的自动配置。如果您曾经使用过JmsTemplateKafkaTemplate,那么PubSubTemplate应该很熟悉。它是一个易于使用的客户端,用于使用 Google Cloud Pub/Sub 生成和使用消息。如果您刚开始使用 GCP Pub/Sub 和一般消息传递,则 Spring 宇宙中的*Template对象是一个不错的起点。

让我们看一个简单的示例,该示例在您向 Spring Boot 应用程序中运行的 HTTP 端点发出 HTTP POST 调用时发布消息。然后,我们将设置一个订阅者来使用发送的消息。

package com.example.gcp.pubsub.template;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@RestController
class PublisherConfig {

        private final PubSubTemplate template;
        private final String topic;

        PublisherConfig(PubSubTemplate template, @Value("${reservations.topic:reservations}") String t) {
                this.template = template;
                this.topic = t;
        }

        
        @PostMapping("/publish/{name}")
        void publish(@PathVariable String name) {
                this.template.publish(this.topic, "Hello " + name + "!");
        }
}
  • 我们使用注入的PubSubTemplate将消息(一个字符串)发送到配置的主题。

现在,让我们看一个简单的应用程序,它可能很容易在另一个节点中运行,该节点使用与主题链接的订阅中的消息。

package com.example.gcp.pubsub.template;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;

@Slf4j
@Configuration
class SubscriberConfig {

        private final PubSubTemplate template;
        private final String subscription;

        SubscriberConfig(PubSubTemplate template, @Value("${reservations.subscription:reservations-subscription}") String s) {
                this.template = template;
                this.subscription = s;
        }

        @EventListener(ApplicationReadyEvent.class)
        public void start() {
                
                this.template.subscribe(this.subscription, (pubsubMessage, ackReply ) -> {
                        log.info("consumed new message: [" + pubsubMessage.getData().toStringUtf8() + "]");
                        ackReply.ack();
                });
        }
}
  • 应用程序启动并运行后,我们明确订阅,将我们的客户端连接到正确的端点。

此示例使用PubSubTemplate(非常有效)。它简单、简短且易懂。但是,随着集成的复杂性增加,将消息流中涉及的组件从一个系统解耦到另一个系统变得很有用。我们引入阶段——组件链中的链接——消息必须通过这些链接才能到达下游组件。此分段允许我们编写可以互换的处理代码,而无需考虑给定消息的来源或目标。这促进了测试,因为组件只需要根据其直接的前置条件和后置条件编写:组件可以声明它只接受 Spring Framework Message<File> 类型,而其他类型则不行。这种接口间接性非常方便,尤其是在我们开始将可能以不同节奏处理工作的现实世界系统绑定在一起时。引入代理以缓冲工作,然后再将其传递到可能发生瓶颈的下游组件变得微不足道。这种方法——隔离消息流中涉及的组件并引入缓冲区以保护下游组件——称为分阶段事件驱动架构(SEDA),并且随着世界转向微服务和高度分布式系统,它现在比以往任何时候都更有价值。

Spring Integration 是一个旨在促进这种间接性的框架。它以MessageChannel的概念为核心,您可以将其视为内存中的Queue;消息流经的管道。在MessageChannel的两侧都设置了组件。您可以想象一个组件输出特定类型的消息并将其发送到此MessageChannel,而不必关心它将去往何处。在另一端是另一个组件,它使用特定类型的消息,完全不知道任何给定消息的来源。今天可能有一个服务参与消息的生成。明天可能会有十个!上游和下游组件无需更改。这种间接性为我们提供了许多可能性。我们更改给定消息的路由,将其串联到不同的服务中,拆分它,聚合它,等等。我们可以转换其他数据源并将它们适配到上游的消息流(这称为入站适配器)。我们可以为数据引入新的接收器,将 Spring Framework Message<T> 适配为正确的类型(这称为出站适配器)。

让我们来看看 Spring Integration 和 Google Cloud Pub/Sub 的入站和出站适配器。我们将保持与之前相同的方法:一个 HTTP 端点将发布消息,然后这些消息将传递到 Google Cloud Pub/Sub。代码可以在不同的节点中运行。您还需要类路径上的 Spring Integration 类型才能使此示例正常工作。将org.springframework.boot:spring-boot-starter-integration添加到构建中。

让我们看看一个发布者,它在每次进行 HTTP POST 时发布消息。在这种情况下,发布者将请求发送到一个MessageChannel,然后将其传递到一个PubSubMessageHandler。今天它直接发送到 Pub/Sub,但明天它可以发送到数据库、FTP 服务器、XMPP、Salesforce 或任何其他东西,然后再发送到 Pub/Sub。

package com.example.gcp.pubsub.integration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@Configuration
class PublisherConfig {

        private final String topic;
        private final PubSubTemplate template;

        public PublisherConfig(
            @Value("${reservations.topic:reservations}") String t,
            PubSubTemplate template) {
                this.topic = t;
                this.template = template;
        }

        @Bean
        IntegrationFlow publisherFlow() {
                return IntegrationFlows
                    .from(this.outgoing()) 
                    .handle(this.pubSubMessageHandler()) 
                    .get();
        }

        @PostMapping("/publish/{name}")
        void publish(@PathVariable String name) {
                
                outgoing().send(MessageBuilder.withPayload(name).build());
        }

        @Bean
        SubscribableChannel outgoing() {
                return MessageChannels.direct().get();
        }

        @Bean
        PubSubMessageHandler pubSubMessageHandler() {
                return new PubSubMessageHandler(template, this.topic);
        }
}
  • IntegrationFlow描述了集成中消息的。发送到outgoing MessageChannel的消息将传递到PubSubMessageHandler,然后使用指定的topic将其写入 Google Cloud Pub/Sub。

  • 在 Spring MVC HTTP 端点中,我们获取对MessageChannel的引用,并将一条消息(我们使用MessageBuilder构建)发布到其中。注意:像我在此示例中那样调用outgoing()是可以的,因为 Spring 会记住方法调用的结果;我将始终获取MessageChannel bean 的相同预实例化的单例。

在消费者端,我们反向执行相同的操作,适配传入的消息,然后在IntegrationFlow中记录它们。

package com.example.gcp.pubsub.integration;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@Slf4j
@Configuration
class SubscriberConfig {

        private final String subscription;
        private final PubSubTemplate template;

        SubscriberConfig(
            @Value("${reservations.subscription:reservations-subscription}") String s,
            PubSubTemplate t) {
                this.subscription = s;
                this.template = t;
        }

        @Bean 
        public PubSubInboundChannelAdapter messageChannelAdapter() {
                PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(
                    template, this.subscription);
                adapter.setOutputChannel(this.incoming());
                adapter.setAckMode(AckMode.MANUAL);
                return adapter;
        }

        @Bean
        MessageChannel incoming() {
                return MessageChannels.publishSubscribe().get();
        }

        @Bean
        IntegrationFlow subscriberFlow() {
                return IntegrationFlows
                    .from(this.incoming()) 
                    .handle(message -> { 
                            log.info("consumed new message: [" + message.getPayload() + "]");
                            AckReplyConsumer consumer = message.getHeaders()
                                .get(GcpPubSubHeaders.ACKNOWLEDGEMENT, AckReplyConsumer.class);
                            consumer.ack();
                    })
                    .get();
        }
}
  • PubSubInboundChannelAdapter适配来自订阅的消息,并将它们发送到incoming MessageChannel

  • IntegrationFlow接收传入的消息,并将它们路由到一个MessageHandler(我们使用 lambda 语法贡献了它),该处理程序 a) 记录传入的消息,以及 b) 手动确认消息的接收。

在这两个示例中,IntegrationFlow的优点在于您可以将调用链接在一起。在这里,我们只指定消息的来源(.from())和处理它的对象(.handle()),但我们也可以在.handle()调用之后轻松地路由、拆分、转换等消息。作为某个组件(适配器、消息处理程序、转换器等)输出发送的消息将成为任何下游组件的输入。

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部