案例研究:使用 CDC Debezium 源和 Analytics 接收器进行实时变更数据捕获 (CDC) 分析

工程 | Christian Tzolov | 2020年12月14日 | ...

本文是博客系列的一部分,该系列探讨了基于 Java 函数的全新设计的 Spring Cloud Stream 应用程序。

以下是此博客系列的所有先前部分。

在这篇文章中,我们将介绍 Debezium CDC 源,它允许我们捕获来自 MySQL、PostgreSQL、MongoDB、Oracle、DB2 和 SQL Server 等数据库的数据库更改,并通过各种消息绑定器(例如 RabbitMQ、Apache Kafka、Azure Event Hubs、Google PubSub 和 Solace PubSub+ 等)实时处理这些更改。

此外,我们将揭示如何使用 Analytics 接收器 将捕获的数据库更改转换为指标,并将其发布到各种监控系统以进行进一步分析。

本文首先解释 CDC 供应商Analytics 消费者 组件,展示如何在您自己的 Spring 应用程序中以编程方式自定义和使用它们。接下来,我们解释 CDC 源Analytics 接收器 如何构建在供应商和消费者之上,以提供开箱即用、随时可用的流应用程序。

最后,我们将演示使用 Spring Cloud Data Flow (SCDF) 部署对数据库更新实时做出反应的流管道、将更改事件转换为分析指标并将其发布到 Prometheus 以供使用 Grafana 进行分析和可视化的简易性。

变更数据捕获

变更数据捕获 (CDC) 是一种用于观察写入数据库的所有数据更改并将其作为事件发布的技术,这些事件可以以流式方式进行处理。由于您的应用程序数据库始终处于更改状态,因此 CDC 允许您对这些更改做出反应,并允许您的应用程序按与提交到数据库相同的顺序流式传输每个行级更改。

CDC 支持多种用例,例如:缓存失效、内存中数据视图、更新搜索索引、通过保持不同数据源同步来复制数据、实时欺诈检测、存储审计跟踪、数据来源等等。

Spring Cloud Data Flow CDC 源 应用程序构建在 Debezium 之上,这是一个流行的开源基于日志的 CDC 实现,支持各种数据库。CDC 源支持各种消息绑定器,包括 Apache Kafka、Rabbit MQ、Azure Event Hubs、Google PubSub、Solace PubSub+。

注意

CDC 源 实现嵌入 Debezium 引擎,并且不依赖于 Apache Kafka 或 ZooKeeper!您可以将 CDC 源 与任何支持的消息绑定器一起使用!但是,Debezium 引擎 有一些需要考虑的 限制

CDC Debezium 供应商

CDC Debezium 供应商java.util.function.Supplier bean 的形式实现,当调用该 bean 时,它将提供给定目录中文件的內容。文件供应商具有以下签名

Supplier<Flux<Message<?>>>

供应商的用户可以订阅返回的 Flux<Message<?>,这是一个消息流或 CDC 更改事件,它具有复杂的结构。每个事件都由三个部分组成(例如 metadatabeforeafter),如以下有效负载示例所示

{
  "before": { ... },  // row data before the change.
  "after": { ... },  // row data after the change.
  "source": {  //  the names of the database and table where the change was made.
    "connector": "mysql", "server_id": 223344,"snapshot": "false",
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",  // source database name.
    "table": "customers", // source table name.
  },
  "op": "u",  // operation that made the change.
  "ts_ms": 1607440256301, // timestamp - when the change was made.
  "transaction": null  // transaction information (optional).
}

如果将 cdc.flattening.enabled 属性设置为 true,则仅将 after 部分作为独立消息传递。

为了调用 CDC 供应商,我们需要指定一个源数据库以从中接收 CDC 事件。cdc.connector 属性用于在支持的 mysqlpostgressql serverdb2oraclecassandramongo 源数据库类型之间进行选择。cdc.config.database.* 属性有助于配置源访问。以下是连接到 MySQL 数据库的示例配置

# DB type
cdc.connector=mysql

# DB access
cdc.config.database.user=debezium
cdc.config.database.password=dbz
cdc.config.database.hostname=localhost
cdc.config.database.port=3306

# DB source metadata
cdc.name=my-sql-connector
cdc.config.database.server.id=85744
cdc.config.database.server.name=my-app-connector

