抢占先机
VMware 提供培训和认证,助您加速进步。
了解更多我只是想在这里记录下昨天让我会心一笑的一个经历:让快速改进的 Spring Cloud Data Flow 在几分钟内从 (Spring Boot) 启动器 (start(-ers)) 摇摆(wiggle)到服务 (service)!
唯一的先决条件是你有一个正在运行的 Redis 实例。我的 Redis 实例运行在
127.0.0.1
上,Spring Boot 无需额外配置即可找到并使用它。
我们将使用超赞的 Spring Initializr 轻松生成我们的应用程序。还记得那些愚蠢的 Apple 广告,“There's an App For That?” 别管那个了,现在是 勾选框 就行!看看你是否像我一样喜欢这个体验!
前往 Spring Initializr 选择 Local Data Flow Server
并将 artifact 命名为 df-server
。这将用于启动一个本地 Data Flow 服务 - 一个 REST API 和一些持久化逻辑 - 用于编排和存储关于流和任务的信息。在旧的 Spring XD 世界中,这被称为 Spring XD 的 Admin Server。
在你选择的 IDE 中打开项目,并将 @EnableDataFlowServer
添加到 DfServerApplication
类中
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;
@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {
public static void main(String[] args) {
SpringApplication.run(DfServerApplication.class, args);
}
}
在 df-server
项目的根目录下运行 mvn spring-boot:run
,应用程序将在端口 9393
上启动。
提示:当你看到欢迎 ASCII 艺术字时,你就(很可能)成功了!
关于提示的提示:好吧,这可能不完全正确。它可能会因为各种原因失败(比如服务或嵌入式 H2 数据库的端口冲突),但高质量的 ASCII 艺术字已被证明在(我的)研究中具有治疗作用……(研究对象是……我自己)。
前往 Spring Initializr 选择 Data Flow Shell
并将 artifact 命名为 df-shell
。这将用于启动一个由 Spring Shell 驱动的 Data Flow shell。
Data Flow shell 可以在任何操作系统上运行。它是我们刚刚启动的 Data Flow 服务的客户端。它允许我们使用熟悉的管道与过滤器 DSL 和命令来操作服务。我和其他开发者一样喜欢精美的横幅 ASCII 艺术字,但美好的事物也可能(呃!)过犹不及。默认情况下,Spring Shell 和 Spring Boot 都试图发出 ASCII 横幅,所以这次我们将让 Spring Boot 避开(这次!)。在你选择的 IDE 中打开项目,并将 @EnableDataFlowShell
添加到 DfShellApplication
类中,然后配置 SpringApplication
的创建方式以隐藏 Spring Boot 横幅。
package com.example;
import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;
@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DfShellApplication.class)
.bannerMode(Banner.Mode.OFF)
.run(args);
}
}
在 df-shell
项目的根目录下运行 mvn spring-boot:run
。默认情况下,你应该能够与在本地运行的 Data Flow 服务器进行交互。尝试执行 module list
命令。你应该会看到一个表格,其中列出了 Spring Cloud Data Flow 已知的所有内置组件。
前往 Spring Initializr 选择 Stream Redis
并将 artifact 命名为 logging-sink
。我们将使用 Spring Cloud Stream 构建一个记录传入消息的*自定义*模块。Spring Cloud Stream 构建于 Spring 的 MessageChannel
抽象和 Spring Integration 中的组件模型之上,可简化描述和集成基于消息的微服务的工作。然后,我们将使用 Spring Cloud Data Flow 来部署和编排这个模块。
Spring Cloud Data Flow 是一种强大的方式,可以用小的 Spring Boot 驱动的模块来描述复杂的集成、批处理和流处理工作负载。有几种类型的 module
(模块)。一个 source
(源)生成数据,通常按固定计划生成,下游组件可以消费和处理这些数据。一个 processor
(处理器)接收数据,对其进行处理,然后输出数据。一个 sink
(汇)只接收数据,但不产生任何要发送出去的东西。这些组件可以很好地组合在一起,描述任何潜在的连续工作负载(物联网传感器数据、24/7 事件处理、在线事务数据摄取和集成场景等)。最终,source 通常是 Spring Integration 的入站适配器。processor 通常是任何接收数据并输出数据的 Spring Integration 组件(如 transformer)。sink 通常是 Spring Integration 的出站适配器。
一个 task
(任务)描述任何最终会停止的工作负载。它可能是一个简单的 Spring Boot Command Line Runner
或一个 Spring Batch Job
。
尽管如此,Spring Cloud Data Flow 并没有特定的 Spring Integration 知识。它只了解 Spring Cloud Stream 以及众所周知的 Spring MessageChannels
(消息通道),例如 input
和 output
。它不关心这些通道的终端是什么。Spring Cloud Data Flow 也没有特定的 Spring Batch 知识。它只了解 Spring Cloud Task。
正如 UNIX sh
shell 环境允许我们通过将数据分别传递到 stdin
和从 stdout
传递数据,从而从单一功能的命令行工具组合出任意多且任意复杂的解决方案一样,Spring Cloud Data Flow 也允许我们从单一功能的消息传递组件组合出任意多且任意复杂的解决方案。
Spring Cloud Data Flow 已经内置了许多开箱即用的功能。我们将开发并安装一个简单的模块来记录东西 - 在我们的例子中,是时间。值得注意的是,我们这样做是为了自己的学习,但实际上我们并*不需要*这样做;Spring Cloud Data Flow 已经提供了 log
模块!(还有一个 time
模块!)
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.Map;
@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {
@MessageEndpoint
public static class LoggingMessageEndpoint {
@ServiceActivator(inputChannel = Sink.INPUT)
public void logIncomingMessages(
@Payload String msg,
@Headers Map<String, Object> headers) {
System.out.println(msg);
headers.entrySet().forEach(e ->
System.out.println(e.getKey() + '=' + e.getValue()));
}
}
public static void main(String[] args) {
SpringApplication.run(LoggingSinkApplication.class, args);
}
}
这是一个简单的 Spring Cloud Stream 绑定。Sink.class
是一个定义了 MessageChannel input()
的接口。Spring Cloud Stream 会将其转换为一个实时、命名的通道,连接到一个消息代理(在本例中是 Redis,尽管未来几个月 Spring Cloud Data Flow 的默认设置可能会更改为 RabbitMQ),我们任何消息传递代码都可以使用这个通道。这个示例使用 Spring Integration 在消息到达时打印出传入的消息数据。首先,让我们向 Data Flow 注册我们的自定义模块,然后组成一个流,该流从 time
组件接收包含时间的传入消息,然后记录结果。
首先,对 logging-sink
项目执行 mvn clean install
,以便它能在本地 Maven 仓库中被解析。Spring Cloud Data Flow 使用可插拔的策略来解析自定义模块的实例。在我们的示例中,它将尝试在我们的系统本地 Maven 仓库中解析它们。
返回 Data Flow Shell 并输入以下内容
dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'
dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║ source │ processor │ sink │ task ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file │bridge │aggregate-counter │timestamp║
║ftp │filter │cassandra │ ║
║http │groovy-filter │counter │ ║
║jdbc │groovy-transform│custom-log │ ║
║jms │httpclient │field-value-counter│ ║
║load-generator│pmml │file │ ║
║rabbit │splitter │ftp │ ║
║sftp │transform │gemfire │ ║
║tcp │ │gpfdist │ ║
║time │ │hdfs │ ║
║trigger │ │jdbc │ ║
║twitterstream │ │log │ ║
║ │ │rabbit │ ║
║ │ │redis │ ║
║ │ │router │ ║
║ │ │tcp │ ║
║ │ │throughput │ ║
║ │ │websocket │ ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝
dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'
dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝
dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'
你会在 Data Flow 服务日志中看到模块已经启动并连接在一起。在我的具体日志中,我观察到
2016-04-05 09:09:18.067 INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.custom-log instance 0
Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838 INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time instance 0
Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time
查看日志尾部,确认你内心深处已经知道的事情:我们的自定义 logging-sink
正在工作!
tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*
迈向云!我们使用的是本地 Data Flow 服务器。还有其他实现可用于像 Cloud Foundry 这样的处理平台。Cloud Foundry Data Flow Server 会启动应用程序实例,而不是本地 Java 进程。现在,构建一个可扩展的数据摄取和处理流就像 cf push ..
和 cf scale -i $MOAR
一样简单!
我们只使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 可以编排任意数量由 Spring Cloud Stream 驱动的基于消息的微服务。我建议查看一些内置的 Spring Cloud Stream 模块以获取灵感。