Spring Framework 4.0 M2:WebSocket 消息架构

工程技术 | Rossen Stoyanchev | 2013年7月24日 | ...

正如我之前所写,WebSocket API 只是 WebSocket 风格消息应用的起点。许多实际挑战仍然存在。正如一位 Tomcat 邮件列表用户最近沉思的

在我看来,websockets 似乎还不是真正“生产就绪”的(我说的不是 Tomcat 的具体实现,而是更普遍的情况)…… IE 中的原生 websockets 功能仅在 IE 10 后可用,而允许在较低版本 IE 中工作的解决方案有点“不确定”(例如依赖通过 Adobe FlashPlayer 的绕道)。 (我们的大多数客户都是大型企业,他们不会仅仅为了我们而更新浏览器,也不会在防火墙中打开特殊端口)。

Spring Framework 4.0 的第一个里程碑提供了对 SockJS 的服务器端支持,这是最佳、最全面的 WebSocket 浏览器回退方案。在不支持 WebSocket 的浏览器中以及在网络代理阻止其使用的情况下,您需要回退选项。简而言之,SockJS 使您能够今天构建 WebSocket 应用程序,并在必要时依赖透明的回退选项。

即使有了回退选项,更大的挑战仍然存在。套接字是一种非常低级的抽象,当今绝大多数 Web 应用程序都不会直接编程到套接字。这就是为什么 WebSocket 协议定义了一种子协议机制,该机制本质上启用并鼓励在 WebSocket 之上使用更高级别的协议,就像我们在 TCP 之上使用 HTTP 一样。

Spring Framework 4.0 的第二个里程碑使得在 WebSocket 之上使用更高级别的消息协议成为可能。为了演示这一点,我们编写了一个示例应用程序。

股票投资组合示例 股票投资组合示例应用程序可在 Github 上获取,它加载用户的投资组合头寸,允许买卖股票,接收报价,并显示头寸更新。这是一个相当简单的应用程序。然而,它处理了许多可能在基于浏览器的消息应用程序中出现的常见任务。

Snapshot of Stock Portfolio Application

那么我们如何构建这样一个应用程序呢?在使用 HTTP 和 REST 时,我们习惯于依赖 URL 以及 HTTP 动词来表达需要完成的操作。在这里,我们有一个套接字和大量的消息。您如何判断一条消息是给谁的,以及这条消息意味着什么?

Browser and Server exchange messages but what's in the message?

在表达这些语义之前,浏览器和服务器必须就共同的消息格式达成一致。有几种协议可以提供帮助。由于其简单性和广泛的支持,我们为这个里程碑选择了 STOMP

简单/流式面向文本消息协议 (STOMP)

STOMP 是一个以简单为设计理念的消息协议。它基于模仿 HTTP 的帧。一个帧由一个命令、可选的头和可选的正文组成。

例如,股票投资组合应用程序需要接收股票报价,因此客户端发送一个 SUBSCRIBE 帧,其中 destination 头表示客户端希望订阅的内容

SUBSCRIBE  
id:sub-1  
destination:/topic/price.stock.*  

当有股票报价可用时,服务器发送一个 MESSAGE 帧,其中包含匹配的目的地和订阅 id,以及 content-type 头和正文

MESSAGE  
subscription:sub-1  
message-id:wm2si1tj-4  
content-type: application/json  
destination:/topic/stocks.PRICE.STOCK.NASDAQ.EMC  

