Spring Cloud Data Flow 的 1 个流、2 个应用程序和 3 个依赖项

工程 | Josh Long | April 05, 2016 | ...

我只是想在这里记录下昨天让我会心一笑的一个经历:让快速改进的 Spring Cloud Data Flow 在几分钟内从 (Spring Boot) 启动器 (start(-ers)) 摇摆(wiggle)到服务 (service)!

唯一的先决条件是你有一个正在运行的 Redis 实例。我的 Redis 实例运行在 127.0.0.1 上,Spring Boot 无需额外配置即可找到并使用它。

我们将使用超赞的 Spring Initializr 轻松生成我们的应用程序。还记得那些愚蠢的 Apple 广告,“There's an App For That?” 别管那个了,现在是 勾选框 就行!看看你是否像我一样喜欢这个体验!

本地 Data Flow 服务器

前往 Spring Initializr 选择 Local Data Flow Server 并将 artifact 命名为 df-server。这将用于启动一个本地 Data Flow 服务 - 一个 REST API 和一些持久化逻辑 - 用于编排和存储关于流和任务的信息。在旧的 Spring XD 世界中,这被称为 Spring XD 的 Admin Server

在你选择的 IDE 中打开项目,并将 @EnableDataFlowServer 添加到 DfServerApplication 类中

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DfServerApplication.class, args);
	}
}

df-server 项目的根目录下运行 mvn spring-boot:run,应用程序将在端口 9393 上启动。

提示:当你看到欢迎 ASCII 艺术字时,你就(很可能)成功了!

关于提示的提示:好吧,这可能不完全正确。它可能会因为各种原因失败(比如服务或嵌入式 H2 数据库的端口冲突),但高质量的 ASCII 艺术字已被证明在(我的)研究中具有治疗作用……(研究对象是……我自己)。

Data Flow Shell

前往 Spring Initializr 选择 Data Flow Shell 并将 artifact 命名为 df-shell。这将用于启动一个由 Spring Shell 驱动的 Data Flow shell。

Data Flow shell 可以在任何操作系统上运行。它是我们刚刚启动的 Data Flow 服务的客户端。它允许我们使用熟悉的管道与过滤器 DSL 和命令来操作服务。我和其他开发者一样喜欢精美的横幅 ASCII 艺术字,但美好的事物也可能(呃!)过犹不及。默认情况下,Spring Shell Spring Boot 都试图发出 ASCII 横幅,所以这次我们将让 Spring Boot 避开(这次!)。在你选择的 IDE 中打开项目,并将 @EnableDataFlowShell 添加到 DfShellApplication 类中,然后配置 SpringApplication 的创建方式以隐藏 Spring Boot 横幅。

package com.example;

import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(DfShellApplication.class)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

df-shell 项目的根目录下运行 mvn spring-boot:run。默认情况下,你应该能够与在本地运行的 Data Flow 服务器进行交互。尝试执行 module list 命令。你应该会看到一个表格,其中列出了 Spring Cloud Data Flow 已知的所有内置组件。

日志 Sink 模块

前往 Spring Initializr 选择 Stream Redis 并将 artifact 命名为 logging-sink。我们将使用 Spring Cloud Stream 构建一个记录传入消息的*自定义*模块。Spring Cloud Stream 构建于 Spring 的 MessageChannel 抽象和 Spring Integration 中的组件模型之上,可简化描述和集成基于消息的微服务的工作。然后,我们将使用 Spring Cloud Data Flow 来部署和编排这个模块。

Spring Cloud Data Flow 是一种强大的方式,可以用小的 Spring Boot 驱动的模块来描述复杂的集成、批处理和流处理工作负载。有几种类型的 module(模块)。一个 source(源)生成数据,通常按固定计划生成,下游组件可以消费和处理这些数据。一个 processor(处理器)接收数据,对其进行处理,然后输出数据。一个 sink(汇)只接收数据,但不产生任何要发送出去的东西。这些组件可以很好地组合在一起,描述任何潜在的连续工作负载(物联网传感器数据、24/7 事件处理、在线事务数据摄取和集成场景等)。最终,source 通常是 Spring Integration 的入站适配器。processor 通常是任何接收数据并输出数据的 Spring Integration 组件(如 transformer)。sink 通常是 Spring Integration 的出站适配器。

一个 task(任务)描述任何最终会停止的工作负载。它可能是一个简单的 Spring Boot Command Line Runner 或一个 Spring Batch Job