cdc.name, cdc.config.database.server.idcdc.config.database.server.name 属性用于识别和调度传入事件。您可以选择设置 cdc.flattening.enabled=true 将 CDC 事件展平,以仅用其 after 字段替换原始更改事件以创建简单的 Kafka 记录。可以选择使用 cdc.schema=true 将 DB 架构包含到 CDC 事件中。

注意

必须将源数据库配置为公开其 预写日志 API,以便 Debezium 能够连接和使用 CDC 事件。 Debezium 连接器文档 提供了有关如何为任何支持的数据库启用 CDC 的详细说明。出于我们此处演示的目的,我们将使用预配置的 MySQL docker 镜像

在自定义应用程序中重用 CDC 供应商

CDC 供应商是一个可重用的 Spring bean,我们可以在最终用户自定义应用程序中注入它。注入后,可以直接调用它并将其与数据的自定义处理结合起来。这是一个示例。

@Autowired
Supplier<Flux<Message<?>>> cdcSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> cdcData = cdcSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的伪代码中,我们注入 CDC 供应商 bean,然后使用它来调用其 get() 方法以获取 Flux。然后我们订阅该 Flux,并且每次我们通过 Flux 接收任何数据时,都应用一些过滤,并根据该数据采取措施。这只是一个简单的说明,展示了我们如何重用 CDC 供应商。当您在实际应用程序中尝试此操作时,您可能需要在您的实现中进行更多调整,例如在执行条件检查之前将接收到的数据的默认数据类型从 byte[] 转换为其他内容。

提示

为了构建独立的非流式应用程序,您可以利用 cdc-debezium-boot-starter。只需添加 cdc-debezium-boot-starter 依赖项并实现您的自定义 Consumer<SourceRecord> 处理程序以处理传入的数据库更改事件。

CDC Debezium 源

正如我们在这个博客系列中所看到的,所有开箱即用的 Spring Cloud Stream 源应用程序都已使用几个开箱即用的通用处理器自动配置。您可以将这些处理器作为 CDC 源 的一部分激活。这是一个示例,我们运行 CDC 源并接收数据,然后转换消耗的数据,然后再将其发送到中间件上的目标。

java -jar cdc-debezium-source.jar
 --cdc.connector=mysql --cdc.name=my-sql-connector
 --cdc.config.database.server.name=my-app-connector
 --cdc.config.database.user=debezium --cdc.config.database.password=dbz
 --cdc.config.database.hostname=localhost --cdc.config.database.port=3306
 --cdc.schema=true
 --cdc.flattening.enabled=true
 --spring.cloud.function.definition=cdcSupplier|spelFunction
 --spel.function.expression=payload.toUpperCase()

通过为 spring.cloud.function.definition 属性提供 cdcSupplier|spelFunction 值,我们激活了与 CDC 供应器组合的 SpEL 函数。然后,我们提供一个我们想要用于转换数据的 SpEL 表达式,使用 spel.function.expression 。还有其他几个函数可以以这种方式组合。请查看此处以获取更多详细信息。

分析消费者

分析消费者提供了一个函数,该函数根据输入数据消息计算分析结果并将它们作为指标发布到各种监控系统。它利用Micrometer 库为最流行的监控系统提供统一的编程体验,并使用Spring 表达式语言 (SpEL)来定义如何从输入数据计算指标名称、值和标签。

我们可以在自定义应用程序中直接使用消费者 Bean 来计算来自传递消息的分析结果。以下是分析消费者 Bean 的类型签名

Consumer<Message<?>> analyticsConsumer

注入到自定义应用程序后,用户可以直接调用消费者的 accept() 方法并提供一个 Message<?> 对象来计算并将分析结果发布到后端监控系统。

消息是数据的通用容器。每个消息实例都包含一个 payload 和 headers,其中包含作为键值对的用户可扩展属性。SpEL 表达式用于访问消息的标头和有效负载以计算指标数量和标签。例如,计数器指标可以具有从输入消息有效负载的大小计算得出的值 amount,并添加一个从 kind 标头值提取的 my_tag 标签。

analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']

分析消费者配置属性以 analytics.* 前缀开头。请查阅AnalyticsConsumerProperties以获取可用的分析属性。监控配置属性以 management.metrics.export 前缀开头。有关配置特定监控系统的信息,请遵循提供的配置说明

分析接收器

