取得领先
VMware 提供培训和认证,以加速您的进步。
了解更多作者:Josh Long
Spring Integration 4.1 刚刚发布,它包含很多很棒的新功能!我最喜欢的功能之一?与 Spring 4 WebSocket 支持的智能集成。现在您可以组合一个集成流,其最终目的地是 WebSocket 客户端。还支持充当 WebSocket 服务的客户端。
为了编译它,您需要 Java 8(我们在这里大量使用 lambda)和以下 Maven 依赖项
org.springframework.integration
, artifactId:spring-integration-java-dsl
, version: 1.0.0.RC1
。org.springframework.integration
, artifactId:spring-integration-websocket
, version: 4.1.0.RELEASE
。org.springframework.boot
, artifactId:spring-boot-starter-websocket
, version: 1.2.0.RC1
。为了解决这些依赖关系,您需要 snapshot
和 milestone
Maven 存储库。
所有监听 /names
的客户端都将收到发送到 requestChannel
通道的任何消息。Spring 4 MessageChannel
是一个命名的管道——或多或少类似于 java.util.Queue<T>
。此示例在新的 Spring Integration 4.1 web socket 支持之上使用 Spring Integration Java 配置 DSL。这是示例
package demo ;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.*;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* @author Artem Bilan
* @author Josh Long
*/
@Configuration
@ComponentScan
@EnableAutoConfiguration
@RestController
public class Application {
public static void main(String args[]) throws Throwable {
SpringApplication.run(Application.class, args);
}
@Bean
ServerWebSocketContainer serverWebSocketContainer() {
return new ServerWebSocketContainer("/names").withSockJs();
}
@Bean
MessageHandler webSocketOutboundAdapter() {
return new WebSocketOutboundMessageHandler(serverWebSocketContainer());
}
@Bean(name = "webSocketFlow.input")
MessageChannel requestChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow webSocketFlow() {
return f -> {
Function<Message , Object> splitter = m -> serverWebSocketContainer()
.getSessions()
.keySet()
.stream()
.map(s -> MessageBuilder.fromMessage(m)
.setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, s)
.build())
.collect(Collectors.toList());
f.split( Message.class, splitter)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.handle(webSocketOutboundAdapter());
};
}
@RequestMapping("/hi/{name}")
public void send(@PathVariable String name) {
requestChannel().send(MessageBuilder.withPayload(name).build());
}
}
IntegrationFlow
很简单。对于每个传入的消息,复制它并通过添加一个包含 SimpMessageHeaderAccessor.SESSION_ID_HEADER
的标头来将其寻址到每个侦听的 WebSocket
会话,然后将其发送到出站 webSocketOutboundAdapter
,这将把它传递给每个侦听的客户端。要查看它的工作原理,在一个浏览器窗口中打开 http://localhost:8080/
,然后在另一个浏览器窗口中打开 http://localhost:8080/hi/Spring
。有一个简单的客户端在这个技巧的代码仓库中演示。
在 Spring Integration 4.1 文档中有很多关于如何使用 web socket 组件的文档。在Spring Integration 示例目录中也有一个更鼓舞人心的例子。