将 Cloud Foundry Worker 与 Spring 结合使用

工程 | Josh Long | 2012 年 5 月 9 日 | ...

你无疑已经阅读了 Jennifer Hickey 的精彩博文,其中介绍了 Cloud Foundry Worker,它们在设置 Ruby Resque 后台作业中的应用,以及今天介绍 Spring 支持的文章

Spring 开发者的关键要点

  1. 你需要使用 gem update vmc 命令更新你的 vmc 版本。
  2. Cloud Foundry Worker 允许你运行 public static void main 作业。也就是说,Cloud Foundry Worker 本质上是一个进程,其层次低于 Web 应用,这自然地适用于许多所谓的后台作业。
  3. 你需要提供 Cloud Foundry 将要运行的命令。你可以提供你希望使用的 java 调用方式,但更简单的方法是提供一个 shell 脚本,让 Cloud Foundry 运行该 shell 脚本。你提供的命令应该使用 $JAVA_OPTS,这是 Cloud Foundry 已经提供的,以确保内存使用和 JVM 设置的一致性。
  4. 有多种方法可以自动化创建 Cloud Foundry 可部署应用程序的过程。如果你使用 Maven,那么 org.codehaus.mojo:appassembler-maven-plugin 插件将帮助你创建一个启动脚本并打包你的 .jar 文件以便于部署,同时也可以指定一个入口点类。
  5. 其他一切基本相同。当你对一个 Java .jar 项目执行 vmc push 时,Cloud Foundry 会问你这个应用是否是一个独立应用。确认后,它会引导你完成后续的设置。

所以,让我们来看一些使用 Cloud Foundry Worker 更容易、更自然的常见架构和部署方式。我们将从 Spring 框架及其两个相关项目(Spring IntegrationSpring Batch)的角度来看这些模式,这两个项目在 Web 应用内部和外部都能发挥作用。正如我们将看到的,这两个框架都支持解耦和改进的可组合性。我们将把做什么何时做解耦,并将做什么在哪里做解耦,这一切都是为了释放前端的能力。

我有一个时间表要遵守!

我经常收到的一个常见问题是:如何在 Cloud Foundry 上进行作业调度? Cloud Foundry 支持 Spring 应用,而 Spring 当然一直支持企业级的调度抽象,如 Quartz 和 Spring 3.0 的 @Scheduled 注解。@Scheduled 非常棒,因为它极其容易添加到现有应用中。在最简单的情况下,你只需将 @EnableScheduling 添加到你的 Java 配置中,或者将 <task:annotation-driven/> 添加到你的 XML 中,然后在代码中使用 @Scheduled 注解。这在企业应用中是一个非常自然的操作 - 也许你有一个需要运行的分析或报告流程?一些长时间运行的批处理流程?我整理了一个示例,演示了如何使用 @Scheduled 运行一个 Spring Batch Job。Spring Batch 作业本身是一个 Worker 线程,它与一个 SLA 很差的 Web 服务协同工作,该服务不适合实时使用。在 Spring Batch 中处理这项工作更安全、更清晰,因为它具有恢复和重试功能,可以弥补任何网络中断、网络延迟等问题。我将让你参考代码示例的大部分细节,我们只看入口点,然后看看如何将应用部署到 Cloud Foundry。

					    
// set to every 10s for testing.
@Scheduled(fixedRate = 10 * 1000)
public void runNightlyStockPriceRecorder() throws Throwable {
	JobParameters params = new JobParametersBuilder()
		.addDate("date", new Date())
		.toJobParameters();
	JobExecution jobExecution = jobLauncher.run(job, params);
	BatchStatus batchStatus = jobExecution.getStatus();
	while (batchStatus.isRunning()) {
		logger.info("Still running...");
		Thread.sleep(1000);
	}
	logger.info(String.format("Exit status: %s", jobExecution.getExitStatus().getExitCode()));
	JobInstance jobInstance = jobExecution.getJobInstance();
	logger.info(String.format("job instance Id: %d", jobInstance.getId()));
}

