构建 Spring Integration 4.1 WebSocket 端点

工程 | Pieter Humphrey | 2014 年 11 月 15 日 | ...

作者:Josh Long

Spring Integration 4.1 已发布,其中包含大量出色的新功能!我最喜欢的功能之一?与 Spring 4 WebSocket 支持的智能集成。现在,您可以组合一个其最终目的地是 WebSocket 客户端的集成流。还支持充当 WebSocket 服务的客户端。

为了编译它,您需要 Java 8(我们在此大量使用 lambda 表达式)以及以下 Maven 依赖项:

  • groupIdorg.springframework.integrationartifactIdspring-integration-java-dslversion1.0.0.RC1
  • groupIdorg.springframework.integrationartifactIdspring-integration-websocketversion4.1.0.RELEASE
  • groupIdorg.springframework.bootartifactIdspring-boot-starter-websocketversion1.2.0.RC1

为了解析这些依赖项,您需要 snapshotmilestone Maven 存储库。

所有监听 /names 的客户端都将接收发送到 requestChannel 通道的消息。Spring 4 MessageChannel 是一个命名通道,更多或更少类似于 java.util.Queue<T>。此示例使用 Spring Integration Java 配置 DSL 并在新的 Spring Integration 4.1 Web Socket 支持之上。

package demo ;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.support.Function;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.outbound.WebSocketOutboundMessageHandler;
import org.springframework.messaging.*;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/**
 * @author Artem Bilan
 * @author Josh Long
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
@RestController
public class Application {

    public static void main(String args[]) throws Throwable {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    ServerWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/names").withSockJs();
    }

    @Bean
    MessageHandler webSocketOutboundAdapter() {
        return new WebSocketOutboundMessageHandler(serverWebSocketContainer());
    }

    @Bean(name = "webSocketFlow.input")
    MessageChannel requestChannel() {
        return new DirectChannel();
    }

    @Bean
    IntegrationFlow webSocketFlow() {
        return f -> {
            Function<Message , Object> splitter = m -> serverWebSocketContainer()
                    .getSessions()
                    .keySet()
                    .stream()
                    .map(s -> MessageBuilder.fromMessage(m)
                            .setHeader(SimpMessageHeaderAccessor.SESSION_ID_HEADER, s)
                            .build())
                    .collect(Collectors.toList());
            f.split( Message.class, splitter)
                    .channel(c -> c.executor(Executors.newCachedThreadPool()))
                    .handle(webSocketOutboundAdapter());
        };
    }

    @RequestMapping("/hi/{name}")
    public void send(@PathVariable String name) {
        requestChannel().send(MessageBuilder.withPayload(name).build());
    }
}

IntegrationFlow 非常简单。对于接收到的每条消息,复制它,然后通过添加一个带有 SimpMessageHeaderAccessor.SESSION_ID_HEADER 的标头将其寻址给每个正在监听的 WebSocket 会话,然后将其发送到出站 webSocketOutboundAdapter,该适配器将其传递给每个正在监听的客户端。要查看其工作原理,请在一个浏览器窗口中打开 https://:8080/,然后在另一个窗口中打开 https://:8080/hi/Spring。这里有一个简单的客户端 在此技术提示的代码存储库中进行了演示

关于如何在 Spring Integration 4.1 文档的 Web Socket 组件 中使用 Web Socket,有很棒的文档。在 Spring Integration 示例目录 中还有一个更具启发性的示例。

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有