Spring XD 1.0 里程碑 1 发布

工程 | Mark Pollack | 2013年6月12日 | ...

今天我们很高兴地宣布 Spring XD 1.0 M1 版本发布  (下载)。Spring XD 是一个统一、分布式、可扩展的系统,用于数据摄取、实时分析、批处理和数据导出。 该项目的目标是简化大数据应用的开发。

从宏观角度看,大数据应用与企业集成和批处理应用有许多共同特征。 通过 Spring Integration 和 Spring Batch 项目,Spring 已经为构建集成和批处理应用提供了超过 6 年的成熟解决方案。 Spring XD 在此基础上构建,提供了一个轻量级的运行时环境,可以通过简单的 DSL 轻松配置和组装。

在这篇博客中,我们将介绍 Spring XD 的关键组件,即 Streams、Jobs、Taps、Analytics 以及用于声明它们的 DSL,还有运行时架构。 更多详细信息可以在XD 指南中找到。

Stream 定义了数据如何被收集、处理、存储或转发。 例如,一个 stream 可以收集 syslog 数据,过滤它,并存储到 HDFS 中。 Spring XD 提供了一个 DSL 来定义 stream。 DSL 允许你使用类似 UNIX 管道和过滤器的简单语法开始构建线性处理流程,但也允许你使用扩展语法描述更复杂的流程。

源和 Sink

一个简单的线性 stream 包括以下序列:输入源、(可选的)处理步骤和输出 Sink。 一个简单的例子是收集来自 HTTP Source 的数据并写入 File Sink。描述此 stream 的 DSL 是
http | file

你通过向默认运行在端口 8080 上的 XD Admin Server 发送 HTTP 请求来告诉 Spring XD 创建一个 stream。 在 M2 版本中,我们将提供一个交互式 shell 与 XD 通信,但在 M1 中,最简单的方式是使用 'curl' 与 XD 交互。

curl -d "http | file" http://localhost:8080/streams/httptest

Stream 的名称是 httptest,默认监听的 HTTP 端口是 9000,默认的文件位置是 /tmp/xd/output/${streamname}

如果你使用 curl 在端口 9000 上发布一些数据,例如
curl -d "hello world" http://localhost:9000

你会在文件 /tmp/xd/output/httptest 中看到字符串 'hello world'

要更改默认值,你可以传入选项参数

http --port=9090 | file --dir=/var/streams --name=data.txt

M1 版本中支持的源包括 file、time、HTTP、Tail、Twitter Search、Gemfire (Continuous Queries)、Gemfire (Cache Event)、Syslog 和 TCP。 支持的 Sink 包括 Log、File、HDFS、 Gemfire 分布式数据网格和 TCP。 要将 syslog 数据捕获到 HDFS,DSL 如下

syslog | hdfs --namenode="http://192.168.1.100:9000"

你也可以添加自定义的源和 Sink。 通过遵循简单步骤,可以添加 Spring Integration 中现有的 Inbound 和 Outbound Channel Adapters。 未来的版本将增加对 MQTT、RabbitMQ、JMS 和 Kafka 的支持。 我们欢迎pull request来贡献你偏好的源和 Sink 模块。

Stream 的编程模型基于 Spring Integration。 输入源将外部数据转换为包含头部(包含键值对)和载荷(可以是任何 Java 类型)的 Message。消息通过 Message Channels 流经 stream。下图显示了一个包含输入源、处理步骤和输出 Sink 的 stream。

处理器

包含多个处理步骤的 stream 如下图所示。 所有处理步骤都通过 Channels 连接在一起。

在 DSL 中,管道符号对应于将数据从一个处理步骤传递到下一个步骤的 channel。 Spring XD 中的 channel 可以是内存中的,也可以由 Redis、JMS、RabbitMQ 等中间件支持。 这实现了一个简单的分布式处理模型,我们将在稍后讨论。

表示带有处理步骤的 stream 的 DSL 表达式形式如下

source | filter | transform | sink

M1 版本中支持的处理器包括 filter、transformer、json-field-extractor、json-field-value-filter 和 script。  filter 和 transformer 处理器支持使用 Spring Expression Language  (SpEL) 和 Groovy。 在前面的例子中,要使用 SpEL 将 HTTP 请求的载荷转换为大写,

http | transform --expression=payload.toUpperCase() | file

script 处理器也允许你执行自定义的 Groovy 代码。

Tap

Tap 允许你“监听”来自另一个 stream 的数据并在一个单独的 stream 中处理这些数据。原始 stream 不受 Tap 影响,也感知不到其存在,类似于电话线上的窃听器。 WireTap是 EAI 模式标准目录的一部分,并且 Spring XD 使用的 Spring Integration 框架的一部分。

Tap 可以从目标 stream 处理管道中的任何点消费数据。例如,如果你有一个名为 mystream 的 stream,定义如下
source | filter | transform | sink

你可以使用 DSL 创建一个 tap

tap mystream.filter | sink2

这将在应用 filter 之后但在 transformer 之前窃听 stream 的数据。因此,未转换的数据将被发送到 sink2。

例如,如果你使用命令创建一个名为 httpstream 的 stream