这个方法将按照 @Scheduled 注解规定的频率调用:在本例中是每 10 秒。在使用 JobLauncher 启动 Job 后,应用程序会阻塞(这可以避免,但在本例中是可以的,因为我们期望它阻塞)。根据数据和批处理流程的类型,它可能阻塞几分钟,...或者一周!Spring Batch 是一个健壮的批处理引擎,它被设计用于安全地处理大型数据集。批处理流程通常不属于与 HTTP 请求相同的管道 - 它是确定的且长时间运行的,因此能够在 Cloud Foundry 中使用长时间运行的 Worker 在这里是一个真正的优势。

让我们将应用部署到 Cloud Foundry。示例应用是 Maven 项目。要构建它们,请进入代码库的根目录,并使用 Maven 构建工具运行 mvn clean install

从这里开始部署应用非常容易,前提是你已经设置了 vmc 命令行工具并更新到了最新版本,正如 Jennifer 在第一篇文章中所解释的。应用代码根目录下附带一个 manifest.yml 文件。Cloud Foundry 的 manifest.yml 文件完整描述了应用期望由平台即服务提供的所有东西,包括要使用的数据库、分配多少内存等等。Cloud Foundry 在部署应用时会读取这个 manifest 并满足其需求。因此,仅凭部署这个应用,你也将获得该应用存储示例数据所需的 PostgreSQL 数据库实例,以及 Spring Batch 用于维护运行时状态的表。


jlongmbp17:cf-workers-batch jlong$ vmc --path target/appassembler/ push 
Pushing application 'batch'...
Creating Application: OK
Creating Service [stock_batch]: OK
Binding Service [stock_batch]: OK
Uploading Application:
  Checking for available resources: OK
  Processing resources: OK
  Packing application: OK
  Uploading (55K): OK   
Push Status: OK
Staging Application 'batch': OK                                                 
Starting Application 'batch': OK      
                                          
jlongmbp17:cf-workers-batch jlong$ vmc apps
+---------------------+----+---------+----------------------------------+----------------+
| Application         | #  | Health  | URLS                             | Services       |
+---------------------+----+---------+----------------------------------+----------------+
| batch               | 1  | RUNNING |                                  | stock_batch    |
...

调度的任务每十秒运行一次,所以等待十秒后再进行下一步。接下来,我们将登录数据库,查看我们的微小进程做了什么。Spring Batch 作业获取 STOCKS 表中的所有股票代码,然后查找它们的当前定价信息,并将该快照数据插入到 STOCKS_DATA 表中。vmc tunnel 命令在云管理的数据库中创建了一个直接隧道。我用它连接到我们的 PostgreSQL 实例,并在 STOCKS_DATA 表上运行一个查询 (SELECT * FROM STOCKS_DATA)。


jlongmbp17:cf-workers-batch jlong$ vmc tunnel stock_batch
Binding Service [stock_batch]: OK
Stopping Application 'caldecott': OK
Staging Application 'caldecott': OK                                             
Starting Application 'caldecott': OK                                            
Getting tunnel connection info: OK

Service connection info: 
  username : u59993cf15bc0461a1d2648f7eab27f2da15f
  password : p1be9035f3c764817809ac81f3267
  name     : d5c5a80838d7bcd79dc7eefa6c1b04d22

Starting tunnel to stock_batch on port 10002.
1: none
2: psql
Which client would you like to start?: 2
Launching 'psql -h localhost -p 10002 -d d5c5aefa6c1b04d2280838d7bcd79dc7e -U u59993cf15bc04817809ac861a1d2648f -w'

psql (9.0.5, server 9.0.4)
Type "help" for help.

d5c5aefa6c1b04d2280838d7bcd79dc7e=> select * from stocks_data;
 id | date_analysed | high_price | low_price | closing_price | symbol 
----+---------------+------------+-----------+---------------+--------
  1 | 2012-05-08    |        613 |     602.3 |         602.3 | GOOG
  2 | 2012-05-08    |      30.78 |     30.25 |         30.25 | MSFT
  3 | 2012-05-08    |      27.87 |     27.56 |         27.56 | ORCL
  4 | 2012-05-08    |      32.41 |     32.06 |         32.06 | ADBE
  5 | 2012-05-08    |     107.38 |       103 |           103 | VMW
