保持领先
VMware 提供培训和认证,助您快速进步。
了解更多今天,我们很高兴宣布 Spring XD 1.0 M2 版本发布 (下载) Spring XD 是一个用于数据提取、实时分析、批处理和数据导出的统一、分布式、可扩展的系统。该项目的目标是简化大数据应用的开发。
Spring XD 的第二个里程碑版本引入了一些新特性,使得数据提取和处理实时流以及协调基于 Hadoop 的批处理作业变得更加容易。在这篇博文中,我们将介绍
在单节点模式下启动 Spring XD ($XD_HOME/bin/xd-singlenode),并在一个单独的窗口中启动 shell。下面的示例展示了如何创建一个简单的流,将通过 http 发送的数据写入文件。注意,shell 为命令提供了 tab 补全提示。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target http://localhost:9000 --data "helloworld"
您可以列出所有流和 taps,以验证它们是否已创建
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果您检查位于目录 /tmp/xd/output/httpStream.out
中的文件,您将看到 hello world 消息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在单节点模式下,计数器存储在内存中,但也支持 Redis,这是非单节点模式下的默认设置。您可以使用命令行参数 –analytics redis
启用 Redis 支持。
要创建一个将数据存储在 Hadoop 中的 twitter 流以及一个实时计算推文中话题标签频率的计数器,请运行以下命令。请注意,要获得一个consumerKey和consumerSecret您需要注册一个 twitter 应用。如果您还没有设置,您可以在Twitter 开发者网站上创建一个应用来获取这些凭据。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://localhost:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
在计数器话题上,引入了一种新的聚合计数器类型,它可以将消息中某个字段的计数按年、月、日、小时和分钟的时间桶进行聚合。
只需几行 shell 命令,您就完成了不少工作!请查看用户指南以了解所有 shell 命令的详细信息。
到目前为止,所示的流处理管道都是线性的,但通常需要支持更复杂的流程。为了解决这种情况,在 M2 版本中引入了命名通道。您可以不使用源或目标模块,而是使用命名通道。为了与 unix 主题保持一致,从/向特定通道提取/发送数据使用 `>’ 字符,并且名称前缀带有 `:`
下面是一个示例,展示了如何使用命名通道来共享由不同输入源驱动的数据管道。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target http://localhost:9000 --data "hello"
查看输出文件
xd:>! cat /tmp/xd/output/demo.out
您会看到单词“hello”与时间戳值混合在一起。将消息扇出到多个流以及根据消息内容将消息路由到不同流的支持计划在未来的里程碑版本中实现。
另外值得注意的是,我们增加了对 4 个 Hadoop 版本的支持
在启动 XDContainer 时,可以通过传递命令行选项 –hadoopDistribution
来选择要使用的特定发行版 jar。您也应该能够使用其他 Hadoop 发行版,例如 Hadoop 1.2.x。我们将在后续版本中添加针对其他发行版的明确选项。值得注意的是, 在示例仓库中有一个示例展示了如何将 Spring XD 与 Pivotal HD 的 HAWQ 功能一起使用。
M1 版本提供了基于本地和 Redis 队列的传输方式,用于模块之间的通信(在 DSL 中用管道符号表示)。M2 版本提供了基于 Rabbit 的传输方式支持,允许您利用功能齐全的消息代理进行流提取。
可以使用 Spring XD 执行批处理作业,并设置触发器来启动这些作业。例如,我们可以重用 Hadoop 中的经典 wordcount 示例,提供一个简单的包含两个步骤的工作流编排。第一步是将文件复制到 HDFS,第二步是运行 wordcount MapReduce 作业。
要运行示例,请克隆spring-xd-samples 仓库并构建示例 batch-wordcount。然后按如下所示复制 jar、config 和数据文件。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
现在停止并在单节点模式下重新启动 Spring XD ($XD_HOME/bin/xd-singlenode)。然后在 shell 中执行以下命令
xd:> job create --name wordCountJob --definition "wordcount"
或者,您也可以指定一个 cron 表达式来安排作业执行。您可以通过查看 map reduce 作业的输出来验证结果。
xd:> hadoop config fs --namenode hdfs://localhost:8020
xd:> hadoop fs cat /count/out/part-r-00000
工作流中也支持其他步骤,例如执行 Hive 或 Pig 脚本。要编写这些类型的工作流,请查阅 Spring for Apache Hadoop 参考指南。也支持非 Hadoop 的步骤。
下一个版本的一个主要主题是通过整合Spring Batch Admin项目的组件来提供更多批处理作业的管理功能。您将能够通过向命名通道发送消息来触发批处理作业,并从命名通道接收作业状态通知。这将使您能够轻松设置基于数据可用性的批处理作业触发,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
当文件在目录 /data/inbound
中可用时,通过向命名通道 :wordCountJob. 发送消息来启动 wordcount 批处理作业。当批处理作业执行时,会有一系列数据流供您消费,这些消息包含关于JobExecution、StepExecution 等的数据。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
使用通道在流和作业之间交换数据是您可以开始了解 Spring XD 如何采取措施统一流处理和批处理这两个领域的一个方面。敬请关注!