curl  -d "http --port=9898 | filter --expression='payload.length() > 5'
                           | transform --expression=payload.toUpperCase()
                           | file"  http://localhost:8080/streams/httpstream

然后要在名为 httptap 的 stream 上创建一个将过滤后的数据 stream 写入单独文件的 tap,使用以下命令

curl -d "tap httpstream.filter | file --dir=/tmp --name=filtered.txt" http://localhost:8080/streams/httptap

发布数据,例如

curl -d "hello world" http://localhost:9898
curl -d "he" http://localhost:9898
curl -d "hello world 2" http://localhost:9898

结果会在文件 /tmp/xd/output/httpstream 中看到 HELLO WORLD 和 HELLO WORLD 2,在 /tmp/filtered.txt 中看到小写版本。文本 'he' 将不会出现在任一文件中。

一个主要用例是在通过其主 stream 摄取数据的同时执行实时分析。例如,考虑一个消费 Twitter 搜索结果并将其写入 HDFS 的数据 stream。可以在数据写入 HDFS 之前创建一个 tap,并将来自 tap 的数据通过管道传递给一个计数器,该计数器对应于推文中特定话题标签被提及的次数。

分析

问 10 个开发者什么是‘实时分析’,你会得到 20 个答案。 答案范围从非常简单(但极其有用)的计数器,到移动平均,聚合计数器,直方图,时间序列,机器学习算法,再到嵌入式 CEP 引擎。 Spring XD 旨在作为一个通用类库,支持广泛的这些指标和分析数据结构,并与多种后端存储技术配合使用。 它们也作为一种 Sink 类型暴露给 XD,用于 DSL 表达式中。

在 M1 版本中,支持 Counter、Field Value Counter、Gauge 和 Rich Gauge。这些指标可以存储在内存中或 Redis 中。 更多详情以及未来版本将实现的列表,请参阅JavaDocs和 用户指南的分析部分

举个例子,考虑收集 stream 中推文话题标签实时频率计数的情况。 要使用 SpringXD 执行此操作,创建一个新的 stream 定义,使用 twitter search source 模块并命名为 ‘spring’

curl -d "twittersearch --query='spring' --consumerKey=<consumer-key> --consumerSecret=<consumer-secret>
           | file" http://localhost:8080/streams/spring

这会将推文存储在本地文件系统中。 注意,要获取consumerKeyconsumerSecret你需要注册一个 Twitter 应用程序。如果你还没有设置,可以在Twitter 开发者网站上创建一个应用以获取这些凭据。

接下来,在 twittersearch 源的输出上创建一个名为 ‘springtap’ 的 tap,用于统计推文中的话题标签频率。

curl -d "tap spring.twittersearch | field-value-counter
                                     --fieldName=entities.hashTags.text
                                     --counterName=hashTagFrequency" http://localhost:8080/streams/springtap

字段 entities.hashTags.text 是用于底层实现的 Spring Social Tweet 对象 JSON 表示中话题标签的路径。 要查看前 5 个话题标签,请使用 redis-cli 查看名为 fieldvaluecounters.hashTagFrequency. 的有序集合内容。注意,通常需要几分钟才能收集到足够包含话题标签实体的推文。

> redis-cli
redis 127.0.0.1:6379>ZREVRANGEBYSCORE fieldvaluecounters.hashTagFrequency +inf -inf WITHSCORES LIMIT 0 5

1] "spring"
2] "6"
3] "Turkey"
4] "6"
5] "Arab"
6] "6"
7] "summer"
8] "3"
9] "fashion"
10] "3"

架构

Spring XD 有两种运行模式:单节点模式和分布式模式。第一种是单个进程处理所有处理和管理任务。这种模式有助于你轻松入门,并简化应用的开发和测试。分布式模式允许处理任务分布在一组机器上,一个管理服务器发送命令来控制在集群上执行的处理任务。

M1 版本中的分布式架构很简单。 stream 的每个部分(称为 module)可以在自己的 container 实例中执行。 模块之间的数据通过 Redis 队列传递。 更多详情请参阅架构部分。 此版本的主要重点是正确设计抽象,例如让 DSL 中的管道符号可以在各种 transport 中即插即用。 未来的版本将提供其他 transport 和性能改进,以及在 Hadoop 集群内部执行的支持。

更多即将推出

本文未涵盖的其他主题包括介绍Tuple 数据结构以及如何创建自定义处理器。 下一版本的一个重要部分将是支持 XDContainer 运行 Spring Batch 作业。 这些作业可用于帮助将数据从 HDFS 导出到关系型数据库,以及编排 Hadoop 作业(MapReduce、Pig、Hive 或 Cascading 作业)在集群上的执行。 我们还将为指标(如聚合计数器、基于 HTTP/JMX 的管理)提供额外的库,以及基于Reactor项目的某些高性能源,敬请关注!

在我们继续努力向最终的 Spring XD 1.0.0 版本迈进之际,我们很想听取您的反馈。如果您有任何问题,请使用Stackoverflow(标签:springxd),要报告任何错误或改进,请使用Jira Issue Tracker或提交GitHub issue

获取 Spring 新闻通讯

订阅 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您飞速前进。

了解更多

获取支持

Tanzu Spring 通过一个简单的订阅提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

近期活动

查看 Spring 社区中的所有近期活动。

查看全部