...

它成功了!在我们继续之前,现在是关闭应用程序的绝佳时机。每隔十秒查找一次这些股票,每次都插入新记录,这对你没有任何好处!几小时后,数据集很可能会变得不堪重负。我建议你改变频率(参见注释中可用于 @Scheduled 注解的 cron 表达式,它可以在工作日晚上 11 点运行),然后执行 vmc --path target/appassembler/ update,或者就让它保持停止状态。

jlongmbp17:cf-workers-batch jlong$ vmc stop batch

所以,批处理和作业调度是 Spring、Spring Batch 和 Cloud Foundry 都能轻松处理的两个非常常见的用例。另一个(不同但同样常见)的用例是为便于扩展而进行架构设计。毕竟,当你的下一个大型应用遇到奥普拉效应/.'d 时,扩展究竟意味着什么?现在,让我们来看看这个问题...

嘿!到队列后面去!

这有点反直觉,但一个由许多小型、专注组件组成的系统比一个庞大的整体应用更容易扩展,因为个体工作负载可以独立于应用的其他部分进行扩展和调整。假设你有一个资源匮乏的前端 Web 应用,它不应该被较慢的、可能存在 I/O 瓶颈的功能拖累。将主线程与较慢的后台进程解耦的一个简单方法是引入消息队列,比如 RabbitMQ。在消息传递中,我们谈论激进消费者模式,其中工作被入队,并且 Worker 会尽可能快地完成并出队这些工作。如果 Worker 的处理速度跟不上新工作的可用性,那么添加新的 Worker 来弥补差距就非常容易:只需执行 vmc instances my-backend +10!每个单独请求可能仍然需要固定时间,但系统的整体吞吐量会增加。

对于我们的下一个示例——它与上一个示例几乎无关,只是另一种在 Cloud Foundry 上使用独立 Worker 进程的方式——我们将看看如何构建一个面向消息的服务。我们将使用 RabbitMQ,这是一个世界级的消息队列,可在 Cloud Foundry 上作为服务使用。消息系统就是众所周知的邮箱。它们接受消息,并分发消息。通过将你的 API 简化为像 RabbitMQ 这样的消息代理(它本身基于 AMQP 协议),你为你的应用程序提供了尽可能友好的接口。消息系统本质上是异步的,因此它们不强制要求请求/响应交换,尽管也支持这种模式。如果你想利用 RabbitMQ 与其他 Worker 集成,你仍然可以向你的服务消费者暴露一个 门面网关,这样他们就不需要知道服务的实现方式,如果你不想让他们知道的话。我们将使用 Spring Integration(它支持企业集成模式)从 Java 接口类型构建一个消息网关

```java

public interface StockSymbolLookupClient { StockSymbolLookup lookupSymbol (String symbol) throws Throwable; }

<P>With Spring Integration's help, calls to this method will result in a message being sent to RabbitMQ where, on the other side, our service will dequeue the message, process it, and then, eventually send a reply back to RabbitMQ, which the caller of this method will receive  as the reply value.  </P>  <P> To configure the client, I used a little bit of Spring Integration to setup the gateway, which then forwards the request to the outbound AMQP gateway adapter.</P>

```xml

<?xml version="1.0" encoding="UTF-8"?>
	<beans:beans ...>
	    ...
	    <gateway
	            service-interface="org.cloudfoundry.workers.stocks.integration.client.StockClientGateway"
	            default-request-channel="outboundSymbolsRequests"
	            default-reply-channel="outboundSymbolsReplies"
	    />

	    <amqp:outbound-gateway
	            request-channel="outboundSymbolsRequests"
	            reply-channel="outboundSymbolsReplies"
	            routing-key="tickers"
	            amqp-template="amqpTemplate"
	    />
	</beans:beans>

我省略了 RabbitMQ 特有的连接机制的细节(当然,这一切都使用 cloudfoundry-runtime API 完成)——请参考客户端示例。这段代码显示了重要的部分:Spring 将在运行时基于 StockClientGateway interface 合成一个实现,我们可以注入并从客户端代码中使用它来调用服务。

