领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多今天,我们高兴地宣布 Spring XD 的 1.0 M2 版本发布(下载)。Spring XD 是一个统一的、分布式的、可扩展的系统,用于数据摄取、实时分析、批处理和数据导出。该项目的目的是简化大数据应用程序的开发。
Spring XD 的第二个里程碑版本引入了几个新功能,使摄取和处理实时数据流以及编排基于 Hadoop 的批处理作业变得更加容易。在本博文中,我们将介绍
以单节点模式启动 Spring XD($XD_HOME/bin/xd-singlenode),并在另一个窗口中启动 shell。以下示例显示了如何创建一个简单的流,该流将通过 http 发布的数据写入文件。请注意,shell 为命令提供制表符自动完成功能。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target https://127.0.0.1:9000 --data "helloworld"
您可以列出所有流和 tap 以验证它们是否已创建。
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果您检查位于目录/tmp/xd/output/httpStream.out
中的文件,您将看到“hello world”消息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在单节点模式下,计数器存储在内存中,但也支持 Redis,在不使用单节点模式时,Redis 为默认设置。您可以使用–analytics redis
命令行参数启用 Redis 支持。
要创建一个将数据存储在 Hadoop 中的 Twitter 流,以及对推文中主题标签频率的实时计数,请运行以下命令。请注意,要获取consumerKey和consumerSecret您需要注册一个 Twitter 应用程序。如果您还没有设置一个,您可以在Twitter 开发者网站上创建一个应用程序以获取这些凭据。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://127.0.0.1:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
说到计数器,一个新的聚合计数器类型被引入,它将消息中字段的计数聚合到每一年、一个月、一天、一小时和一分钟的时间段。
只需几行 shell 命令,您就完成了大量工作!请查看用户指南,了解所有 shell 命令的详细信息。
到目前为止显示的流处理管道是线性的,但通常需要支持更复杂的流程。为了解决这种情况,M2 中引入了命名通道。您可以使用命名通道代替数据源或数据接收器模块。为了保持与 Unix 的一致性,从特定通道获取/发送数据使用“`>`”字符,并且名称以“`:`”为前缀。
这是一个示例,它显示了如何使用命名通道来共享由不同输入源驱动的 数据管道。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target https://127.0.0.1:9000 --data "hello"
查看输出文件
xd:>! cat /tmp/xd/output/demo.out
您将看到“hello”单词与时间戳值交织在一起。将消息分发到多个流以及根据消息内容将消息路由到不同流的支持计划在未来的里程碑版本中实现。
同样值得注意的是,我们增加了对 4 个 Hadoop 版本的支持
通过传入命令行选项–hadoopDistribution
,您可以选择在启动 XDContainer 时使用的特定发行版 jar 包。您也可以使用其他 Hadoop 发行版,例如 Hadoop 1.2.x。我们将在以后的版本中为其他发行版添加显式选项。值得注意的是,在示例存储库中有一个示例展示了如何将 Spring XD 与 Pivotal HD 的 HAWQ 功能一起使用。
M1 版本为模块之间的通信提供了本地和 Redis 队列支持的传输,如 DSL 中的管道符号所示。M2 版本提供了对 Rabbit 支持的传输,允许您利用功能齐全的消息代理进行流摄取。
可以使用 Spring XD 执行批处理作业,并设置触发器来启动这些作业。例如,我们可以重用 Hadoop 中经典的 wordcount 示例来提供一个简单的流程编排,该编排包含两个步骤。第一步是将文件复制到 HDFS,第二步是运行 wordcount MapReduce 作业。
要运行该示例,请克隆spring-xd-samples 存储库并构建示例 batch-wordcount。然后,按照以下步骤复制 jar、配置和数据文件。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
现在以单节点模式停止并重新启动 Spring XD($XD_HOME/bin/xd-singlenode)。然后在 shell 中执行以下命令
xd:> job create --name wordCountJob --definition "wordcount"
或者,也可以指定一个 cron 表达式来安排作业执行。您可以通过查看 MapReduce 作业的输出结果来验证结果。
xd:> hadoop config fs --namenode hdfs://127.0.0.1:8020
xd:> hadoop fs cat /count/out/part-r-00000
还支持工作流中的其他步骤,例如执行 Hive 或 Pig 脚本。要创作这些类型的流程,请查阅 Spring for Apache Hadoop 参考指南。也支持非基于 Hadoop 的步骤。
下一个版本的主题是通过整合Spring Batch Admin项目的组件来公开更多批处理作业的管理功能。您将能够通过向命名通道发送消息来触发批处理作业,以及从命名通道接收作业状态通知。这将允许您轻松地根据数据可用性设置批处理作业的触发,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
当文件出现在目录/data/inbound
中时,通过向命名通道 :wordCountJob 发送消息来启动 wordcount 批处理作业。当批处理作业执行时,将提供一个数据流,以便您可以使用有关JobExecution、StepExecution等的数据消息。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
使用通道在流和作业之间交换数据是您可以开始看到 Spring XD 如何逐步统一流和批处理这两个领域的地方之一。敬请期待!