领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我很高兴地宣布,Spring for Apache Hadoop 项目的第一个里程碑版本 (1.0.0.M1) 已经发布,并且将介绍我们在过去几个月里所做的一些工作。作为 Spring Data 系列项目的一部分,Spring for Apache Hadoop 通过利用 Spring 生态系统的能力,为基于 Apache Hadoop 技术开发应用程序提供了支持。无论是编写独立的、原生的 MapReduce 应用程序,与企业中多个数据存储的数据进行交互,还是协调 HDFS、Pig 或 Hive 作业的复杂工作流,或者介于两者之间的任何工作,Spring for Apache Hadoop 都始终遵循 Spring 的理念,提供简化的编程模型,并解决由基础设施引起的“意外复杂性”。Spring for Apache Hadoop 为开发者处理海量数据提供了强大的工具。
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />
<!-- define the job -->
<hdp:job id="word-count"
input-path="/input/" output-path="/ouput/"
mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>
<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner"
p:jobs-ref="word-count"/>
请注意,作业配置的创建和提交是由 IoC 容器处理的。无论是需要调整 Apache Hadoop 配置,还是 reducer 需要额外的参数,所有配置选项仍然可以供您进行配置。这允许您从小处着手,并让配置随着应用程序的增长而扩展。配置可以像开发者想要/需要的任何方式一样简单或高级,同时利用 Spring 容器的功能,例如 属性占位符 和 环境支持。
<hdp:configuration resources="classpath:/my-cluster-site.xml">
fs.default.name=${hd.fs}
hadoop.tmp.dir=file://${java.io.tmpdir}
electric=sea
</hdp:configuration>
<context:property-placeholder location="classpath:hadoop.properties" />
<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
<hdp:classpath value="/cp/some-library.jar#library.jar" />
<hdp:cache value="/cache/some-archive.tgz#main-archive" />
<hdp:cache value="/cache/some-resource.res" />
</hdp:cache>
(单词计数 示例 是 Spring for Apache Hadoop 发行版的一部分 - 欢迎下载并进行实验)。
Spring for Apache Hadoop 不需要您用 Java 重写 MapReduce 作业,您可以无缝地使用非 Java 流式处理作业:它们就像任何其他对象一样(或者 Spring 称之为 bean)由框架以一致、连贯的方式创建、配置、连接和管理。开发者可以根据自己的偏好和需求进行混合搭配,而无需担心集成问题。
<hdp:streaming id="streaming-env"
input-path="/input/" output-path="/ouput/"
mapper="${path.cat}" reducer="${path.wc}">
<hdp:cmd-env>
EXAMPLE_DIR=/home/example/dictionaries/
</hdp:cmd-env>
</hdp:streaming>
现有的 Apache Hadoop Tool 实现也得到了支持;实际上,与其通过命令行指定自定义 Apache Hadoop 属性,不如直接注入它们。
<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
<hdp:arg value="tutorial/Tutorial1"/>
<hdp:arg value="--local"/>
</hdp:tool-runner>
上面的配置执行了 Twitter 的 Scalding(一个基于 Cascading(见下文)库的 Scala DSL)的 Tutorial1。请注意,Spring for Apache Hadoop 或 Scalding 中没有*专门*的支持代码 - 只是使用了标准的 Apache Hadoop API。
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
foo=bar
</hdp:hbase-configuration>
<!-- create a Pig instance using custom properties
and execute a script (using given arguments) at startup -->
<hdp:pig properties-location="pig-dev.properties" />
<script location="org/company/pig/script.pig">
<arguments>electric=tears</arguments>
</script>
</hdp:pig>
通过 Spring for Apache Hadoop,您不仅获得了一个强大的 IoC 容器,还可以访问 Spring 的可移植服务抽象。以流行的 JdbcTemplate 为例,您可以在 Hive 的 Jdbc 客户端之上使用它。
<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>
<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
c:driver-ref="hive-driver" c:url="${hive.url}"/>
<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
c:data-source-ref="hive-ds"/>
@Configuration
public class CascadingConfig {
@Value("${cascade.sec}") private String sec;
@Bean public Pipe tsPipe() {
DateParser dateParser = new DateParser(new Fields("ts"),
"dd/MMM/yyyy:HH:mm:ss Z");
return new Each("arrival rate", new Fields("time"), dateParser);
}
@Bean public Pipe tsCountPipe() {
Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
}
}
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig "/>
<bean id="cascade"
class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean"
p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />
上面的示例混合了命令式和声明式配置:前者用于创建单独的 Cascading 管道,后者用于将它们连接成一个流。
<task:scheduler id="myScheduler" pool-size="10"/>
<task:scheduled-tasks scheduler="myScheduler">
<!-- run once a day, at midnight -->
<task:scheduled ref="word-count-job" method="submit" cron="0 0 * * * "/>
</task:scheduled-tasks>
上面的配置使用了一个简单的 JDK Executor 实例 - 非常适合 POC 开发。您可以轻松地在生产环境中将它(只需一行代码)替换为更全面的解决方案,例如专用的 调度器 或 WorkManager 实现 - 这是 Spring 强大服务抽象的又一个例子。
在与 HDFS 交互时,一个常见的任务是准备文件系统,例如清理输出目录以避免覆盖数据,或者将所有输入文件移动到同一个名称方案或文件夹下。Spring for Apache Hadoop 通过完全支持 Apache Hadoop 的 fs 命令,如 FS Shell 和 DistCp,并将它们暴露为标准的 Java API,来解决这个问题。将其与 JVM 脚本(无论是 Groovy、JRuby 还是 Rhino/JavaScript)结合起来,可以形成一个强大的组合。
<hdp:script language="groovy">
inputPath = "/user/gutenberg/input/word/"
outputPath = "/user/gutenberg/output/word/"
if (fsh.test(inputPath)) {
fsh.rmr(inputPath)
}
if (fsh.test(outputPath)) {
fsh.rmr(outputPath)
}
fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>
这篇博文只是触及了 Spring for Apache Hadoop 中一些可用功能的一小部分;我还没有提到 Spring Batch 集成,它为各种 Apache Hadoop 交互提供了 tasklet,或者使用 Spring Integration 进行事件触发 - 将在未来的文章中详细介绍。请让我们知道您的想法、您的需求并给我们反馈:下载代码,fork 源代码,报告问题,在论坛上发帖或给我们发推文。