尽管如此,Spring Cloud Data Flow 并没有特定的 Spring Integration 知识。它只了解 Spring Cloud Stream 以及众所周知的 Spring MessageChannels(消息通道),例如 inputoutput。它不关心这些通道的终端是什么。Spring Cloud Data Flow 也没有特定的 Spring Batch 知识。它只了解 Spring Cloud Task。

正如 UNIX sh shell 环境允许我们通过将数据分别传递到 stdin 和从 stdout 传递数据,从而从单一功能的命令行工具组合出任意多且任意复杂的解决方案一样,Spring Cloud Data Flow 也允许我们从单一功能的消息传递组件组合出任意多且任意复杂的解决方案。

Spring Cloud Data Flow 已经内置了许多开箱即用的功能。我们将开发并安装一个简单的模块来记录东西 - 在我们的例子中,是时间。值得注意的是,我们这样做是为了自己的学习,但实际上我们并*不需要*这样做;Spring Cloud Data Flow 已经提供了 log 模块!(还有一个 time 模块!)

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Map;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

	@MessageEndpoint
	public static class LoggingMessageEndpoint {

		@ServiceActivator(inputChannel = Sink.INPUT)
		public void logIncomingMessages(
				@Payload String msg,
				@Headers Map<String, Object> headers) {

			System.out.println(msg);
			headers.entrySet().forEach(e ->
					System.out.println(e.getKey() + '=' + e.getValue()));

		}
	}

	public static void main(String[] args) {
		SpringApplication.run(LoggingSinkApplication.class, args);
	}
}

这是一个简单的 Spring Cloud Stream 绑定。Sink.class 是一个定义了 MessageChannel input() 的接口。Spring Cloud Stream 会将其转换为一个实时、命名的通道,连接到一个消息代理(在本例中是 Redis,尽管未来几个月 Spring Cloud Data Flow 的默认设置可能会更改为 RabbitMQ),我们任何消息传递代码都可以使用这个通道。这个示例使用 Spring Integration 在消息到达时打印出传入的消息数据。首先,让我们向 Data Flow 注册我们的自定义模块,然后组成一个流,该流从 time 组件接收包含时间的传入消息,然后记录结果。

首先,对 logging-sink 项目执行 mvn clean install,以便它能在本地 Maven 仓库中被解析。Spring Cloud Data Flow 使用可插拔的策略来解析自定义模块的实例。在我们的示例中,它将尝试在我们的系统本地 Maven 仓库中解析它们。

返回 Data Flow Shell 并输入以下内容

dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'

dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║    source    │   processor    │       sink        │  task   ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file          │bridge          │aggregate-counter  │timestamp║
║ftp           │filter          │cassandra          │         ║
║http          │groovy-filter   │counter            │         ║
║jdbc          │groovy-transform│custom-log         │         ║
║jms           │httpclient      │field-value-counter│         ║
║load-generator│pmml            │file               │         ║
║rabbit        │splitter        │ftp                │         ║
║sftp          │transform       │gemfire            │         ║
║tcp           │                │gpfdist            │         ║
║time          │                │hdfs               │         ║
║trigger       │                │jdbc               │         ║
║twitterstream │                │log                │         ║
║              │                │rabbit             │         ║
║              │                │redis              │         ║
║              │                │router             │         ║
║              │                │tcp                │         ║
║              │                │throughput         │         ║
║              │                │websocket          │         ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝

dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'

dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝

dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'

你会在 Data Flow 服务日志中看到模块已经启动并连接在一起。在我的具体日志中,我观察到

2016-04-05 09:09:18.067  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.custom-log instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time

查看日志尾部,确认你内心深处已经知道的事情:我们的自定义 logging-sink 正在工作!

tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*

下一步

迈向云!我们使用的是本地 Data Flow 服务器。还有其他实现可用于像 Cloud Foundry 这样的处理平台。Cloud Foundry Data Flow Server 会启动应用程序实例,而不是本地 Java 进程。现在,构建一个可扩展的数据摄取和处理流就像 cf push ..cf scale -i $MOAR 一样简单!

我们只使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 可以编排任意数量由 Spring Cloud Stream 驱动的基于消息的微服务。我建议查看一些内置的 Spring Cloud Stream 模块以获取灵感。

订阅 Spring 新闻通讯

保持与 Spring 新闻通讯的连接

订阅

抢占先机

VMware 提供培训和认证,助您加速进步。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一个简单订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部