与 CDC 源 的情况一样,Spring Cloud Stream 开箱即用的应用程序已经提供了基于 分析消费者 的分析接收器

接收器适用于Apache KafkaRabbitMQ绑定器变体。当用作 Spring Cloud Stream 接收器时,分析消费者会自动配置为接受来自相应中间件系统的数据,例如,来自 Kafka 主题或 RabbitMQ 交换机的数据。

在 Spring Cloud Data Flow 上运行

单独运行 CDC 源 和 分析接收器 都没问题,但是 Spring Cloud Data Flow 使它们作为管道运行变得非常容易。基本上,我们希望编排如下所示的数据流

scdf pipelines

cdc-log 管道部署一个 cdc-source,该 cdc-source 使用 JSON 消息格式将所有数据库更改流式传输到 log-sink。同时,cdc-analytics-tap 管道连接 cdc-source 输出到 analytics-sink,以根据 CDC 事件计算数据库统计信息并将它们发布到时间序列数据库 (TSDB),例如 Prometheus 或 Wavefront。Grafana 仪表板用于可视化这些更改。

Spring Cloud Data Flow 的安装说明说明了如何在任何受支持的云平台上安装 Spring Cloud Data Flow。

下面,我们将简要提供设置 Spring Cloud Data Flow 的步骤。首先,我们需要获取用于运行 Spring Cloud Data Flow 的 docker-compose 文件,Prometheus 和 Grafana

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose.yml

wget -O docker-compose-prometheus.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose-prometheus.yml

此外,获取此额外的 docker-compose 文件以安装一个源 MySQL 数据库,该数据库配置为公开其 CDC Debezium 连接到的预写日志。

wget -O mysql-cdc.yml https://gist.githubusercontent.com/tzolov/48dec8c0db44e8086916129201cc2c8c/raw/26e1bf435d58e25ff836e415dae308edeeef2784/mysql-cdc.yml

mysql-cdc 使用debezium/example-mysql镜像,并附带清单、示例数据库

invetory db

我们需要设置一些环境变量才能正确运行 Spring Cloud Data Flow。

export DATAFLOW_VERSION=2.7.1
export SKIPPER_VERSION=2.6.1
export STREAM_APPS_URI=https://dataflow.springjava.cn/kafka-maven-latest

现在我们已经准备就绪,是时候开始运行 Spring Cloud Data Flow 和所有其他辅助组件了。

docker-compose -f docker-compose.yml -f docker-compose-prometheus.yml -f mysql-cdc.yml up

提示

要使用 RabbitMQ 而不是 Apache Kafka,您可以下载一个额外的 docker-compose 文件,如RabbitMQ 而不是 Kafka说明中所述,并将 STREAM_APPS_URI 变量设置为https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-rabbit-maven代替。

提示

要使用 Wavefront 而不是 Prometheus 和 Grafana,请遵循Wavefront说明。

SCDF 启动并运行后,转到https://127.0.0.1:9393/dashboard。然后转到左侧的 Streams,然后选择 Create Stream。从源应用程序中选择 cdc-debezium,从接收器应用程序中选择 log 和 analytics,以定义 cdc-log = cdc-debezium | log 和 cdc-analytic-tap = :cdc-log.cdc-debezium > analytics 管道。您可以单击应用程序选项以选择所需的属性。

为了更快地引导,您可以复制/粘贴以下可立即使用的管道定义代码段

cdc-log = cdc-debezium --cdc.name=mycdc --cdc.flattening.enabled=false --cdc.connector=mysql --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.dbname=inventory --cdc.config.database.hostname=mysql-cdc --cdc.config.database.port=3307 --cdc.stream.header.offset=true --cdc.config.database.server.name=my-app-connector --cdc.config.tombstones.on.delete=false | log

cdc-analytic-tap = :cdc-log.cdc-debezium > analytics --analytics.name=cdc --analytics.tag.expression.table=#jsonPath(payload,'$..table') --analytics.tag.expression.operation=#jsonPath(payload,'$..op') --analytics.tag.expression.db=#jsonPath(payload,'$..db')

cdc-log 管道部署一个 cdc-debezium 源,该源连接到 mysql-cdc:3307 处的 MySQL 数据库并将数据库更改事件流式传输到 log 接收器。请查阅cdc-debezium 文档以获取可用的配置选项。

