领先一步
VMware 提供培训和认证,助您加速发展。
了解更多亲爱的 Spring 社区成员们!
Spring Integration Java DSL 1.0 GA 发布公告刚发布,我希望通过基于经典的 Cafe Demo 集成示例的逐行教程向您介绍 Spring Integration Java DSL。我们在此描述了对 Spring Boot 的支持、Spring Framework 的 Java 和注解 配置、IntegrationFlow
特性,并向 Java 8 的 Lambda 支持致敬,它是 DSL 风格的灵感来源。当然,这一切都由 Spring Integration Core 项目提供支持。
对于那些尚不感兴趣 Java 8 的读者,我们提供了一个不使用 Lambda 的类似教程:Spring Integration Java DSL (pre Java 8): Line by line tutorial。
但是,在我们开始描述 Cafe 演示应用程序之前,这里有一个更简短的示例供您入门...
@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext ctx =
SpringApplication.run(Start.class, args);
List<String> strings = Arrays.asList("foo", "bar");
System.out.println(ctx.getBean(Upcase.class).upcase(strings));
ctx.close();
}
@MessagingGateway
public interface Upcase {
@Gateway(requestChannel = "upcase.input")
Collection<String> upcase(Collection<String> strings);
}
@Bean
public IntegrationFlow upcase() {
return f -> f
.split() // 1
.<String, String>transform(String::toUpperCase) // 2
.aggregate(); // 3
}
}
我们将基础设施(注解等)的描述留给主要的 cafe 流描述。这里,我们希望您专注于最后一个 @Bean
,即 IntegrationFlow
以及将消息发送到该流的网关方法。
在 main
方法中,我们将字符串集合发送到网关并将结果打印到标准输出 (STDOUT)。该流首先将集合分割成单独的 String
(1);然后将每个字符串转换为大写 (2),最后我们将它们重新聚合回一个集合 (3)。由于这是流的末尾,框架将聚合结果返回给网关,新的有效载荷成为网关方法的返回值。
等效的 XML 配置可能是...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:splitter input-channel="upcase.input" output-channel="transform"/>
<int:transformer expression="payload.toUpperCase()"
input-channel="transform"
output-channel="aggregate" />
<int:aggregator input-channle="aggregate" />
或者...
<int:gateway service interface="foo.Upcase"
default-request-channel="upcase.input">
<int:chain input-channel="upcase.input">
<int:splitter />
<int:transformer expression="payload.toUpperCase()" />
<int:aggregator />
</int:chain>
## Cafe 演示
Cafe Demo
应用程序的目的是演示如何使用企业集成模式 (EIP) 来反映真实咖啡馆中的订单配送
场景。通过此应用程序,我们处理几种饮料订单——热饮和冰饮。运行应用程序后,我们可以在标准输出 (System.out.println
) 中看到冷饮比热饮准备得更快。但是,整个订单的配送会推迟到热饮准备好之后。
为了反映领域模型,我们有几个类:Order
、OrderItem
、Drink
和 Delivery
。它们都在集成场景中被提及,但我们在此不进行分析,因为它们足够简单。
我们应用程序的源代码只放在一个类中;重要的行都用数字标注,对应于后面的注释。
@SpringBootApplication // 1
@IntegrationComponentScan // 2
public class Application {
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext ctx =
SpringApplication.run(Application.class, args);// 3
Cafe cafe = ctx.getBean(Cafe.class); // 4
for (int i = 1; i <= 100; i++) { // 5
Order order = new Order(i);
order.addItem(DrinkType.LATTE, 2, false); //hot
order.addItem(DrinkType.MOCHA, 3, true); //iced
cafe.placeOrder(order);
}
System.out.println("Hit 'Enter' to terminate"); // 6
System.in.read();
ctx.close();
}
@MessagingGateway // 7
public interface Cafe {
@Gateway(requestChannel = "orders.input") // 8
void placeOrder(Order order); // 9
}
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger(); // 10
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() { // 11
return Pollers.fixedDelay(1000).get();
}
@Bean
public IntegrationFlow orders() { // 12
return f -> f // 13
.split(Order.class, Order::getItems) // 14
.channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
.subFlowMapping("true", sf -> sf // 17
.channel(c -> c.queue(10)) // 18
.publishSubscribeChannel(c -> c // 19
.subscribe(s -> // 20
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
.subscribe(sub -> sub // 22
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item) // 23
.handle(m -> System.out.println(m.getPayload())))))// 24
.subFlowMapping("false", sf -> sf // 25
.channel(c -> c.queue(10))
.publishSubscribeChannel(c -> c
.subscribe(s ->
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
.subscribe(sub -> sub
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared hot drink #"
+ this.hotDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
.handle(m -> System.out.println(m.getPayload()))))))
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots())) // 27
.aggregate(aggregator -> aggregator // 28
.outputProcessor(group -> // 29
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList()))) // 30
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null) // 31
.handle(CharacterStreamWritingMessageHandler.stdout()); // 32
}
}
逐行检查代码...
1
@SpringBootApplication
这是 Spring Boot 1.2 中的一个新元注解。它包括 @Configuration
和 @EnableAutoConfiguration
。由于我们处于 Spring Integration 应用程序中,并且 Spring Boot 为其提供了自动配置,因此会自动应用 @EnableIntegration
,以初始化 Spring Integration 基础设施,包括 Java DSL 的环境 - DslIntegrationConfigurationInitializer
,它由 /META-INF/spring.factories
中的 IntegrationConfigurationBeanFactoryPostProcessor
拾取。
2
@IntegrationComponentScan
这是 Spring Integration 中与 @ComponentScan
类似的注解,用于扫描基于接口的组件(Spring Framework 的 @ComponentScan
只查看类)。Spring Integration 支持发现使用 @MessagingGateway
注解的接口(见下文 #7)。
3
ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);
我们类的 main
方法旨在使用此类的配置启动 Spring Boot 应用程序,并通过 Spring Boot 启动一个 ApplicationContext
。此外,它还将命令行参数委托给 Spring Boot。例如,您可以指定 --debug
来查看启动自动配置报告的日志。
4
Cafe cafe = ctx.getBean(Cafe.class);
由于我们已经有一个 ApplicationContext
,我们可以开始与应用程序交互。而 Cafe
就是那个入口点——在 EIP 术语中称为网关
。网关只是接口,应用程序不与消息 API 交互;它只处理领域(见下文 #7)。
5
for (int i = 1; i <= 100; i++) {
为了演示咖啡馆的“工作”,我们初始化了 100 个订单,每个订单包含两种饮料——一杯热饮和一杯冰饮。然后将 Order
发送到 Cafe
网关。
6
System.out.println("Hit 'Enter' to terminate");
通常 Spring Integration 应用程序是异步的,因此为了避免从 main
线程过早退出,我们阻塞 main
方法,直到通过命令行进行一些最终用户交互。非守护线程会保持应用程序打开,但 System.read()
提供了一种干净地关闭应用程序的机制。
7
@MessagingGateway
此注解用于标记一个业务接口,表示它是终端应用程序和集成层之间的网关
。它类似于 Spring Integration XML 配置中的 <gateway />
组件。Spring Integration 为此接口创建一个 Proxy
,并将其作为一个 Bean 填充到应用程序上下文中。此 Proxy
的目的是将参数包装到 Message<?>
对象中,并根据提供的选项将其发送到 MessageChannel
。
8
@Gateway(requestChannel = "orders.input")
此方法级别的注解用于通过方法和目标集成流来区分业务逻辑。在此示例中,我们使用了 requestChannel
引用 orders.input
,它是我们 IntegrationFlow
输入通道的 MessageChannel
Bean 名称(见下文 #13)。
9
void placeOrder(Order order);
该接口方法是终端应用程序与集成层交互的中心点。此方法返回类型为 void
。这意味着我们的集成流是单向
的,我们只将消息发送到集成流,但不等待回复。
10
private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();
两个计数器,用于收集我们的咖啡馆如何处理饮料的信息。
11
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
默认
的 poller
bean。它类似于 Spring Integration XML 配置中的 <poller default="true">
组件。对于 inputChannel
是 PollableChannel
的端点是必需的。在这种情况下,对于咖啡馆的两个队列
——热饮和冰饮(见下文 #18)是必要的。在这里,我们使用 DSL 项目中的 Pollers
工厂,并使用其方法链式流畅 API 来构建轮询器元数据。请注意,如果端点需要特定的 poller
(而不是默认的 poller),可以直接从 IntegrationFlow
定义中使用 Pollers
。
12
@Bean
public IntegrationFlow orders() {
IntegrationFlow
bean 定义。它是 Spring Integration Java DSL 的核心组件,尽管它在运行时不发挥任何作用,只在 Bean 注册阶段发挥作用。下面的所有其他代码都在 IntegrationFlow
对象中注册 Spring Integration 组件(MessageChannel
、MessageHandler
、EventDrivenConsumer
、MessageProducer
、MessageSource
等),IntegrationFlowBeanPostProcessor
会解析该对象以处理这些组件,并在必要时将它们注册为应用程序上下文中的 Bean(某些元素,如通道,可能已经存在)。
13
return f -> f
IntegrationFlow
是一个 Consumer
函数式接口,因此我们可以最小化代码,只专注于集成场景的需求。它的 Lambda
接受 IntegrationFlowDefinition
作为参数。此类提供了一整套可以组合成链
的方法。我们将这些方法称为 EIP 方法
,因为它们提供了企业集成模式 (EI patterns) 的实现,并填充了来自 Spring Integration Core 的组件。在 Bean 注册阶段,IntegrationFlowBeanPostProcessor
会将这个内联 (Lambda) IntegrationFlow
转换为 StandardIntegrationFlow
并处理其组件。我们也可以使用 IntegrationFlows
工厂实现同样的效果(例如 IntegrationFlow.from("channelX"). ... .get()
),但我们认为 Lambda 定义更优雅。使用 Lambda 定义的 IntegrationFlow
会填充一个 DirectChannel
作为流的 inputChannel
,并在我们的示例中以名称 orders.input
(流 Bean 名称 + ".input"
) 注册到应用程序上下文中。这就是我们在 Cafe
网关中使用该名称的原因。
14
.split(Order.class, Order::getItems)
由于我们的集成流通过 orders.input
通道接收消息,因此我们已准备好消费和处理它们。场景中的第一个 EIP 方法是 .split()
。我们知道来自 orders.input
通道的消息有效载荷
是一个 Order
领域对象,因此我们可以在此处直接使用其类型并利用 Java 8 的方法引用
特性。第一个参数是我们期望的消息有效载荷
类型,第二个是对 getItems()
方法的方法引用,该方法返回 Collection<OrderItem>
。因此,这实现了分割
EI 模式,我们将集合中的每个条目作为单独的消息发送到下一个通道。在后台,.split()
方法注册了一个 MethodInvokingSplitter
MessageHandler
实现和该 MessageHandler
的 EventDrivenConsumer
,并将 orders.input
通道连接为 inputChannel
。
15
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.channel()
EIP 方法允许指定端点之间的具体 MessageChannel
,这类似于 Spring Integration XML 配置中通过 output-channel
/input-channel
属性对来实现。默认情况下,DSL 集成流定义中的端点与 DirectChannel
连接,这些通道根据 IntegrationFlow
bean 名称和流链中的索引
获取 bean 名称。在此示例中,我们使用另一个 Lambda
表达式,它从 Channels
工厂中选择一个特定的 MessageChannel
实现,并使用流畅 API 对其进行配置。当前的通道是一个 ExecutorChannel
,允许将来自 splitter
的消息分发到单独的线程
中,以便在下游流中并行处理它们。
16
.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping
我们场景中的下一个 EIP 方法是 .route()
,用于将热饮/冰饮
订单项发送到不同的咖啡馆厨房。我们再次在此使用方法引用 (isIced()
) 从接收消息中获取 routingKey
。第二个 Lambda 参数表示一个路由器映射
——类似于 Spring Integration XML 配置中 <router>
组件的 <mapping>
子元素。然而,由于我们使用的是 Java,我们可以进一步利用其 Lambda 支持!Spring Integration Java DSL 除了传统的通道映射
外,还为路由器
引入了子流
定义。每个子流根据路由执行,如果子流产生结果,则结果会传递给路由器之后流定义中的下一个元素。
17
.subFlowMapping("true", sf -> sf
指定当前路由器 mappingKey
的集成流。在此示例中,我们有两个子流 - hot
和 iced
。子流是相同的 IntegrationFlow
函数式接口,因此我们可以完全按照我们在顶层 IntegrationFlow
定义中使用 Lambda 的方式使用它。子流与其父流没有运行时依赖关系,它只是一种逻辑关系。
18
.channel(c -> c.queue(10))
我们已经知道 IntegrationFlow
的 Lambda 定义从 [FLOW_BEAN_NAME].input
DirectChannel
开始,所以可能会有人问“如果再次指定 .channel()
在这里如何工作?”。DSL 会处理这种情况,并使用 BridgeHandler
和端点连接这两个通道。在我们的示例中,我们在此使用了一个受限的 QueueChannel
来反映现实生活中咖啡馆厨房的繁忙状态。这里就是我们需要那个全局轮询器
来监听此通道的下一个端点的地方。
19
.publishSubscribeChannel(c -> c
.publishSubscribeChannel()
EIP 方法是 .channel()
的一个变体,用于 MessageChannels.publishSubscribe()
,但带有 .subscribe()
选项,我们可以在其中将子流指定为通道的订阅者。没错,子流又一次!因此,子流可以指定到任何深度。无论是否存在 .subscribe()
子流,父流中的下一个端点也是此 .publishSubscribeChannel()
的订阅者。由于我们已经处于 .route()
子流中,最后一个订阅者是一个隐式的 BridgeHandler
,它只是将消息弹出到顶层——类似于另一个隐式的 BridgeHandler
,将消息弹出到主流中的下一个 .transform()
端点。关于我们流的当前位置还有一点需要注意:上一个 EIP 方法是 .channel(c -> c.queue(10))
,这个也是用于 MessageChannel
。所以,它们也再次通过隐式的 BridgeHandler
连接起来。在实际应用中,我们可以仅通过一个用于咖啡馆厨房的 .handle()
来避免此 .publishSubscribeChannel()
,但我们在此的目标是尽可能多地涵盖 DSL 特性。这就是我们将厨房工作分配到同一 PublishSubscribeChannel
的多个子流中的原因。
20
.subscribe(s ->
.subscribe()
方法接受一个 IntegrationFlow
作为参数,可以指定为 Lambda 以将订阅者配置为子流
。我们在此使用了多个子流订阅者,以避免多行 Lambda 并涵盖一些 DSL 和 Spring Integration 功能。
21
s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))
在此,我们使用了一个简单的 .handle()
EIP 方法来阻塞当前线程一段时间,以演示咖啡馆厨房准备饮料的速度。我们在此使用了 Google Guava 的 Uninterruptibles.sleepUninterruptibly
,以避免在 Lambda 表达式中使用 try...catch
块,尽管您也可以这样做,并且您的 Lambda 将是多行的。或者您可以将该代码移到一个单独的方法中,并在此处使用方法引用
。
由于我们没有在 .publishSubscribeChannel()
上使用任何 Executor
,所有订阅者将按顺序在同一线程上执行;在我们的示例中,它是来自前一个 QueueChannel
上的 poller
的一个 TaskScheduler
线程。这就是为什么这个 sleep
会阻塞所有下游进程,并允许展示那个限制为 10 个元素的 QueueChannel
的繁忙状态
。
22
.subscribe(sub -> sub
下一个子流订阅者将在为冰饮
sleep
1 秒后执行。我们在这里使用另一个子流,因为前一个的 .handle()
对于 MessageHandler
的 Lambda 特性来说是单向
的。因此,为了推进整个流的处理,我们有多个订阅者:一些子流在完成工作后就结束了,不返回任何东西给父流。
23
.<OrderItem, String>transform(item ->
Thread.currentThread().getName()
+ " prepared cold drink #"
+ this.coldDrinkCounter.incrementAndGet()
+ " for order #" + item.getOrderNumber()
+ ": " + item)
当前订阅者子流中的 transformer
用于将 OrderItem
转换为适合下一个 .handle
使用的友好 STDOUT 消息。在这里,我们看到了泛型与 Lambda 表达式的结合使用。这是通过 GenericTransformer
函数式接口实现的。
24
.handle(m -> System.out.println(m.getPayload())))))
这里的 .handle()
仅仅是为了演示如何使用 Lambda 表达式将有效载荷
打印到 STDOUT。这是我们的饮料准备好的信号。之后,PublishSubscribeChannel
的最终(隐式)订阅者会将包含 OrderItem
的消息发送到主流程中的 .transform()
。
25
.subFlowMapping("false", sf -> sf
用于热饮
的 .subFlowMapping()
。实际上,它与前面的冰饮
子流类似,但具有特定的热饮
业务逻辑。
26
s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))
用于热饮
的 sleepUninterruptibly
。没错,我们需要更多时间来烧水!
27
.<OrderItem, Drink>transform(orderItem ->
new Drink(orderItem.getOrderNumber(),
orderItem.getDrinkType(),
orderItem.isIced(),
orderItem.getShots()))
主要的 OrderItem
到 Drink
的 transformer
,在咖啡馆厨房订阅者完成饮料准备后,当 .route()
子流返回其结果时执行。
28
.aggregate(aggregator -> aggregator
.aggregate()
EIP 方法提供了类似的选项来配置 AggregatingMessageHandler
及其端点,就像我们在使用 Spring Integration XML 配置时使用 <aggregator>
组件一样。当然,使用 Java DSL,我们可以直接在原地配置聚合器,无需任何额外的 Bean。Lambda 又一次派上用场了!从咖啡馆业务逻辑的角度来看,由于我们在开始时将原始订单分割
成了 OrderItem
s,因此我们为初始 Order
组装了 Delivery
。
29
.outputProcessor(group ->
AggregatorSpec
的 .outputProcessor()
允许我们在聚合器完成组后发出自定义结果。它类似于 <aggregator>
组件或 POJO 方法上的 @Aggregator
注解的 ref
/method
。我们在此的目标是为所有饮料
组装一个 Delivery
。
30
new Delivery(group.getMessages()
.stream()
.map(message -> (Drink) message.getPayload())
.collect(Collectors.toList())))
如您所见,我们在此使用了 Java 8 的 Stream
特性来处理 Collection
。我们迭代来自已释放的 MessageGroup
的消息,并将每条消息转换为其 Drink
有效载荷
(map
)。Stream
(.collect()
) 的结果(一个 Drink
列表)被传递给 Delivery
构造函数。包含此新 Delivery
有效载荷的 Message
被发送到我们 Cafe 场景中的下一个端点。
31
.correlationStrategy(m ->
((Drink) m.getPayload()).getOrderNumber()), null)
.correlationStrategy()
Lambda 演示了我们如何自定义聚合器的行为。当然,我们在此可以仅依赖 Spring Integration 内置的 SequenceDetails
,它在流的开始时默认从 .split()
生成并填充到每条分割后的消息中,但为了说明,此处包含了 CorrelationStrategy
的 Lambda 示例。(使用 XML,我们可以使用 correlation-expression
或自定义 CorrelationStrategy
)。此行中 .aggregate()
EIP 方法的第二个参数用于 endpointConfigurer
,以自定义选项,例如 autoStartup
、requiresReply
、adviceChain
等。我们在此使用 null
来表明我们依赖端点的默认选项。许多 EIP 方法提供了带或不带 endpointConfigurer
的重载版本,但 .aggregate()
需要一个端点参数,以避免对 AggregatorSpec
Lambda 参数进行显式转换。
32
.handle(CharacterStreamWritingMessageHandler.stdout());
这是我们流的末尾 - Delivery
已交付给客户!我们在此使用 Spring Integration Core 中开箱即用的 CharacterStreamWritingMessageHandler
将消息有效载荷
打印到 STDOUT。这展示了如何从 Java DSL 中使用 Spring Integration Core(及其模块)中现有组件。
好了,我们已经完成了基于 Spring Integration Java DSL 的 Cafe Demo 示例的描述。将其与 XML 示例 进行比较,以获取更多关于 Spring Integration 的信息。
这不是一个全面的 DSL 功能教程。我们在此没有回顾 endpointConfigurer
选项、Transformers
工厂、IntegrationComponentSpec
层次结构、NamespaceFactories
、如何指定多个 IntegrationFlow
bean 并将它们连接到一个应用程序等等。有关更多信息,请参阅参考手册。
至少,这篇逐行教程应该向您展示 Spring Integration Java DSL 的基础知识,以及它在 Spring Framework Java 和注解配置、Spring Integration 基础和 Java 8 Lambda 支持之间的无缝融合!
另请参阅 si4demo,了解 Spring Integration 的演变,包括 Java DSL,这在 2014 年的 SpringOne/2GX 大会上有所展示。(视频应很快可用)。
一如既往,我们期待您的评论和反馈(StackOverflow (spring-integration
标签)、Spring JIRA、GitHub),我们非常欢迎贡献!
附言:即使本教程完全基于 Java 8 Lambda 支持,我们也不想忽略 Java 8 之前的用户,我们将提供类似的非 Lambda 博客文章。敬请关注!