领先一步
VMware 提供培训和认证,助您加速进步。
了解更多关于 Spring Batch 和 Spring Integration 的用户有一些共同的关注点,我们经常被问到它们是如何协同工作的。最近 Spring Batch Admin 1.0.0.M2 发布了,它大量使用了 Spring Integration,因此是探讨一些特定用例的一个很好的载体,这正是我们打算在本文中做的。
1.0.0.M2 版本的一部分是 Spring Batch Integration 模块,最近从 Spring Batch 迁移过来,并获得了 Batch Admin 的新家。Batch-Integration 的许多交叉用例已在 Spring Batch Integration 中实现或演示。之所以选择新的家园,是因为 Batch Admin 使用了 Batch Integration 的许多功能,因此使这些项目的发布周期保持一致更有意义。
Spring Batch Admin 是来自 SpringSource 的开源项目。它旨在为开发人员提供 Web UI 和工具,用于构建自己的 UI 以与 Spring Batch 作业进行交互(例如,启动、停止、调查失败原因等)。最近的里程碑版本在计划的功能方面已相当完善,但如果您有想法或想做出贡献,请访问 论坛 和 问题跟踪器,并参与社区。
开箱即用的目标运行时是 servlet 容器的单个实例(例如 SpringSource tc Server),在该容器中,系统可以零配置或无需配置即可运行。但我们希望能够支持基本用例的自定义和扩展,包括将部署扩展到服务器集群,而 Spring Integration 正是许多扩展点的关键。
Spring Batch 和 Spring Integration 之间的界限并非总是清晰的,但有一些可以遵循的指导方针。主要是:考虑粒度,并应用常见模式。其中一些常见模式将在本文中进行描述。更多模式已在 Spring Batch Integration 和 Spring Batch Admin 中实现(并且可能是未来文章的主题)。
为批处理过程添加消息传递功能可以实现操作的自动化,以及关键关注点的分离和策略化。例如,消息可以触发作业执行,然后消息的发送可以通过多种方式公开。或者,当作业完成或失败时,可能会触发消息的发送,而这些消息的消费者可能拥有与应用程序本身无关的操作关注点。
反之亦然:消息传递也可以嵌入到作业中,但这超出了本文的范围。例如:通过通道读取或写入项目进行处理。
以下是一些使用 Spring Integration 和 Spring Batch Integration 在 Batch Admin 中实现的用例。
Spring Integration 的优点在于消息生产者和消息消费者之间的关注点分离,一个很好的具体例子就是消息触发作业执行的能力。在这种情况下,消费者是完全通用的,并且是标准 Spring Batch 的一个非常薄的包装器。JobLauncher(此处代码来自 Spring Batch Integration 的 JobLaunchingMessageHandler)
@ServiceActivator
public JobExecution launch(JobLaunchRequest request) {
Job job = request.getJob();
JobParameters jobParameters = request.getJobParameters();
return jobLauncher.run(job, jobParameters);
}
从上面的代码片段可以看出,包装器非常薄,几乎不值得一提,但它的优点在于它有一个清晰、明显的责任,并且易于单独测试。这个JobLaunchRequest对象是用于向JobLauncher发送输入参数的特殊包装器,以便它们可以成为 Spring Integration 中消息的有效负载。
的JobLaunchingMessageHandler已连接到 Spring Batch Admin 中的MessageChannel(在 Manager jar 中/META-INF/bootstrap/integration/launch-context.xml):
<service-activator input-channel="job-requests">
<beans:bean class="org.springframework.batch.integration.launch.JobLaunchingMessageHandler">
<beans:constructor-arg ref="jobLauncher" />
</beans:bean>
</service-activator>
此集成模式的消费者端已完成。这是一种本地方法,因为它不太适合远程调用,因为JobLaunchRequest故意不是Serializable(因为作业不是)。
要本地启动一个作业,我们只需要创建一个生产者并使用它向JobLaunchRequest发送一个job-requests通道。Batch Admin Manager 模块中有一个集成测试,它就是这样做的,但是这里集成方法的真正强大之处在于能够对请求进行策略化,并让它们来自各种不同的生产者。
消息可以通过多种方式发送到 Batch Admin 中的job-requests通道。要摆脱本地调用并远程公开作业,只需将传入的请求以某种其他形式适配为JobLaunchRequest,Spring Integration 使这变得非常容易。这是我们称之为通道重用的基本场景。示例是
Spring Integration HTTP 适配器模块可用于通过 HTTP 接受输入消息
<http:inbound-channel-adapter name="/job-requests" channel="job-launches"
request-mapper="bodyInboundRequestMapper" view="reload-job-executions" />
此代码片段可以在 Batch Admin Manager 模块中找到(META-INF/servlet/integration-servlet.xml)。它公开了一个端点 URLhttp://.../batch/job-requests,我们可以通过提交浏览器表单来使用它来发送作业执行请求。
原则上,请求可以是我们喜欢的任何形式,因为我们可以在此适配器下游和JobLaunchingMessageHandler上游的消息进行转换。在 Spring Batch Admin 中,转换是通过另一个 POJO 消息处理程序(StringToJobLaunchRequestAdapter)对适配器的输出进行的。
上面使用的同一个 HTTP 适配器可以从 UN*X 命令行远程启动作业。这是使用 Spring Integration HTTP 适配器的一种非常好的方式:您可以使用简单的 shell 脚本自动化许多操作。例如,如果应用程序在本地部署了一个名为“staging”的作业,那么这将起作用。
$ echo staging[input.file=foo] | curl -v -d @- -H "Content-Type: text/plain" \
https://:8080/springone-web-demo/batch/job-requests
名为“staging”的作业以一个参数启动(input.file=foo),其中foo是用于读取输入的文件的绝对路径。作业配置了一个项目读取器,如下所示
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="linesToSkip" value="1" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
</property>
<property name="resource" value="#{jobParameters[input.file]}" />
</bean>
(此代码片段不在 Spring Batch Admin 示例中,但它在我们 2009 年 Spring One Americas 的演示中。)
public JobLaunchRequest adapt(File file) throws NoSuchJobException {
JobParameters jobParameters = new JobParametersBuilder().addString(
"input.file", file.getAbsolutePath()).toJobParameters();
return new JobLaunchRequest(job, jobParameters);
}
这个简单的 POJO 方法声明为@ServiceActivator(它也可以是@Transformer), 因此它可以插入消息处理链,就在JobLaunchingMessageHandler之前,用于将File转换为JobLaunchRequest.
一个失败的作业通常可以在 Spring Batch 中重新启动,并且此功能可通过 Spring Batch Admin UI 的 Web 浏览器访问。它也可以在命令行上访问,或者对任何能够向名为job-restarts:
<channel id="job-restarts" />
<service-activator input-channel="job-restarts" output-channel="job-requests">
<beans:bean class="org.springframework.batch.admin.integration.JobNameToJobRestartRequestAdapter">
<beans:property name="jobLocator" ref="jobRegistry" />
<beans:property name="jobExplorer" ref="jobExplorer" />
</beans:bean>
</service-activator>
的 Spring Integration 通道发送消息的人都可以访问。此通道只需要作业名称,并且已公开为 HTTP 入站端点,因此从 UN*X 命令行,您可以这样做。
$ echo staging | curl -v -d @- -H "Content-Type: text/plain" \
https://:8080/springone-web-demo/batch/job-restarts
如果一个作业因可恢复错误(例如调用远程服务的超时或网络故障)而反复失败,您可能希望它自动重新启动。重试可以在作业内部的低级别使用 Spring Batch 的某些功能来处理,但重试整个作业需要对运行时进行一些操作。这可以通过 Spring Integration 轻松完成,因为现在job-requests通道正在接受启动请求。此处的端点将充当过滤器,查找作业中已知可重试的失败条件,然后作为重新启动转换器(如上面的示例)。因此,这样的链将起作用
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter>
<bean class="...RetryableJobExecutionFilter"
xmlns="http://www.springframework.org/schema/beans">
<property name="pattern" value="(&s).*TimeoutException.*" />
</bean>
</filter>
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
其中RetryableJobExecutionFilter可以实现如下
public boolean isRetryable(JobExecution jobExecution) {
boolean retryable = false;
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (stepExecution.getStatus().isLessThan(BatchStatus.STOPPED)) {
continue;
}
if (stepExecution.getExitStatus().getExitDescription().matches(pattern)) {
retryable = true;
break;
}
}
return retryable;
}
这个例子是在我们的 Spring One 演示中,它不在 Spring Batch Admin 中,尽管对于您需要的任何特定过滤器来说,实现它都是微不足道的。
文件上传到应用程序直接通过 Spring Batch Admin UI 支持。不建议使用 HTTP POST 上传大文件,主要是因为应用程序必须在内存中缓冲内容,但这是上传小型或中型数据集以供 Spring Batch 处理的便捷功能。
实际的示例应用程序不使用 Spring Integration 的文件轮询器(但它可供想要按上述方式配置它的客户端使用);相反,它在文件上传后使用直接的消息触发器。策略是让 Manager 模块上传文件,然后向发布-订阅通道(input-files).
可以使用输入文件的任何作业都必须有一个上游组件订阅该通道,并在文件感兴趣时传递它。在示例中,这是通过按父目录名称过滤文件来完成的。
<chain input-channel="input-files" output-channel="job-requests"
xmlns="http://www.springframework.org/schema/integration">
<filter expression="payload.parent.name=='sample'" />
<service-activator>
<bean class="org.springframework.batch.admin.integration.FileToJobLaunchRequestAdapter"
xmlns="http://www.springframework.org/schema/beans">
<property name="job" ref="job1" />
</bean>
</service-activator>
</chain>
如果输入文件有一个父目录(可以在 Web UI 中设置)“sample”,那么它将被管道传输到服务激活器,该服务激活器将其转换为JobLaunchRequest并将其传递给JobLaunchingMessageHandler进行处理(如上所述)。
public interface FileSender {
void send(File file);
}
该接口没有实现(单元测试中的存根除外),因为 Spring Integration 可以提供一个。
<gateway id="fileSender"
service-interface="org.springframework.batch.admin.service.FileSender"
default-request-channel="input-files" />
<beans:bean class="org.springframework.batch.admin.service.LocalFileService">
<beans:property name="fileSender" ref="fileSender" />
</beans:bean>
Spring Batch Admin 允许用户上传 Spring 配置文件,以便从 UI 启动和管理作业。这对于在运行时重新参数化作业非常有用,例如,在运行一系列性能测试以测量各种性能调整(如更改步骤中的提交间隔)的效果时。
为了接受配置文件作为输入,我们使用消息通道,以便它可以在多种不同的输入方法之间重用。配置进入一个名为job-configurations:
<service-activator input-channel="job-configurations" output-channel="job-registrations">
<beans:bean class="org.springframework.batch.admin.integration.JobConfigurationResourceLoader">
<beans:property name="jobRegistry" ref="jobRegistry" />
</beans:bean>
</service-activator>
这里的服务激活器只接受一个 SpringResource并将其视为配置文件:加载一个ApplicationContext,扫描其中的作业组件并将其注册到提供的注册表中。一旦进入注册表,就可以从 UI 中的主作业菜单启动作业,或通过job-requests通道,如上所述。
与输入文件一样,配置文件可以通过 HTTP 传入,在这种情况下是作为文件附件或纯文本参数,也可以通过文件轮询传入。轮询用例在 Manager 模块中实现,因此值得快速查看一下它的工作原理。在META-INF/bootstrap/integration/configuration-context.xml我们发现这个
<file:inbound-channel-adapter directory="target/config" channel="job-configuration-files"
filename-pattern=".*\.xml">
<poller max-messages-per-poll="1">
<cron-trigger expression="5/1 * * * * *" />
</poller>
</file:inbound-channel-adapter>
该适配器将轮询一个目录(此处为演示目的硬编码为“target/config”,但在实际应用程序中将进行参数化)并查找文件名以“.xml”结尾的文件。当一个匹配该模式的文件到达时,它(作为java.io.File)被发送到job-configuration-files通道。从那里消息被转换,以便File变成一个Resource,并且可以发送到job-configurations通道。
一旦您开始使用 Spring Integration 消息来驱动许多应用程序功能,就经常能够利用消息流进行信息或报告目的。例如,在作业启动、停止(完成或失败)时发送消息会很有用。使用 Spring Integration 的MessagePublishingInterceptor很容易做到。在 Spring Batch Admin Manager 中,该拦截器配置为发送作业执行消息
<aop:config>
<aop:advisor advice-ref="jobMessagePublishingInterceptor" pointcut="execution(* *..Job+.execute(..))" />
</aop:config>
<bean id="jobMessagePublishingInterceptor" class="org.springframework.integration.aop.MessagePublishingInterceptor"
xmlns="http://www.springframework.org/schema/beans">
<constructor-arg index="0">
<bean class="org.springframework.batch.admin.integration.TrivialExpressionSource" p:payload="#args[execution]" />
</constructor-arg>
<property name="defaultChannel" ref="job-operator" />
</bean>
每次作业被执行时,AOP 顾问会传递参数值(一个StepExecution)被发送到job-operator通道。有兴趣的各方可以订阅该通道并获取有关最近执行的消息的信息。Spring Batch Admin 开箱即用不处理这些消息,只是在控制台记录它们,并在 UI 中列出它们以便检查。构建自己的 Spring Batch Admin 之上应用程序的客户端可能会发现这些消息对于通知操作员或报告系统关于作业结果很有用。
设置信息性消息可能会导致新的应用程序功能出现:上面描述的作业重试功能是为 Spring One 实现的,它通过挂接一个端点来监听job-operator通道。
我们希望本文能为您提供一些关于 Spring Integration 在 Batch 应用程序中使用方式的见解。上面几乎所有的代码示例都以某种形式存在于 Spring Batch Admin 中,但这绝非故事的结局,并且在 Spring Batch Integration 和 Spring Batch Admin 项目中有更多示例。访问 Batch Admin 网站以获取更多信息,并了解在哪里获取代码进行试用。InfoQ 上还有一个 视频,其中 Spring Batch 和 Spring Integration 的负责人(Dave Syer 和 Mark Fisher)在 Spring One 上介绍了本文中的一些主题。