{\"ticker\":\"EMC\",\"price\":24.19}  

为了在浏览器中完成所有这些操作,我们使用了 stomp.jsSockJS 客户端


var socket = new SockJS('/spring-websocket-portfolio/portfolio');
var client = Stomp.over(socket);

var onConnect = function() {
  client.subscribe("/topic/price.stock.*", function(message) {
      // process quote
  });
};
client.connect('guest', 'guest', onConnect);

这已经是一个巨大的进步了!!我们有标准的消息格式和客户端支持。

现在我们可以转向服务器端了。

消息代理解决方案 一种服务器端选项是纯粹的消息代理解决方案,其中消息直接发送到传统的如 RabbitMQ、ActiveMQ 等消息代理。大多数(如果不是全部)代理都支持基于 TCP 的 STOMP,但它们也越来越多地支持基于 WebSocket 的 STOMP,而 RabbitMQ 更进一步,还支持 SockJS。我们的架构将如下所示

Browser sends STOMP messages to broker, application connects to broker via AMQP or JMS

这是一个健壮且可扩展的解决方案,但可以说并不完全适合当前的问题。消息代理通常在企业内部使用。将其直接暴露在 Web 上并不理想。

如果我们从 REST 中学到了什么,那就是我们不希望暴露关于系统内部的细节,比如数据库或领域模型。

此外,作为一名 Java 开发人员,您希望应用安全性、验证并添加应用程序逻辑。在消息代理解决方案中,应用服务器位于消息代理之后,这与大多数 Web 应用程序开发人员所习惯的方式有很大不同。

这就是为什么像 socket.io 这样的库很受欢迎。它很简单,并且针对 Web 应用程序的需求。另一方面,我们绝不能忽视消息代理处理消息的能力,它们在这方面非常出色,而且消息传递是一个难题。我们需要两全其美。

应用程序和消息代理解决方案 另一种方法是让应用程序处理传入消息,并充当 Web 客户端和消息代理之间的中介。客户端的消息可以通过应用程序流向代理,反之,来自代理的消息也可以通过应用程序流回客户端。这使得应用程序有机会检查传入的消息类型和“destination”头,并决定是处理该消息还是将其传递给代理。

Browser sends messages to application that in turn sends messages to a message broker

这是我们选择的方法。为了更好地说明,下面是一些场景。

加载投资组合头寸

  • 客户端请求投资组合头寸
  • 应用程序通过加载数据并将其返回给订阅来处理请求
  • 消息代理不参与此交互

订阅股票报价

  • 客户端发送股票报价订阅请求
  • 应用程序将消息传递给消息代理
  • 消息代理将消息传播给所有已订阅的客户端

接收股票报价

  • QuoteService 将股票报价消息发送到消息代理
  • 消息代理将消息传播给所有已订阅的客户端

执行交易

  • 客户端发送交易请求
  • 应用程序处理它,通过 TradeService 提交交易以执行
  • 消息代理不参与此交互

接收头寸更新

  • 交易服务将头寸更新消息发送到消息代理上的一个队列
  • 消息代理将头寸更新发送给客户端
  • 向特定用户发送消息将在下文更详细地介绍

严格来说,消息代理的使用是可选的。我们提供了一个开箱即用的“简单”替代方案供初学者使用。然而,为了可扩展性和在多个应用服务器上的部署,建议使用消息代理。

代码片段 让我们看一些客户端和服务器端代码的例子。

这是 portfolio.js 请求投资组合头寸的代码

stompClient.subscribe("/app/positions", function(message) {
  self.portfolio().loadPositions(JSON.parse(message.body));
});

在服务器端,PortfolioController 检测到请求并返回投资组合头寸,这展示了 Web 应用程序中非常常见的请求-回复交互。由于我们使用 Spring Security 来保护 HTTP 请求,包括导致 WebSocket 握手的请求,下面的 principal 方法参数取自 Spring Security 在 HttpServletRequest 上设置的用户 principal。

@Controller
public class PortfolioController {

  // ...

  @SubscribeEvent("/app/positions")
  public List<PortfolioPosition> getPortfolios(Principal principal) {
    String user = principal.getName();
    Portfolio portfolio = this.portfolioService.findPortfolio(user);
    return portfolio.getPositions();
  }
}

这是 portfolio.js 发送交易请求的代码

stompClient.send("/app/trade", {}, JSON.stringify(trade));

在服务器端,PortfolioController 发送交易以执行

@Controller
public class PortfolioController {

  // ...

  @MessageMapping(value="/app/trade")
  public void executeTrade(Trade trade, Principal principal) {
    trade.setUsername(principal.getName());
    this.tradeService.executeTrade(trade);
  }
}

PortfolioController 还可以通过向用户发送消息来处理意外异常。

@Controller
public class PortfolioController {

  // ...

  @MessageExceptionHandler
  @ReplyToUser(value="/queue/errors")
  public String handleException(Throwable exception) {
    return exception.getMessage();
  }
}

那么从应用程序内部向已订阅的客户端发送消息呢?这是 QuoteService 发送报价的方式

@Service
public class QuoteService {

  private final MessageSendingOperations<String> messagingTemplate;

  @Scheduled(fixedDelay=1000)
  public void sendQuotes() {
    for (Quote quote : this.quoteGenerator.generateQuotes()) {
      String destination = "/topic/price.stock." + quote.getTicker();
      this.messagingTemplate.convertAndSend(destination, quote);
    }
  }
}

这是 TradeService 在交易执行后发送头寸更新的方式

@Service
public class TradeService {

  // ...

  @Scheduled(fixedDelay=1500)
  public void sendTradeNotifications() {
    for (TradeResult tr : this.tradeResults) {
      String queue = "/queue/position-updates";
      this.messagingTemplate.convertAndSendToUser(tr.user, queue, tr.position);
    }
  }
}

顺便说一句,如果您想知道……是的,PortfolioController 也可以包含 Spring MVC 方法(例如 @RequestMapping),正如一位之前构建了在线游戏应用程序的开发人员在此工单中建议的那样

是的,将 [消息] 映射和 spring mvc 映射合并会很不错。它们没有理由不能统一。

就像 QuoteService 和 TradeService 一样,Spring MVC 控制器方法也可以发布消息。

Spring 应用程序的消息支持 长期以来,Spring Integration 为众所周知的企业集成模式以及轻量级消息传递提供了第一类抽象。在开发这个里程碑版本时,我们意识到后者正是我们需要基于其构建的。

因此,我很高兴地宣布,我们已将 Spring Integration 中的一些类型迁移到 Spring Framework 的一个新模块中,该模块很自然地被称为 spring-messaging。除了核心抽象,如 MessageMessageChannelMessageHandler 等,新模块还包含所有支持本文描述的新功能的注解和类。

考虑到这一点,我们现在可以看一下股票投资组合应用程序的内部架构图

Diagram of internal architecture with message broker

StompWebSocketHandler 将传入的客户端消息放在“dispatch”消息通道上。该通道有 3 个订阅者。第一个委托给带注解的方法,第二个将消息中继到 STOMP 消息代理,而第三个通过将目的地转换为客户端订阅的唯一队列名称来处理发给单个用户的消息(更多细节稍后介绍)。

默认情况下,应用程序使用提供的“简单”消息代理作为入门选项运行。正如示例 README 中所解释的,您可以通过激活和停用配置文件来在“简单”和功能齐全的消息代理之间切换。

Diagram of internal architecture with simple broker

另一个可能的配置更改是将 MessageChannel 的消息传递实现从基于 Executor 切换到基于 Reactor。最近发布第一个里程碑的 Reactor 项目也用于管理应用程序和消息代理之间的 TCP 连接。

您可以在此处查看完整的应用程序配置,其中还包括新的Spring Security Java 配置。您可能也对 改进的 STS 对 Java 配置的支持感兴趣。

向单个用户发送消息 向多个已订阅的客户端广播消息很容易,只需将消息发布到主题即可。但如何向特定用户发送消息则更困难。例如,您可能捕获到一个异常,并希望发送一条错误消息。或者您可能收到了交易确认,并希望将其发送给用户。

在传统的消息应用程序中,通常会创建一个临时队列并在需要回复的任何消息上设置“reply-to”头。这虽然可行,但在 Web 应用程序中感觉相当繁琐。客户端必须记住在所有适用的消息上设置必要的头,并且服务器应用程序可能需要跟踪并传递这些信息。有时这些信息可能根本不容易获得,例如在处理 HTTP POST 作为消息传递的替代方案时。

为了支持此需求,我们向每个连接的客户端发送一个唯一的队列后缀。然后可以将该后缀附加到其他名称上,以创建唯一的队列名称。

client.connect('guest', 'guest', function(frame) {

  var suffix = frame.headers['queue-suffix'];

  client.subscribe("/queue/error" + suffix, function(msg) {
    // handle error
  });

  client.subscribe("/queue/position-updates" + suffix, function(msg) {
    // handle position update
  });

});

然后在服务器端,@MessageExceptionHandler 方法(或任何消息处理方法)可以添加 @ReplyToUser 注解,以将返回值作为消息发送。

@MessageExceptionHandler
@ReplyToUser(value="/queue/errors")
public String handleException(Throwable exception) {
  // ...
}

所有其他类,例如 TradeService,都可以使用消息模板来实现相同的功能。

String user = "fabrice";
String queue = "/queue/position-updates";
this.messagingTemplate.convertAndSendToUser(user, queue, position);

在这两种情况下,我们在内部通过配置的 UserQueueSuffixResolver 定位用户队列后缀,以便重构正确的队列名称。目前只有一个简单的解析器实现。然而,添加一个 Redis 实现会很容易,无论用户是连接到此应用服务器还是其他应用服务器,它都将支持相同的功能。

结论 希望这篇介绍对新功能有所帮助。与其写得更长,不如鼓励您查看示例,并考虑它对您正在编写或打算编写的应用程序意味着什么。在我们努力争取在九月初发布候选版本时,现在正是提供反馈的绝佳时机。

要使用 Spring Framework 4.0.0.M2,请将 http://repo.springsource.org/libs-milestonehttp://repo.springsource.org/milestone 仓库添加到您的配置中。前者包含传递性依赖项,正如我们在仓库 FAQ 中所解释的那样。

SpringOne 2GX 2013 即将到来

尽快预订您在圣克拉拉参加 SpringOne 大会的席位。这绝对是第一手了解所有进展并提供直接反馈的最佳机会。预计今年会有一些重要的新宣布。查阅最近的博客文章,看看我说的是什么,还有更多内容即将到来!

获取 Spring 通讯

订阅 Spring 通讯,保持联系

订阅

保持领先

VMware 提供培训和认证,为您的进步加速。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部