cdc-analytic-tap 管道连接到 cdc-debezium 源 的输出并将 CDC 事件流式传输到 分析接收器。分析创建一个指标计数器(称为 cdc),并使用SpEL 表达式根据流式传输的消息有效负载计算指标标签(例如数据库、表和操作)。

例如,让我们修改 MySQL inventory 数据库中的 customers 表。更新事务作为更改事件发送到 cdc-debezium 源,该源将本机数据库事件转换为如下所示的统一消息有效负载

{
  "before": {
    "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "after": {
    "id": 1004, "first_name": "Anne2", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "source": {
    "version": "1.3.1.Final", "connector": "mysql", "server_id": 223344, "thread": 5,
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",
    "table": "customers",
  },
  "op": "u",
  "ts_ms": 1607440256301,
  "transaction": null
}

以下 SpEL 表达式用于根据 CDC 消息有效负载计算 3 个标签(dbtableoperation)。这些标签分配给发布到 Prometheus 的每个 cdc 指标。

--analytics.tag.expression.db=#jsonPath(payload,'$..db')
--analytics.tag.expression.table=#jsonPath(payload,'$..table')
--analytics.tag.expression.operation=#jsonPath(payload,'$..op')

以下是选择所有属性后它应该是什么样子的屏幕截图

scdf create streams

在文档中创建部署cdc-logcdc-analytics-tap管道,接受所有默认选项。或者,您可以使用“组操作”同时部署两个流。

部署流后,您可以通过 SCDF UI 或使用 Skipper docker 容器查看已部署应用程序的日志,如文档中所述。如果您检查日志接收器应用程序的日志,您应该会看到类似于以下内容的 CDC JSON 消息

cdc event log

接下来,使用按钮(或仅打开localhost:3000)转到 Grafana 仪表板,并以用户:`admin`和密码:`admin`登录。您可以浏览“应用程序”仪表板以检查已部署管道的性能。现在,您可以导入CDC Grafana 仪表板-Prometheus.json仪表板,并查看类似于以下内容的仪表板

grafana dashboard

以下查询已用于聚合 Prometheus 中的 cdc_total 指标

sort_desc(topk(10, sum(cdc_total) by (db, table)))
sort_desc(topk(100, sum(cdc_total) by (op)))

提示

您可以打开https://127.0.0.1:9090处的 Prometheus UI 以检查配置以及运行一些临时 PQL 查询。

生成数据库活动

您可以连接到 localhost:3307 (用户:root和密码:debezium)处的清单 CDC MySQL 数据库并开始修改数据。

以下 docker 命令显示了如何连接到 mysql-cdc

docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory

以下脚本有助于生成多个插入、更新和删除数据库事务

for i in {1..100}; do docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory -e'INSERT INTO customers (first_name, last_name, email) VALUES ("value1", "value2", "val@bla"); UPDATE customers SET first_name="value2" WHERE first_name="value1"; DELETE FROM customers where first_name="value2";'; done

您将看到 log-sink 日志反映这些更改以及 CDC 仪表板图表更新。

结论和未来工作

在本博文中,我们了解了 CDC-Debezium 供应器和分析消费者功能及其对应的 Spring Cloud Stream 源和接收器的运作方式。供应器和消费者功能可以注入到自定义应用程序中,以与其他业务逻辑相结合。

源和接收器应用程序开箱即用,可用于 Kafka 和 RabbitMQ 中间件变体。

您可以轻松构建独立应用程序,将 Cdc-Debezium 供应器与 Geode 消费者结合使用,以创建和维护数据库数据的内存中视图。类似地,您可以将 Cdc-Debezium 供应器与 Elasticsearch 消费者结合使用,以实时维护数据库数据的可搜索索引。

更令人兴奋的是,您可以使用开箱即用的 cdc-debezium 源geode 接收器elasticsearch 接收器 应用程序来实现上述场景。您可以在不同的消息绑定器和源数据库上构建这些管道。

Spring One 演示文稿 演示了一个高级用例,使用 CDC-Debezium 和机器学习构建用于信用卡欺诈检测的流数据管道。

cdc fraud detection

在本博文系列中,我们还有更多剧集即将推出。敬请关注。

获取 Spring 电子邮件简报

与 Spring 电子邮件简报保持联系

订阅

抢先一步

VMware 提供培训和认证,助您快速提升技能。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部