领先一步
VMware 提供培训和认证,助您加速进步。
了解更多代表 Spring Integration 团队,我很高兴地宣布 Spring Integration 的 5.1.0.RELEASE
版本已可用。
可以从 Maven Central、JCenter 以及我们的发布仓库下载
compile "org.springframework.integration:spring-integration-core:5.1.0.RELEASE"
首先,我要感谢所有社区成员对框架持续的积极贡献!
除了例行的依赖升级、错误修复和内部性能改进外,此版本还引入了一些值得注意的新功能:
为了严格的消息发布顺序,可以使用 BoundRabbitChannelAdvice
作为 MessageHandler
的 advice,以允许在同一线程绑定的 Channel
中执行所有下游 AMQP 操作。通常与 splitter 或其他会导致发送多个消息的机制一起使用。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
如果 SourcePollingChannelAdapter
或 PollingConsumer
的 outputChannel
配置为 FluxMessageChannel
,轮询任务将不再由调度器执行,而是作为基于 trigger.nextExecutionTime()
的 Flux.generate()
以及随后的 Mono.delay()
来执行。这样,实际的源轮询会按需执行,并遵循由 FluxMessageChannel
传播的下游背压。PollerMetadata
选项保持不变,对最终配置是透明的。
fluxTransform()
IntegrationFlowDefinition
现在提供了一个新的 fluxTransform()
操作符,它接受一个 Function
用于以响应式方式转换传入消息的 Flux
。fluxTransform()
底层完全基于 Reactor 的 Flux.transform()
操作符,如果函数尚未产生回复消息,则将请求头复制到回复消息中。此调用在内部由输入和输出 FluxMessageChannel
实例包裹。
IntegrationFlow integrationFlow = f -> f
.split()
.<String, String>fluxTransform(flux -> flux
.map(Message::getPayload)
.map(String::toUpperCase))
.aggregate(a -> a
.outputProcessor(group -> group
.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(","))))
.channel(resultChannel);
此外,Java DSL 还提供了更多便捷的操作符:nullChannel()
、convert(Class<?>)
和 logAndReply()
。更多信息请参阅它们的 Javadoc。
java.util.function
接口现在是消息端点的一等公民。例如,可以直接在 @ServiceActivator
或 @Transformer
定义中使用 Function<?, ?>
和 Consumer<?>
。
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
Supplier<?>
接口可以简单地与 @InboundChannelAdapter
注解一起使用,或者在 <int:inbound-channel-adapter>
中用作 ref
。
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
Kotlin lambda 表达式现在也可以直接在消息端点定义中使用。
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
fun kotlinSupplier(): () -> String {
return { "baz" }
}
随着升级到 Micrometer 1.1
,框架现在会自动从 MeterRegistry
中移除已注册的 meter。这在我们开发动态 IntegrationFlow
并在运行时注册和移除它们时非常方便。
现在,在运行时注册的 MBean(例如通过动态 IntegrationFlow
注册的)在运行时销毁其 bean 时,会自动从 JMX 服务器注册中移除。
在运行时声明的 HTTP(和 WebFlux)入站端点(例如通过动态 IntegrationFlow
),现在会在 HandlerMapping
中注册它们的请求映射,并在其 bean 销毁时自动移除该映射。
由于 Spring Social 项目即将 终止生命周期,我们已将 spring-integration-twitter
模块迁移到 Spring Integration Extensions 下的一个独立项目,并且刚刚发布了 org.springframework.integration:spring-integration-social-twitter:1.0.0.RELEASE
,它完全基于 Spring Integration 5.1
。
更多更改,请查阅参考手册的“新增功能”章节。此外,请参阅迁移指南,了解此版本中的重大变更以及如何处理它们。
此版本是 Spring Boot 2.1 GA
的基础。
接下来,我们期待将 master
分支切换到 5.2
版本,开始着手新的功能和有价值的改进!
欢迎通过适当的沟通渠道提供任何反馈、功能想法、评论、错误报告和问题。