领先一步
VMware 提供培训和认证,助您加速进步。
了解更多亲爱的Spring社区!
最近我们发布了 Spring Integration Java DSL: 逐行教程,其中广泛使用了 Java 8 Lambda。我们收到一些反馈,认为这是 DSL 的一个很好的介绍,但对于那些无法迁移到 Java 8 或尚未熟悉 Lambdas 但希望利用它的人来说,需要一个类似的教程。
因此,为了帮助那些希望从 XML 配置迁移到 Java 和注解配置的 Spring Integration 用户,我们提供了这个 逐行教程 来演示,即使没有 Lambdas,我们也能从 Spring Integration Java DSL 的使用中获益良多。当然,大多数人会同意 Lambda 语法提供了更简洁的定义。
我们在这里分析的是同一个 Cafe Demo 示例,但使用了 Java 8 之前的配置版本。许多选项是相同的,所以我们只是将它们的描述复制/粘贴到这里,以获得完整的图景。由于这个 Spring Integration Java DSL 配置与 Java 8 Lambda 风格非常不同,所有用户都可以从中学习如何通过 Spring Integration Java DSL 提供的丰富选项来实现相同的结果。
我们的应用程序的源代码放在一个类中,这是一个 Boot 应用程序;重要行都标有相应的数字,对应下面的注释。
@SpringBootApplication // 1
@IntegrationComponentScan // 2
public class Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args); // 3
Cafe cafe = ctx.getBean(Cafe.class); // 4
for (int i = 1; i <= 100; i++) { // 5
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false);
order.addItem(DrinkType.MOCHA, 3, true);
cafe.placeOrder(order);
}
System.out.println("Hit 'Enter' to terminate"); // 6
System.in.read();
ctx.close();
}
@MessagingGateway // 7
public interface Cafe {
@Gateway(requestChannel = "orders.input") // 8
void placeOrder(Order order); // 9
}
private final AtomicInteger hotDrinkCounter = new AtomicInteger();
private final AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10
@Autowired
private CafeAggregator cafeAggregator; // 11
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 12
return Pollers.fixedDelay(1000).get();
}
@Bean
@SuppressWarnings("unchecked")
public IntegrationFlow orders() { // 13
return IntegrationFlows.from("orders.input") // 14
.split("payload.items", (Consumer) null) // 15
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))// 16
.route("payload.iced", // 17
new Consumer<RouterSpec<ExpressionEvaluatingRouter>>() { // 18
@Override
public void accept(RouterSpec<ExpressionEvaluatingRouter> spec) {
spec.channelMapping("true", "iced")
.channelMapping("false", "hot"); // 19
}
})
.get(); // 20
}
@Bean
public IntegrationFlow icedFlow() { // 21
return IntegrationFlows.from(MessageChannels.queue("iced", 10)) // 22
.handle(new GenericHandler<OrderItem>() { // 23
@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()
+ " prepared cold drink #" + coldDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload; // 24
}
})
.channel("output") // 25
.get();
}
@Bean
public IntegrationFlow hotFlow() { // 26
return IntegrationFlows.from(MessageChannels.queue("hot", 10))
.handle(new GenericHandler<OrderItem>() {
@Override
public Object handle(OrderItem payload, Map<String, Object> headers) {
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); // 27
System.out.println(Thread.currentThread().getName()
+ " prepared hot drink #" + hotDrinkCounter.incrementAndGet()
+ " for order #" + payload.getOrderNumber() + ": " + payload);
return payload;
}
})
.channel("output")
.get();
}
@Bean
public IntegrationFlow resultFlow() { // 28
return IntegrationFlows.from("output") // 29
.transform(new GenericTransformer<OrderItem, Drink>() { // 30
@Override
public Drink transform(OrderItem orderItem) {
return new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots()); // 31
}
})
.aggregate(new Consumer<AggregatorSpec>() { // 32
@Override
public void accept(AggregatorSpec aggregatorSpec) {
aggregatorSpec.processor(cafeAggregator, null); // 33
}
}, null)
.handle(CharacterStreamWritingMessageHandler.stdout()) // 34
.get();
}
@Component
public static class CafeAggregator { // 35
@Aggregator // 36
public Delivery output(List<Drink> drinks) {
return new Delivery(drinks);
}
@CorrelationStrategy // 37
public Integer correlation(Drink drink) {
return drink.getOrderNumber();
}
}
}
逐行检查代码...
1. ````java @SpringBootApplication ```` 这是 Spring Boot 1.2 中的一个新元注解。它包含 `@Configuration` 和 `@EnableAutoConfiguration`。由于我们处于 Spring Integration 应用程序中,并且 Spring Boot 为其提供了自动配置,因此 `@EnableIntegration` 会被自动应用,以初始化 Spring Integration 基础设施,包括一个用于 Java DSL 的环境 - `DslIntegrationConfigurationInitializer`,它由 `/META-INF/spring.factories` 中的 `IntegrationConfigurationBeanFactoryPostProcessor` 拾取。 2. ````java @IntegrationComponentScan ```` 这是 Spring Integration 中 `@ComponentScan` 的类似物,用于扫描基于接口的组件(Spring Framework 的 `@ComponentScan` 只查看类)。Spring Integration 支持发现用 `@MessagingGateway` 注解的接口(见下面的 #7)。 3. ````java ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args); ```` 我们类的 `main` 方法旨在启动 Spring Boot 应用程序,使用此类的配置,并通过 Spring Boot 启动一个 `ApplicationContext`。此外,它还将命令行参数委托给 Spring Boot。例如,您可以指定 `--debug` 来查看有关启动自动配置报告的日志。 4. ````java Cafe cafe = ctx.getBean(Cafe.class); ```` 由于我们已经有了一个 `ApplicationContext`,我们可以开始与应用程序交互。`Cafe` 是入口点 - 在 EIP 术语中是一个 `gateway`。网关只是接口,应用程序不直接与消息传递 API 交互;它只处理域(见下面的 #7)。 5. ````java for (int i = 1; i <= 100; i++) { ```` 为了演示咖啡馆的“工作”,我们发起了 100 个订单,包含两种饮品 - 一种热饮和一种冷饮。并将 `Order` 发送给 `Cafe` 网关。 6. ````java System.out.println("Hit 'Enter' to terminate"); ```` 通常 Spring Integration 应用程序是异步的,因此为了避免从 `main` 线程过早退出,我们在用户通过命令行进行某些交互之前,会阻塞 `main` 方法。非守护线程会保持应用程序打开,但 `System.read()` 为我们提供了一个干净关闭应用程序的机制。 7. ````java @MessagingGateway ```` 这个注解用于标记一个业务接口,表明它是应用程序与集成层之间的 `gateway`。它是 Spring Integration XML 配置中 ``.split()` EIP 方法的第二个参数是 `endpointConfigurer`,用于自定义 `autoStartup`、`requiresReply`、`adviceChain` 等选项。我们在这里使用 `null` 来表明我们依赖于端点的默认选项。许多 EIP 方法都提供了带和不带 `endpointConfigurer` 的重载版本。目前,没有提供不带 `endpointConfigurer` 参数的 `.split(String expression)` EIP 方法;这将在未来的版本中解决。
在我们的示例中,我们这里使用了一个受限的 QueueChannel 来反映现实生活中咖啡馆厨房繁忙的状态。而在这里,我们需要为监听此通道的下一个端点设置 全局 poller。
好了,我们已经描述了基于 Spring Integration Java DSL 的 Cafe Demo 示例,其中不包含 Java Lambda 支持。将其与 XML 示例 进行比较,并查阅 Lambda 支持教程 以获取更多关于 Spring Integration 的信息。
正如您所见,在没有 Lambda 的情况下使用 DSL 会稍微冗长一些,因为您需要为函数式接口的内联匿名实现提供样板代码。然而,我们相信支持 DSL 对于那些还无法迁移到 Java 8 的用户来说很重要。DSL 的许多优势(流畅的 API、编译时验证等)对所有用户都是可用的。
Lambda 的使用延续了 Spring Framework 减少或消除样板代码的传统,因此我们鼓励用户尝试 Java 8 和 Lambda,并鼓励他们的组织考虑允许在 Spring Integration 应用程序中使用 Java 8。
此外,请参阅 参考手册 以获取更多信息。
一如既往,我们期待您的评论和反馈(StackOverflow (spring-integration 标签)、Spring JIRA、GitHub),我们非常欢迎贡献!
感谢您的时间和耐心阅读本文!