在服务侧,我们需要一些代码来从 RabbitMQ 中取出消息,然后将它们转发给主力工作者,再将结果通过 RabbitMQ 传回作为回复。当然,服务侧我们可以独立于客户端侧进行扩展。你的每个 Web 应用可能只有一个客户端,但可能有十个服务实现正在运行,弥补不足并吸收额外的需求。这是我们的服务实现的样子,同样省略了 RabbitMQ 特有的连接 bean(请查阅示例


<?xml version="1.0" encoding="UTF-8"?>
	<beans:beans ...>
	    ...
	    <amqp:inbound-gateway request-channel="inboundSymbolsRequests"
		    queue-names="tickers"
		    message-converter="mc"
		    connection-factory="connectionFactory"/>

	    <service-activator ref="client" input-channel="inboundSymbolsRequests" requires-reply="true"/>

	</beans:beans>

这种方法非常强大。在我的示例中,我只是推迟了调用一个 RESTful Web 服务的微不足道的成本。但你可以想象以这种方式处理更耗时的任务——图像处理、批处理作业、大型分析等。

部署这个例子稍微复杂一些,因为它有两个部分。网关实现是一个简单的 bean,Spring Integration 将其作为消息系统的一种客户端代理提供。我们可以在另一个后台作业、Web 应用或其他任何地方使用该网关。在我们的例子中,我们的客户端——一个 Spring MVC 应用——使用该网关来调用服务。服务提供了有趣的功能,它接收来自已配置的 RabbitMQ 实例的请求,调用股票代码查找 bean,然后通过 RabbitMQ 将结果返回给请求者。我们需要先部署服务。像之前一样,如果你还没有构建整个代码库,请在根目录下运行 mvn clean install。然后,从 cf-workers-integration-service 文件夹的根目录运行以下命令


jlongmbp17:cf-workers-integration-service jlong$ vmc --path target/appassembler/ push 
Pushing application 'integration-service'...
Creating Application: OK
Creating Service [stock_rabbitmq]: OK
Binding Service [stock_rabbitmq]: OK
Uploading Application:
  Checking for available resources: OK
  Processing resources: OK
  Packing application: OK
  Uploading (44K): OK   
Push Status: OK
Staging Application 'integration-service': OK                                   
Starting Application 'integration-service': OK                                  

服务部署好后,我们从 Web 客户端调用一次来测试一下。返回到 cf-workers-integration-webclient 目录。然后,运行与服务相同的命令。

jlongmbp17:cf-workers-integration-webclient jlong$ vmc --path target/cf-workers-integration-webclient-1.0-SNAPSHOT push
Pushing application 'integration-webclient'...
Creating Application: OK
Binding Service [stock_rabbitmq]: OK
Uploading Application:
  Checking for available resources: OK
  Processing resources: OK
  Packing application: OK
  Uploading (3K): OK   
Push Status: OK
Staging Application 'integration-webclient': OK                                 
Starting Application 'integration-webclient': OK                                                         

应用程序部署后(你可以通过运行 vmc apps 确认两者都在正常运行),现在打开客户端并尝试我们的服务:打开 Web 客户端应用程序的 URL。在我的例子中,URL 是 integration-webclient.cloudfoundry.com。它应该会显示一个普通的表单。输入一个股票代码(我测试时用的是 VMW...),然后按 Enter。你应该会在页面上看到数据显示出来。

如果你查看 integration-service 的日志,应该会看到反映与你在客户端看到的相同信息的控制台输出。如果看到了,恭喜你!

下一步怎么办

至此,你已掌握了许多强大的能力。Cloud Foundry Worker 是处理那些不自然地适合 HTTP 请求处理管道的工作的理想场所。根据我的经验,这包括很多事情:批处理、集成和消息代码、分析、报告、大数据和补偿事务,以及通过 HTTP 以外的其他协议公开的系统。

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

领先一步

VMware 提供培训和认证,助你加速进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部