隆重推出 Spring for Apache Hadoop

工程 | Costin Leau | 2012年2月29日 | ...

我很高兴地宣布,Spring for Apache Hadoop 项目的第一个里程碑版本 (1.0.0.M1) 已经发布,并且将介绍我们在过去几个月里所做的一些工作。作为 Spring Data 系列项目的一部分,Spring for Apache Hadoop 通过利用 Spring 生态系统的能力,为基于 Apache Hadoop 技术开发应用程序提供了支持。无论是编写独立的、原生的 MapReduce 应用程序,与企业中多个数据存储的数据进行交互,还是协调 HDFS、Pig 或 Hive 作业的复杂工作流,或者介于两者之间的任何工作,Spring for Apache Hadoop 都始终遵循 Spring 的理念,提供简化的编程模型,并解决由基础设施引起的“意外复杂性”。Spring for Apache Hadoop 为开发者处理海量数据提供了强大的工具。

MapReduce 作业

Apache Hadoop 的 “Hello world”单词计数 示例 - 一个展示 Apache Hadoop 核心功能的简单用例。在使用 Spring for Apache Hadoop 时,单词计数示例如下所示:
<!-- configure Apache Hadoop FS/job tracker using defaults -->
<hdp:configuration />

<!-- define the job -->
<hdp:job id="word-count" 
  input-path="/input/" output-path="/ouput/"
  mapper="org.apache.hadoop.examples.WordCount.TokenizerMapper"
  reducer="org.apache.hadoop.examples.WordCount.IntSumReducer"/>

<!-- execute the job -->
<bean id="runner" class="org.springframework.data.hadoop.mapreduce.JobRunner" 
                  p:jobs-ref="word-count"/>

请注意,作业配置的创建和提交是由 IoC 容器处理的。无论是需要调整 Apache Hadoop 配置,还是 reducer 需要额外的参数,所有配置选项仍然可以供您进行配置。这允许您从小处着手,并让配置随着应用程序的增长而扩展。配置可以像开发者想要/需要的任何方式一样简单或高级,同时利用 Spring 容器的功能,例如 属性占位符环境支持

<hdp:configuration resources="classpath:/my-cluster-site.xml">
    fs.default.name=${hd.fs}
    hadoop.tmp.dir=file://${java.io.tmpdir}
    electric=sea
</hdp:configuration>

<context:property-placeholder location="classpath:hadoop.properties" />

<!-- populate Apache Hadoop distributed cache -->
<hdp:cache create-symlink="true">
  <hdp:classpath value="/cp/some-library.jar#library.jar" />
  <hdp:cache value="/cache/some-archive.tgz#main-archive" />
  <hdp:cache value="/cache/some-resource.res" />
</hdp:cache>

(单词计数 示例 是 Spring for Apache Hadoop 发行版的一部分 - 欢迎下载并进行实验)。

Spring for Apache Hadoop 不需要您用 Java 重写 MapReduce 作业,您可以无缝地使用非 Java 流式处理作业:它们就像任何其他对象一样(或者 Spring 称之为 bean)由框架以一致、连贯的方式创建、配置、连接和管理。开发者可以根据自己的偏好和需求进行混合搭配,而无需担心集成问题。

<hdp:streaming id="streaming-env" 
  input-path="/input/" output-path="/ouput/"
  mapper="${path.cat}" reducer="${path.wc}">
  <hdp:cmd-env>
    EXAMPLE_DIR=/home/example/dictionaries/
  </hdp:cmd-env>
</hdp:streaming>

现有的 Apache Hadoop Tool 实现也得到了支持;实际上,与其通过命令行指定自定义 Apache Hadoop 属性,不如直接注入它们。

<!-- the tool automatically is injected with 'hadoop-configuration' -->
<hdp:tool-runner id="scalding" tool-class="com.twitter.scalding.Tool">
   <hdp:arg value="tutorial/Tutorial1"/>
   <hdp:arg value="--local"/>
</hdp:tool-runner>

上面的配置执行了 Twitter 的 Scalding(一个基于 Cascading(见下文)库的 Scala DSL)的 Tutorial1。请注意,Spring for Apache Hadoop 或 Scalding 中没有*专门*的支持代码 - 只是使用了标准的 Apache Hadoop API。

使用 HBase/Hive/Pig

谈到 DSL,在使用 Apache Hadoop 时,使用更高级别的抽象是很常见的 - 流行的选择包括 HBaseHivePig。Spring for Apache Hadoop 为所有这些提供了集成,允许在 Spring 应用程序中轻松配置和使用这些*数据源*。
<!-- HBase configuration with nested properties -->
<hdp:hbase-configuration stop-proxy="false" delete-connection="true">
    foo=bar
