抢占先机
VMware 提供培训和认证,助您快速提升。
了解更多Spring Batch 和 Spring Integration 的用户有一些共同的担忧,我们经常被问到它们如何协同工作。Spring Batch Admin 1.0.0.M2 最近发布了,它大量使用了 Spring Integration,因此它是研究一些特定用例的良好载体,这正是我们计划在本文中介绍的内容。
1.0.0.M2 版本发布的一部分是 Spring Batch Integration 模块,最近从 Spring Batch 迁移过来,并在 Batch Admin 中找到了新的归宿。许多 Batch-Integration 的交叉用例都在 Spring Batch Integration 中实现或演示。新归宿的原因是 Batch Admin 大量使用了 Batch Integration 的功能,因此调整这些项目的发布周期更有意义。
Spring Batch Admin 是 SpringSource 的一个开源项目。它旨在为开发人员提供一个 Web UI 和用于构建自己的 UI 的工具,以便与 Spring Batch 作业进行交互(启动、停止、调查失败原因等)。最近的里程碑版本在 1.0 的计划功能方面已经相当完整,但如果您有任何想法或贡献,请访问 论坛 和 问题追踪器 并参与社区。
开箱即用的目标运行时是单个 servlet 容器实例(例如 SpringSource tc Server),在该容器中,系统无需任何配置即可工作。但我们希望能够支持基本用例的定制和扩展,包括将部署扩展到服务器集群,而 Spring Integration 正被证明是许多扩展点的关键。
Spring Batch 和 Spring Integration 之间的界限并非总是清晰的,但有一些可以遵循的指导方针。主要是:考虑粒度,并应用常见模式。本文描述了一些常见模式。更多模式在 Spring Batch Integration 和 Spring Batch Admin 中实现(也可能是未来文章的主题)。
为批处理过程添加消息传递可以实现操作自动化,并分离和策略化关键关注点。例如,一条消息可能触发作业执行,然后消息的发送可以通过多种方式暴露。或者当作业完成或失败时,可能会触发发送一条消息,而这些消息的消费者可能具有与应用程序本身无关的操作关注点。
反过来也一样:消息传递也可以嵌入到作业中,但这超出了本文的范围。例如:通过通道读写待处理的项目。
以下是一些使用 Spring Integration 和 Spring Batch Integration 在 Batch Admin 中实现的用例。
Spring Integration 的优点在于消息生产者和消息消费者之间的关注点分离,一个很好的具体例子是消息触发作业执行的能力。在这种情况下,消费者是完全通用的,并且是围绕标准 Spring Batch 的一个非常薄的包装器JobLauncher(这里的代码来自 Spring Batch Integration 的 JobLaunchingMessageHandler)
@ServiceActivator
public JobExecution launch(JobLaunchRequest request) {
Job job = request.getJob();
JobParameters jobParameters = request.getJobParameters();
return jobLauncher.run(job, jobParameters);
}
正如您从上面的代码片段中可以看到的,这个包装器非常薄,几乎不值一提,但它的优点是具有非常清晰和明显的职责,并且易于独立测试。JobLaunchRequest对象是一个特殊的包装器,用于传递给JobLauncher的输入参数,以便它们可以构成 Spring Integration 消息的有效载荷。
这个JobLaunchingMessageHandler被连接到一个MessageChannel在 Spring Batch Admin 中(位于 Manager jar 的/META-INF/bootstrap/integration/launch-context.xml):
<service-activator input-channel="job-requests">
<beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>
至此,这种集成模式的消费者端就完成了。这是一种本地方法,因为它不太适合远程调用,因为JobLaunchRequest故意不是Serializable(因为Job不是)。
要启动一个Job,我们在本地所需要做的就是创建一个生产者并用它来发送一个JobLaunchRequest到job-requests通道。Batch Admin Manager 模块中有一个集成测试正是这样做的,但这里集成方法的真正强大之处在于能够策略化请求并让它们来自各种不同的生产者。
一条消息可以发送到job-requestsBatch Admin 中的通道,方法有很多。要打破本地调用的限制并远程暴露作业,所需要做的就是将其他形式的传入请求适配为一个JobLaunchRequest,而 Spring Integration 使这一切变得非常容易。这是我们称之为通道复用的模式的基本场景。例如:
Spring Integration HTTP 适配器模块可用于通过 HTTP 接收输入消息
<http:inbound-channel-adapter name="/job-requests" channel="job-launches"
request-mapper="bodyInboundRequestMapper" view="reload-job-executions" />
此代码片段可在 Batch Admin Manager 模块中找到(位于META-INF/servlet/integration-servlet.xml)。它暴露了一个端点 URLhttp://.../batch/job-requests,我们可以通过在浏览器中提交表单来发送作业执行请求。
原则上,请求可以是任何我们喜欢的形式,因为我们可以在此适配器的下游和JobLaunchingMessageHandler的上游转换消息。在 Spring Batch Admin 中,转换是通过另一个 POJO 消息处理程序(StringToJobLaunchRequestAdapter)对适配器输出进行的。
上面使用的同一个 HTTP 适配器可以用来从 UN*X 命令行远程启动作业。这是使用 Spring Integration HTTP 适配器的一个非常棒的方法:你可以使用简单的 shell 脚本自动化许多操作。例如,如果应用程序在本地部署,并且有一个名为 "staging" 的作业,这将起作用
$ echo staging[input.file=foo] | curl -v -d @- -H "Content-Type: text/plain" \
http://localhost:8080/springone-web-demo/batch/job-requests
名为 "staging" 的作业使用一个参数启动(input.file=foo),其中foo是要作为输入读取的文件的绝对路径。该作业配置了如下的 item reader
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="linesToSkip" value="1" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
</property>
<property name="resource" value="#{jobParameters[input.file]}" />
</bean>
(此代码片段不在 Spring Batch Admin 示例中,但在我们在 Spring One Americas 2009 上进行的演示中出现过。)
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(job, jobParameters);
}
这个简单的 POJO 方法被声明为@ServiceActivator(它也可以是@Transformer),因此它可以插入到消息处理链中,紧接在JobLaunchingMessageHandler之前,将一个File转换成一个JobLaunchRequest.
失败的作业通常可以在 Spring Batch 中重新启动,这个功能可以通过 Spring Batch Admin UI 的网页浏览器使用。它也可以在命令行中使用,或者对任何可以向名为job-restarts:
<channel id="job-restarts" />
<service-activator input-channel="job-restarts" output-channel="job-requests">
<beans:bean class="org.springframework.batch.admin.integration.JobNameToJobRestartRequestAdapter">
<beans:property name="jobLocator" ref="jobRegistry" />
<beans:property name="jobExplorer" ref="jobExplorer" />
</beans:bean>
</service-activator>
的所有通道发送消息的人使用。这个通道只需要作业名称,并且已被暴露为一个 HTTP 入站端点,所以从 UN*X 命令行你可以这样做
$ echo staging | curl -v -d @- -H "Content-Type: text/plain" \
http://localhost:8080/springone-web-demo/batch/job-restarts
如果作业因可恢复的错误(例如调用远程服务时超时或网络故障)重复失败,您可能希望它自动重新启动。重试可以在作业内部使用 Spring Batch 的一些特性在低级别处理,但要重试整个作业需要对运行时进行一些操作。这可以通过 Spring Integration 简单地实现,现在job-requests通道正在接受启动请求。为此目的的端点将充当一个过滤器,查找已知可重试的作业失败条件,然后作为一个重启转换器(如上面的示例)。因此,像这样的链将起作用
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter>
<bean class="...RetryableJobExecutionFilter"
xmlns="http://www.springframework.org/schema/beans">
<property name="pattern" value="(&s).*TimeoutException.*" />
</bean>
</filter>
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
其中RetryableJobExecutionFilter可能这样实现
public boolean isRetryable(JobExecution jobExecution) {
boolean retryable = false;
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isLessThan(BatchStatus.STOPPED)) {
continue;
}
if (stepExecution.getExitStatus().getExitDescription().matches(pattern)) {
retryable = true;
break;
}
}
return retryable;
}
这个例子在我们的 Spring One 演示中出现过;它不在 Spring Batch Admin 中,但对于您需要的任何特定过滤器来说,实现起来都很简单。
通过 Spring Batch Admin UI 直接支持将文件上传到应用程序。不建议使用 HTTP POST 上传大文件,主要是因为应用程序必须在内存中缓冲内容,但这对于上传小型或中型数据集供 Spring Batch 处理来说是一个不错的功能。
示例应用程序实际上并没有使用 Spring Integration 的文件轮询器(但对于希望按上述方式配置它的客户端来说,它是可用的);而是使用文件上传后的直接消息触发。策略是 Manager 模块上传文件,然后向发布-订阅通道(input-files).
)发送一条消息。任何可以使用输入文件的作业,只需要有一个上游组件订阅该通道,并在文件符合条件时将其传递下去。在示例中,这是通过按文件的父目录名称过滤文件来完成的
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter expression="payload.parent.name=='sample'" />
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
如果输入文件的父目录(可在 Web UI 中设置)是 "sample",那么它将被导入到服务激活器中,该激活器将其转换为一个JobLaunchRequest,并将其发送出去由JobLaunchingMessageHandler处理(如前所述)。
public interface FileSender {
void send(File file);
}
该接口没有实现(除了单元测试中的存根),因为 Spring Integration 可以提供一个
<gateway id="fileSender"
service-interface="org.springframework.batch.admin.service.FileSender"
default-request-channel="input-files" />
<beans:bean class="org.springframework.batch.admin.service.LocalFileService">
<beans:property name="fileSender" ref="fileSender" />
</beans:bean>
Spring Batch Admin 允许用户上传 Spring 配置文件,以便从 UI 启动和管理作业。这对于在运行时重新参数化作业非常有用,例如在运行性能测试套件以衡量各种性能调整(如更改步骤中的提交间隔)的效果时。
为了接受输入的配置文件,我们使用一个消息通道,这样它就可以被多种不同的输入方法复用。配置通过一个名为job-configurations:
<service-activator input-channel="job-configurations" output-channel="job-registrations">
<beans:bean class="org.springframework.batch.admin.integration.JobConfigurationResourceLoader">
<beans:property name="jobRegistry" ref="jobRegistry" />
</beans:bean>
</service-activator>
这里的服务激活器只接受一个 SpringResource并将其视为一个配置文件:加载一个ApplicationContext,扫描其中的Job组件,并将其注册到提供的注册表中。一旦注册到注册表中,作业就可以从 UI 的主 Jobs 菜单启动,或者通过job-requests通道启动,如上所述。
就像输入文件一样,配置文件可以通过 HTTP 传入,在这种情况下可以是文件附件或纯文本参数,也可以通过文件轮询传入。轮询用例在 Manager 模块中实现,所以值得快速看一下它是如何工作的。在META-INF/bootstrap/integration/configuration-context.xml中,我们找到了这个
<file:inbound-channel-adapter directory="target/config" channel="job-configuration-files"
filename-pattern=".*\.xml">
<poller max-messages-per-poll="1">
<cron-trigger expression="5/1 * * * * *" />
</poller>
</file:inbound-channel-adapter>
适配器将轮询一个目录(这里为演示目的硬编码为 "target/config",但在实际应用程序中会进行参数化)并查找名称以 ".xml" 结尾的文件。当符合该模式的文件到达时,它被发送(作为java.io.File)到job-configuration-files通道。消息从那里被转换,以便File变成一个Resource,并且可以将其发送到job-configurations通道。
一旦您开始使用 Spring Integration 消息来驱动许多应用程序功能,通常可以方便地接入消息流,以获取信息或用于报告目的。例如,在作业开始、停止(完成或失败)时发送一条消息会很有用。使用MessagePublishingInterceptor可以轻松实现这一点。在 Spring Batch Admin Manager 中,该拦截器被配置为发送作业执行消息
<aop:config>
<aop:advisor advice-ref="jobMessagePublishingInterceptor" pointcut="execution(* *..Job+.execute(..))" />
</aop:config>
<bean id="jobMessagePublishingInterceptor" class="org.springframework.integration.aop.MessagePublishingInterceptor"
xmlns="http://www.springframework.org/schema/beans">
<constructor-arg index="0">
<bean class="org.springframework.batch.admin.integration.TrivialExpressionSource" p:payload="#args[execution]" />
</constructor-arg>
<property name="defaultChannel" ref="job-operator" />
</bean>
每当Job被执行时,AOP 通知器会传递参数值(一个JobExecution)到)到 job-operator通道。然后,感兴趣的各方可以订阅该通道并获取有关最近执行消息的信息。Spring Batch Admin 开箱即用时不会对这些消息进行任何处理,除了在控制台记录它们,并在 UI 中列出以便检查。在 Spring Batch Admin 之上构建自己应用程序的客户端可能会发现这些消息有助于通知操作员或报告系统关于作业结果的信息。
设置信息消息可能会带来开启新应用程序功能的副作用:上面描述的作业重试功能就是通过连接一个端点来监听)到 job-operator通道。
我们希望本文能让您对 Spring Integration 在批处理应用程序中的一些用法有所了解。上面几乎所有的代码示例都以某种形式存在于 Spring Batch Admin 中,但这绝不是故事的结尾,在 Spring Batch Integration 和 Spring Batch Admin 项目中还有更多示例。访问 Batch Admin 网站获取更多信息并了解在哪里可以获得代码进行尝试。InfoQ 上还有一个关于本文部分主题的 视频,由 Spring Batch 和 Spring Integration 的负责人(Dave Syer 和 Mark Fisher)在 Spring One 大会上展示。