</hdp:hbase-configuration>

<!-- create a Pig instance using custom properties
    and execute a script (using given arguments) at startup -->
     
<hdp:pig properties-location="pig-dev.properties" />
   <script location="org/company/pig/script.pig">
     <arguments>electric=tears</arguments>
   </script>
</hdp:pig>

通过 Spring for Apache Hadoop,您不仅获得了一个强大的 IoC 容器,还可以访问 Spring 的可移植服务抽象。以流行的 JdbcTemplate 为例,您可以在 Hive 的 Jdbc 客户端之上使用它。

<!-- basic Hive driver bean -->
<bean id="hive-driver" class="org.apache.hadoop.hive.jdbc.HiveDriver"/>

<!-- wrapping a basic datasource around the driver -->
<bean id="hive-ds"
    class="org.springframework.jdbc.datasource.SimpleDriverDataSource"
    c:driver-ref="hive-driver" c:url="${hive.url}"/>

<!-- standard JdbcTemplate declaration -->
<bean id="template" class="org.springframework.jdbc.core.JdbcTemplate"
    c:data-source-ref="hive-ds"/>

Cascading

Spring 还支持基于 Java 的、类型安全的 配置模型。您可以将其作为声明式 XML 配置的替代或补充 - 例如与 Cascading 一起使用。
@Configuration
public class CascadingConfig {
    @Value("${cascade.sec}") private String sec;
    
    @Bean public Pipe tsPipe() {
        DateParser dateParser = new DateParser(new Fields("ts"), 
                 "dd/MMM/yyyy:HH:mm:ss Z");
        return new Each("arrival rate", new Fields("time"), dateParser);
    }

    @Bean public Pipe tsCountPipe() {
        Pipe tsCountPipe = new Pipe("tsCount", tsPipe());
        tsCountPipe = new GroupBy(tsCountPipe, new Fields("ts"));
    }
}
<!-- code configuration class -->
<bean class="org.springframework.data.hadoop.cascading.CascadingConfig "/>

<bean id="cascade"
    class="org.springframework.data.hadoop.cascading.HadoopFlowFactoryBean" 
    p:configuration-ref="hadoop-configuration" p:tail-ref="tsCountPipe" />

上面的示例混合了命令式和声明式配置:前者用于创建单独的 Cascading 管道,后者用于将它们连接成一个流。

使用 Spring 的可移植服务抽象

或者使用 Spring 出色的 任务/调度支持,在特定时间提交作业。
<task:scheduler id="myScheduler" pool-size="10"/>

<task:scheduled-tasks scheduler="myScheduler">
 <!-- run once a day, at midnight -->
 <task:scheduled ref="word-count-job" method="submit" cron="0 0 * * * "/>
</task:scheduled-tasks>

上面的配置使用了一个简单的 JDK Executor 实例 - 非常适合 POC 开发。您可以轻松地在生产环境中将它(只需一行代码)替换为更全面的解决方案,例如专用的 调度器WorkManager 实现 - 这是 Spring 强大服务抽象的又一个例子。

HDFS/脚本

在与 HDFS 交互时,一个常见的任务是准备文件系统,例如清理输出目录以避免覆盖数据,或者将所有输入文件移动到同一个名称方案或文件夹下。Spring for Apache Hadoop 通过完全支持 Apache Hadoop 的 fs 命令,如 FS ShellDistCp,并将它们暴露为标准的 Java API,来解决这个问题。将其与 JVM 脚本(无论是 Groovy、JRuby 还是 Rhino/JavaScript)结合起来,可以形成一个强大的组合

<hdp:script language="groovy">
  inputPath = "/user/gutenberg/input/word/"
  outputPath = "/user/gutenberg/output/word/"

  if (fsh.test(inputPath)) {
    fsh.rmr(inputPath)
  }

  if (fsh.test(outputPath)) {
    fsh.rmr(outputPath)
  }

  fs.copyFromLocalFile("data/input.txt", inputPath)
</hdp:script>

总结

这篇博文只是触及了 Spring for Apache Hadoop 中一些可用功能的一小部分;我还没有提到 Spring Batch 集成,它为各种 Apache Hadoop 交互提供了 tasklet,或者使用 Spring Integration 进行事件触发 - 将在未来的文章中详细介绍。请让我们知道您的想法、您的需求并给我们反馈:下载代码,fork 源代码,报告问题,在论坛上发帖或给我们发推文


获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有