26. WebSocket Support

参考文档的这一部分涵盖了 Spring Framework 对 Web 应用程序中 WebSocket 样式消息传递的支持,包括将 STOMP 用作应用程序级别的 WebSocket 子协议。

第 26.1 节“简介”构建了一个思考 WebSocket 的思路,涵盖了采用方面的挑战,设计注意事项以及有关何时合适的想法。

第 26.2 节“ WebSocket API”在服务器端审查了 Spring WebSocket API,而第 26.3 节“ SockJS 后备”解释了 SockJS 协议并展示了如何配置和使用它。

第 26.4.1 节“概述”引入了 STOMP 消息传递协议。 第 26.4.3 节“启用 STOMP”演示了如何在 Spring 中配置 STOMP 支持。 第 26.4.5 节“带注解的控制器”和以下各节说明如何编写带注解的消息处理方法,发送消息,选择消息代理选项以及如何使用特殊的“用户”目的地。最后,第 26.4.19 节,“测试”列出了三种测试 STOMP/WebSocket 应用程序的方法。

26.1 Introduction

WebSocket 协议RFC 6455为 Web 应用程序定义了一项重要的新功能:Client 端与服务器之间的全双工双向通信。这是令人兴奋的一项新功能,它紧跟着悠久的技术历史,使 Web 更具交互性,包括 Java Applet,XMLHttpRequest,Adobe Flash,ActiveXObject,各种 Comet 技术,服务器发送的事件等。

对 WebSocket 协议的正确介绍超出了本文档的范围。但是,至少必须了解,HTTP 仅用于初始握手,这依赖于 HTTP 内置的机制来请求服务器可以以 HTTP 状态响应的协议升级(或本例中的协议切换)机制 101 (交换协议)是否同意。假设握手成功,则 HTTP 升级请求所基于的 TCP 套接字保持打开状态,并且 Client 端和服务器都可以使用该套接字向彼此发送消息。

Spring Framework 4 包括一个新的spring-websocket模块,具有全面的 WebSocket 支持。它与 Java WebSocket API 标准(JSR-356)兼容,并且还提供了附加的附加值,如引言的其余部分所述。

26.1.1 WebSocket 后备选项

采用过程中的一个重要挑战是某些浏览器缺乏对 WebSocket 的支持。值得注意的是,第一个支持 WebSocket 的 Internet Explorer 版本是版本 10(有关浏览器版本的支持,请参阅http://caniuse.com/websockets)。此外,某些限制性代理的配置方式可能会阻止进行 HTTP 升级的尝试,也可能会在一段时间后断开连接,因为该连接保持打开时间过长。 InfoQ 文章“ HTML5 Web 套接字如何与代理服务器交互”中提供了 Peter Lubbers 对此主题的详尽概述。

因此,今天要构建 WebSocket 应用程序,需要回退选项,以便在必要时模拟 WebSocket API。 Spring 框架基于SockJS protocol提供了这种透明的后备选项。可以通过配置启用这些选项,否则无需修改应用程序。

26.1.2 消息传递体系结构

除了短期到中期的采用挑战之外,使用 WebSocket 还带来了重要的设计注意事项,这些考虑要尽早识别,尤其是与我们今天所了解的有关构建 Web 应用程序的知识相反。

如今,REST 已成为构建 Web 应用程序的一种被广泛接受,理解和支持的体系结构。它是一种架构,它依赖于具有许多 URL(名词),少数 HTTP 方法(* verbs )和其他原理,例如使用超媒体( links *),保持 Stateless 等。

相比之下,WebSocket 应用程序可以仅将单个 URL 用于初始 HTTP 握手。此后,所有消息共享并在同一 TCP 连接上流动。这指向了一个完全不同的,异步的,事件驱动的消息传递体系结构。与传统消息传递应用程序(例如 JMS,AMQP)更接近的一种。

Spring Framework 4 包括一个新的spring-messaging模块,该模块具有来自Spring Integration项目的关键抽象,例如MessageMessageChannelMessageHandler以及可以用作此类消息传递体系结构基础的其他模块。该模块还包括一组注解,用于将消息 Map 到方法,类似于基于 Spring MVC注解的编程模型。

26.1.3 WebSocket 中的子协议支持

WebSocket 确实暗示着消息传递体系结构,但并不要求使用任何特定的消息传递协议。它是 TCP 上的一个非常薄的层,它将字节流转换为消息流(文本或二进制),而没有更多。由应用程序来解释消息的含义。

与 HTTP(它是应用程序级协议)不同,在 WebSocket 协议中,传入消息中根本没有足够的信息供框架或容器知道如何路由或处理它。因此,对于非常琐碎的应用程序而言,WebSocket 的级别可以说太低了。可以做到,但是可能会导致在顶部创建一个框架。这相当于当今大多数 Web 应用程序是使用 Web 框架而不是仅使用 Servlet API 编写的。

因此,WebSocket RFC 定义了sub-protocols的使用。在握手期间,Client 端和服务器可以使用 HeadersSec-WebSocket-Protocol来同意子协议,即要使用的更高的应用程序级别协议。不需要使用子协议,但是即使不使用子协议,应用程序仍将需要选择 Client 端和服务器都可以理解的消息格式。该格式可以是自定义的,特定于框架的或标准的消息传递协议。

Spring 框架提供了对使用STOMP的支持,这是一种简单的消息传递协议,最初是为使用受 HTTP 启发的框架的脚本语言而创建的。 STOMP 得到广泛支持,非常适合在 WebSocket 和 Web 上使用。

26.1.4 我应该使用 WebSocket 吗?

考虑到围绕 WebSocket 的使用的所有设计注意事项,因此合理地提出“何时使用它是适当的?”的问题。

最适合 WebSocket 的 Web 应用程序是 Client 端和服务器需要高频且低延迟地交换事件的 Web 应用程序。主要候选人包括但不限于金融,游戏,协作等领域的应用程序。这样的应用程序对时间延迟非常敏感,并且还需要高频交换各种消息。

但是,对于其他应用程序类型,情况可能并非如此。例如,通过每几分钟进行一次简单轮询就可以很好地显示新闻或社交新闻,该新闻或社交新闻会在新闻出现时向其显示。在这里,延迟很重要,但是只要新闻显示几分钟就可以接受。

即使在延迟至关重要的情况下,如果消息量相对较低(例如,监视网络故障),也应将long polling的使用视为相对简单的替代方案,该方案可靠地工作并且在效率方面具有可比性(再次假设信息相对较低)。

它是低延迟和高频率消息的结合,这使得使用 WebSocket 协议变得至关重要。即使在这样的应用程序中,仍然选择是否所有 Client 端-服务器通信都应该通过 WebSocket 消息完成,而不是使用 HTTP 和 REST。答案将因应用程序而异。但是,某些功能可能同时通过 WebSocket 和 REST API 公开,以便为 Client 端提供替代方案。此外,REST API 调用可能需要向通过 WebSocket 连接的感兴趣的 Client 端 Broadcast 消息。

Spring Framework 允许@Controller@RestController类同时具有 HTTP 请求处理和 WebSocket 消息处理方法。此外,Spring MVC 请求处理方法或任何与此相关的应用程序方法都可以轻松地向所有感兴趣的 WebSocketClient 端或特定用户 Broadcast 消息。

26.2 WebSocket API

Spring 框架提供了一个 WebSocket API,旨在适应各种 WebSocket 引擎。当前,列表包括 WebSocket 运行时,例如 Tomcat 7.0.47,Jetty 9.1,GlassFish 4.1,WebLogic 12.1.3 和 Undertow 1.0(和 WildFly 8.0)。随着更多的 WebSocket 运行时可用,可能会添加其他支持。

Note

introduction所述,对于应用程序而言,直接使用 WebSocket API 的级别太低-除非假设消息的格式,否则几乎没有框架可以解释消息或通过注解路由消息。这就是为什么应用程序应该考虑使用子协议和 Spring 的通过 WebSocket 进行 STOMP支持的原因。

当使用更高级别的协议时,WebSocket API 的细节变得不那么相关,就像使用 HTTP 时不向应用程序公开 TCP 通信的细节一样。但是,本节将介绍直接使用 WebSocket 的详细信息。

26.2.1 WebSocketHandler

创建 WebSocket 服务器就像实现WebSocketHandler一样简单,或者更可能扩展TextWebSocketHandlerBinaryWebSocketHandler

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;

public class MyHandler extends TextWebSocketHandler {

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) {
        // ...
    }

}

有专门的 WebSocket Java-config 和 XML 名称空间支持,用于将上述 WebSocket 处理程序 Map 到特定的 URL:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

XML 配置等效项:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

上面的代码用于 Spring MVC 应用程序,应包含在DispatcherServlet的配置中。但是,Spring 的 WebSocket 支持不依赖于 Spring MVC。在WebSocketHttpRequestHandler的帮助下将WebSocketHandler集成到其他 HTTP 服务环境中相对简单。

26.2.2 WebSocket 握手

定制初始 HTTP WebSocket 握手请求的最简单方法是通过HandshakeInterceptor,该公开暴露握手方法的“之前”和“之后”。此类拦截器可用于排除握手或使WebSocketSession可以使用任何属性。例如,有一个内置的拦截器,用于将 HTTP 会话属性传递到 WebSocket 会话:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyHandler(), "/myHandler")
            .addInterceptors(new HttpSessionHandshakeInterceptor());
    }

}

和 XML 配置等效:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:handshake-interceptors>
            <bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
        </websocket:handshake-interceptors>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

一个更高级的选项是扩展执行 WebSocket 握手步骤的DefaultHandshakeHandler,包括验证 Client 端来源,协商子协议等。如果应用程序需要配置自定义RequestUpgradeStrategy以适应尚不支持的 WebSocket 服务器引擎和版本,则它可能还需要使用此选项(有关此主题的更多信息,请参阅第 26.2.4 节“部署”)。 Java-config 和 XML 名称空间都可以配置自定义HandshakeHandler

26.2.3 WebSocketHandler 装饰

Spring 提供了一个WebSocketHandlerDecoratorBase Class,可用于装饰WebSocketHandler并具有其他行为。使用 WebSocket Java 配置或 XML 名称空间时,默认情况下会提供并添加日志记录和异常处理实现。 ExceptionWebSocketHandlerDecorator捕获由任何 WebSocketHandler 方法引起的所有未捕获的异常,并关闭状态为1011的 WebSocket 会话,该会话指示服务器错误。

26.2.4 Deployment

Spring WebSocket API 易于集成到 Spring MVC 应用程序中,其中DispatcherServlet既可用于 HTTP WebSocket 握手,也可用于其他 HTTP 请求。通过调用WebSocketHttpRequestHandler,也很容易将其集成到其他 HTTP 处理方案中。这是方便且易于理解的。但是,对于 JSR-356 运行时,需要特别注意。

Java WebSocket API(JSR-356)提供了两种部署机制。第一个涉及启动时的 Servlet 容器 Classpath 扫描(Servlet 3 功能);另一个是在 Servlet 容器初始化时使用的注册 API。这两种机制都无法对所有 HTTP 处理(包括 WebSocket 握手和所有其他 HTTP 请求)(例如 Spring MVC 的DispatcherServlet)使用单个“前端控制器”。

这是对 JSR-356 的一个重大限制,即使在 JSR-356 运行时中运行,Spring 的 WebSocket 支持通过提供特定于服务器的RequestUpgradeStrategy来解决该问题。

Note

已经创建了克服 Java WebSocket API 中的上述限制的请求,可以在WEBSOCKET_SPEC-211处进行跟踪。还要注意,Tomcat 和 Jetty 已经提供了本机 API 替代方案,可以轻松克服该限制。我们希望,无论何时在 Java WebSocket API 中解决问题,更多的服务器都将遵循它们的示例。

另一个要考虑的因素是,希望支持 JSR-356 的 Servlet 容器执行ServletContainerInitializer(SCI)扫描,这可能会减慢应用程序的启动速度,在某些情况下会大大降低启动速度。如果在升级到支持 JSR-356 的 Servlet 容器版本后观察到重大影响,则应该可以通过使用web.xml中的<absolute-ordering />元素来选择性地启用或禁用 Web 片段(和 SCI 扫描):

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering/>

</web-app>

然后,您可以按名称有选择地启用 Web 片段,例如 Spring 自己的SpringServletContainerInitializer,如果需要,它支持 Servlet 3 Java 初始化 API:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering>
        <name>spring_web</name>
    </absolute-ordering>

</web-app>

26.2.5 配置 WebSocket 引擎

每个基础 WebSocket 引擎都公开配置属性,这些属性控制运行时特性,例如消息缓冲区大小的大小,空闲超时等。

对于 Tomcat,WildFly 和 GlassFish,将ServletServerContainerFactoryBean添加到 WebSocket Java 配置中:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

}

或 WebSocket XML 名称空间:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <bean class="org.springframework...ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192"/>
        <property name="maxBinaryMessageBufferSize" value="8192"/>
    </bean>

</beans>

Note

对于 Client 端 WebSocket 配置,您应该使用WebSocketContainerFactoryBean(XML)或ContainerProvider.getWebSocketContainer()(Java config)。

对于 Jetty,您需要提供一个预先配置的 Jetty WebSocketServerFactory,然后通过 WebSocket Java 配置将其插入 Spring 的DefaultHandshakeHandler

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(echoWebSocketHandler(),
            "/echo").setHandshakeHandler(handshakeHandler());
    }

    @Bean
    public DefaultHandshakeHandler handshakeHandler() {

        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);

        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }

}

或 WebSocket XML 名称空间:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/echo" handler="echoHandler"/>
        <websocket:handshake-handler ref="handshakeHandler"/>
    </websocket:handlers>

    <bean id="handshakeHandler" class="org.springframework...DefaultHandshakeHandler">
        <constructor-arg ref="upgradeStrategy"/>
    </bean>

    <bean id="upgradeStrategy" class="org.springframework...JettyRequestUpgradeStrategy">
        <constructor-arg ref="serverFactory"/>
    </bean>

    <bean id="serverFactory" class="org.eclipse.jetty...WebSocketServerFactory">
        <constructor-arg>
            <bean class="org.eclipse.jetty...WebSocketPolicy">
                <constructor-arg value="SERVER"/>
                <property name="inputBufferSize" value="8092"/>
                <property name="idleTimeout" value="600000"/>
            </bean>
        </constructor-arg>
    </bean>

</beans>

26.2.6 配置允许的来源

从 Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是仅接受“相同来源”请求。也可以允许* all *或指定的来源列表。此检查主要用于浏览器 Client 端。没有什么可以阻止其他类型的 Client 端修改OriginHeaders 值(有关更多详细信息,请参见RFC 6454:Web 起源概念)。

3 种可能的行为是:

  • 仅允许相同的原始请求(默认):在此模式下,启用 SockJS 时,Iframe HTTP 响应 HeadersX-Frame-Options设置为SAMEORIGIN,并且 JSONP 传输被禁用,因为它不允许检查请求的来源。因此,启用此模式时,不支持 IE6 和 IE7.

  • 允许指定来源列表:每个提供的允许的来源必须以http://https://开头。在此模式下,启用 SockJS 时,将同时禁用基于 IFrame 和 JSONP 的传输。因此,启用此模式时,不支持 IE6 至 IE9.

  • 允许所有来源:要启用此模式,您应提供*作为允许的来源值。在这种模式下,所有传输都可用。

WebSocket 和 SockJS 允许的来源可以如下所示配置:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("http://mydomain.com");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

XML 配置等效项:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers allowed-origins="http://mydomain.com">
        <websocket:mapping path="/myHandler" handler="myHandler" />
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

26.3 SockJS 后备

introduction所述,并非所有浏览器都支持 WebSocket,并且受限于网络代理可能不支持 WebSocket。这就是为什么 Spring 提供基于SockJS protocol(版本 0.3.3)而尽可能接近地模拟 WebSocket API 的后备选项的原因。

26.3.1 Overview

SockJS 的目标是让应用程序使用 WebSocket API,但在运行时必要时可以使用非 WebSocket 替代方案,即无需更改应用程序代码。

SockJS 包括:

  • 以可执行文件narrated tests的形式定义的SockJS protocol

  • SockJS JavaScriptClient 端-在浏览器中使用的 Client 端库。

  • SockJS 服务器实现包括 Spring Framework spring-websocket模块中的一个。

  • 从 4.1 开始,spring-websocket还提供了 SockJS JavaClient 端。

SockJS 设计用于浏览器。它使用各种技术竭尽全力来支持各种浏览器版本。有关 SockJS 传输类型和浏览器的完整列表,请参见SockJS client页。传输分为 3 大类:WebSocket,HTTP 流和 HTTP 长轮询。有关这些类别的概述,请参见此博客文章

SockJSClient 端首先发送"GET /info"以从服务器获取基本信息。之后,它必须决定使用哪种交通工具。如果可能,使用 WebSocket。如果不是,则在大多数浏览器中,至少有一个 HTTP 流选项,如果没有,则使用 HTTP(长)轮询。

所有传输请求都具有以下 URL 结构:

http://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
  • {server-id}-在路由集群中的请求时很有用,但否则不使用。

  • {session-id}-关联属于 SockJS 会话的 HTTP 请求。

  • {transport}-表示运输类型,例如“ websocket”,“ xhr-streaming”等

WebSocket 传输仅需要单个 HTTP 请求即可进行 WebSocket 握手。此后所有消息在该套接字上交换。

HTTP 传输需要更多请求。例如,Ajax/XHR 流依赖于对服务器到 Client 端消息的一个长期运行的请求,以及对 Client 端到服务器消息的其他 HTTP POST 请求。长轮询与之类似,只是长轮询在每次服务器到 Client 端发送后结束当前请求。

SockJS 添加了最少的消息框架。例如,服务器最初发送字母 o(“打开”框架),消息以 a “ message1”,“ message2”的形式发送,如果没有消息流,则以字母 h(“心跳”框架)发送默认情况下为 25 秒,然后使用字母 c(“关闭”框架)关闭会话。

要了解更多信息,请在浏览器中运行示例并查看 HTTP 请求。 SockJSClient 端允许修复传输列表,因此可以一次查看每个传输。 SockJSClient 端还提供了一个调试标志,可在浏览器控制台中启用有用的消息。在服务器端,为org.springframework.web.socket启用TRACE日志记录。有关更多详细信息,请参阅 SockJS 协议narrated test

26.3.2 启用 SockJS

通过 Java 配置可以轻松启用 SockJS:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").withSockJS();
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

和等效的 XML 配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:sockjs/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

上面的代码用于 Spring MVC 应用程序,应包含在DispatcherServlet的配置中。但是,Spring 的 WebSocket 和 SockJS 支持不依赖于 Spring MVC。在SockJsHttpRequestHandler的帮助下,将其集成到其他 HTTP 服务环境中相对简单。

在浏览器端,应用程序可以使用模拟 W3C WebSocket API 的sockjs-client(1.0.x 版)并与服务器进行通信,以根据运行的浏览器选择最佳传输选项。查看sockjs-client页和传输列表浏览器支持的类型。Client 端还提供了几个配置选项,例如,指定要包括的传输。

26.3.3 IE 8、9

Internet Explorer 8 和 9 将会并且会保留一段时间。它们是拥有 SockJS 的关键原因。本节涵盖有关在这些浏览器中运行的重要注意事项。

SockJSClient 端通过 Microsoft 的XDomainRequest支持 IE 8 和 9 中的 Ajax/XHR 流。这适用于所有域,但不支持发送 Cookie。 Cookies 通常对于 Java 应用程序是必不可少的。但是,由于 SockJSClient 端可用于多种服务器类型(不仅是 Java 服务器类型),因此需要知道 cookie 是否重要。如果是这样,则 SockJSClient 端更喜欢 Ajax/XHR 进行流传输,否则它依赖于基于 iframe 的技术。

SockJSClient 端发出的第一个"/info"请求是对可能影响 Client 端选择传输方式的信息的请求。这些细节之一是服务器应用程序是否依赖 Cookie,例如用于身份验证或通过粘性会话进行群集。 Spring 的 SockJS 支持包括名为sessionCookieNeeded的属性。由于大多数 Java 应用程序都依赖JSESSIONID cookie,因此默认情况下启用该功能。如果您的应用程序不需要它,则可以关闭此选项,并且 SockJSClient 端应在 IE 8 和 9 中选择xdr-streaming

如果您确实使用基于 iframe 的传输,那么无论如何,最好知道是通过将 HTTP 响应 HeadersX-Frame-Options设置为DENYSAMEORIGINALLOW-FROM <origin>来指示浏览器阻止在给定页面上使用 iframe。这用于防止clickjacking

Note

Spring Security 3.2 提供了对每个响应设置X-Frame-Options的支持。默认情况下,Spring Security Java 配置将其设置为DENY。在 3.2 中,Spring Security XML 名称空间默认情况下不设置该 Headers,但可以配置为这样做,并且将来可以默认设置它。

有关如何配置X-Frame-OptionsHeaders 设置的详细信息,请参见 Spring Security 文档的第 7.1 节。 “默认安全标题”。您也可以检查或观看SEC-2501以获得其他背景。

如果您的应用程序添加了X-Frame-Options响应 Headers(应如此!)并依赖于基于 iframe 的传输,则需要将 Headers 值设置为SAMEORIGINALLOW-FROM <origin>。除此之外,Spring SockJS 支持还需要知道 SockJSClient 端的位置,因为它是从 iframe 加载的。默认情况下,iframe 设置为从 CDN 位置下载 SockJSClient 端。最好将此选项配置为与应用程序源相同的 URL。

在 Java 配置中,可以如下所示进行。 XML 名称空间通过<websocket:sockjs>元素提供了类似的选项:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS()
                .setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");
    }

    // ...

}

Note

在初始开发期间,请启用 SockJSClient 端devel模式,以防止浏览器缓存本应缓存的 SockJS 请求(如 iframe)。有关如何启用它的详细信息,请参见SockJS client页。

26.3.4 Heartbeats

SockJS 协议要求服务器发送心跳消息,以防止代理服务器断定连接已挂起。 Spring SockJS 配置具有名为heartbeatTime的属性,可用于自定义频率。默认情况下,假设该连接上未发送其他消息,则在 25 秒后发送心跳。对于公共 Internet 应用程序,此 25 秒值与以下IETF recommendation一致。

Note

当通过 WebSocket/SockJS 使用 STOMP 时,如果 STOMPClient 端和服务器协商要交换的心跳,则会禁用 SockJS 心跳。

Spring SockJS 支持还允许配置TaskScheduler以计划心跳任务。任务调度程序由线程池支持,该线程池具有基于可用处理器数量的默认设置。应用程序应考虑根据其特定需求自定义设置。

26.3.5Client 端断开连接

HTTP 流和 HTTP 长轮询 SockJS 传输要求连接保持打开的时间比平常更长。有关这些技术的概述,请参见此博客文章

在 Servlet 容器中,这是通过 Servlet 3 异步支持完成的,该支持允许退出 Servlet 容器线程来处理请求并 continue 写入另一个线程的响应。

一个特定的问题是,Servlet API 不会为已离开的 Client 端提供通知,请参见SERVLET_SPEC-44。但是,Servlet 容器在随后尝试写入响应时会引发异常。由于 Spring 的 SockJS 服务支持发送的心跳(默认情况下,每 25 秒发送一次),这意味着通常会在该时间段内或更早地检测到 Client 端断开连接(如果消息发送频率更高)。

Note

结果,仅由于 Client 端已断开连接而可能发生网络 IO 故障,这可能在日志中填充不必要的堆栈跟踪。 Spring 会尽最大努力找出代表 Client 端断开连接(特定于每个服务器)的网络故障,并使用AbstractSockJsSession中定义的专用日志类别DISCONNECTED_CLIENT_LOG_CATEGORY记录一条最小的消息。如果需要查看堆栈跟踪,请将该日志类别设置为 TRACE。

26.3.6 SockJS 和 CORS

如果您允许跨域请求(请参阅第 26.2.6 节“配置允许的来源”),则 SockJS 协议将 CORS 用于 XHR 流和轮询传输中的跨域支持。因此,除非检测到响应中存在 CORS 头,否则将自动添加 CORS 头。因此,如果某个应用程序已配置为提供 CORS 支持,例如通过 Servlet 过滤器,Spring 的 SockJsService 将跳过这一部分。

也可以通过 Spring 的 SockJsService 中的suppressCors属性来禁用这些 CORSHeaders 的添加。

以下是 SockJS 期望的标题和值的列表:

  • "Access-Control-Allow-Origin"-从“ Origin”请求 Headers 的值初始化。

  • "Access-Control-Allow-Credentials"-始终设置为true

  • "Access-Control-Request-Headers"-从等效请求 Headers 中的值初始化。

  • "Access-Control-Allow-Methods"-传输支持的 HTTP 方法(请参见TransportType枚举)。

  • "Access-Control-Max-Age"-设置为 31536000(1 年)。

有关确切的实现,请参见AbstractSockJsService中的addCorsHeaders以及源代码中的TransportType枚举。

或者,如果 CORS 配置允许,则考虑排除带有 SockJS 端点前缀的 URL,从而让 Spring 的SockJsService处理它。

26.3.7 SockJsClient

提供了一个 SockJS JavaClient 端,以便在不使用浏览器的情况下连接到远程 SockJS 端点。当需要通过公共网络在 2 台服务器之间进行双向通信时,即在网络代理可能无法使用 WebSocket 协议的情况下,这特别有用。 SockJS JavaClient 端对于测试目的也非常有用,例如,模拟大量并发用户。

SockJS JavaClient 端支持“ websocket”,“ xhr-streaming”和“ xhr-polling”传输。其余的仅在浏览器中使用才有意义。

WebSocketTransport可以配置为:

  • 在 JSR-356 运行时中StandardWebSocketClient

  • JettyWebSocketClient使用 Jetty 9 本机 WebSocket API

  • Spring WebSocketClient的任何实现

根据定义,XhrTransport支持“ xhr-streaming”和“ xhr-polling”,因为从 Client 端的角度来看,除了用于连接服务器的 URL 之外没有其他区别。当前有两种实现:

  • RestTemplateXhrTransport使用 Spring 的RestTemplate进行 HTTP 请求。

  • JettyXhrTransport使用 Jetty 的HttpClient进行 HTTP 请求。

下面的示例显示了如何创建 SockJSClient 端并连接到 SockJS 端点:

List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());

SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");

Note

SockJS 对消息使用 JSON 格式的数组。默认情况下,使用 Jackson 2,并且需要在 Classpath 上。或者,您可以配置SockJsMessageCodec的自定义实现,并在SockJsClient上对其进行配置。

要使用 SockJsClient 模拟大量并发用户,您将需要配置基础 HTTPClient 端(用于 XHR 传输)以允许足够数量的连接和线程。例如,与 Jetty:

HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));

考虑还自定义这些与服务器端 SockJS 相关的属性(有关详细信息,请参见 Javadoc):

@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/sockjs").withSockJS()
            .setStreamBytesLimit(512 * 1024)
            .setHttpMessageCacheSize(1000)
            .setDisconnectDelay(30 * 1000);
    }

    // ...
}

26.4 STOMP

WebSocket 协议定义了两种消息类型:文本消息和二进制消息,但是它们的内容未定义。定义了一种机制,供 Client 端和服务器协商子协议(即高级消息传递协议),以在 WebSocket 上使用以定义每个消息可以发送的消息种类,每个消息的格式和内容,等等。上。子协议的使用是可选的,但是无论哪种方式,Client 端和服务器都需要就定义消息内容的某种协议达成一致。

26.4.1 Overview

STOMP是一种简单的面向文本的消息传递协议,最初是为 Ruby,Python 和 Perl 等脚本语言创建的,以连接到企业消息代理。它旨在解决常用消息传递模式的子集。 STOMP 可以在任何可靠的 2 路流网络协议(例如 TCP 和 WebSocket)上使用。尽管 STOMP 是面向文本的协议,但是消息的有效载荷可以是文本或二进制。

STOMP 是基于帧的协议,其帧以 HTTP 为模型。 STOMP 帧的结构:

COMMAND
header1:value1
header2:value2

Body^@

Client 端可以使用 SEND 或 SUBSCRIBE 命令发送或订阅消息以及“目的地”Headers,该 Headers 描述消息的含义以及应由谁接收。这启用了一种简单的发布-订阅机制,该机制可用于通过代理将消息发送到其他连接的 Client 端,或将消息发送到服务器以请求执行某些工作。

使用 Spring 的 STOMP 支持时,Spring WebSocket 应用程序充当 Client 端的 STOMP 代理。消息被路由到@Controller消息处理方法或简单的内存中代理,该代理跟踪订阅并向订阅的用户 Broadcast 消息。您还可以将 Spring 配置为与专用的 STOMP 代理(例如 RabbitMQ,ActiveMQ 等)一起使用,以实际 Broadcast 消息。在那种情况下,Spring 维护与代理的 TCP 连接,将消息中继到该代理,并将消息从该代理向下传递到已连接的 WebSocketClient 端。因此,Spring Web 应用程序可以依赖于基于统一 HTTP 的安全性,通用验证以及熟悉的编程模型消息处理工作。

这是一个 Client 订阅接收股票报价的示例,服务器可能会定期发出该报价,例如通过 sched 任务通过SimpMessagingTemplate向代理发送消息:

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

这是 Client 发送 Transaction 请求的示例,服务器可以通过@MessageMapping方法处理该请求,然后在执行之后,向 ClientBroadcastTransaction 确认消息和详细信息:

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

在 STOMP 规范中,目的地的含义是故意不透明的。它可以是任何字符串,并且完全由 STOMP 服务器定义它们所支持的目的地的语义和语法。但是,目的地通常是类似路径的字符串,其中"/topic/.."表示发布-订阅(一对一),而"/queue/"表示点对点(一对一)消息交换。

STOMP 服务器可以使用 MESSAGE 命令向所有订户 Broadcast 消息。这是服务器向订阅的 Client 发送股票报价的示例:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

重要的是要知道服务器无法发送未经请求的消息。来自服务器的所有消息都必须响应特定的 Client 端订阅,并且服务器消息的“ subscription-id”Headers 必须与 Client 端订阅的“ id”Headers 匹配。

上面的概述旨在提供对 STOMP 协议的最基本的了解。建议全面查看协议specification

26.4.2 Benefits

与使用原始 WebSocket 相比,使用 STOMP 作为子协议使 Spring 框架和 Spring Security 可以提供更丰富的编程模型。关于 HTTP 与原始 TCP 的关系以及它如何使 Spring MVC 和其他 Web 框架提供丰富的功能,可以得出相同的观点。以下是好处列表:

  • 无需发明自定义消息协议和消息格式。

  • 可用的 STOMPClient 端在 Spring Framework 中包括Java client

  • 消息代理(例如 RabbitMQ,ActiveMQ 和其他代理)可以(可选)用于 Management 订阅和 Broadcast 消息。

  • 可以使用任意数量的@Controller来组织应用程序逻辑,并根据 STOMP 目标 Headers 将消息路由到它们,而对于给定的连接,使用单个WebSocketHandler处理原始 WebSocket 消息。

  • 使用 Spring Security 基于 STOMP 目的地和消息类型来保护消息。

26.4.3 启用 STOMP

spring-messagingspring-websocket模块中提供了 STOMP over WebSocket 支持。一旦有了这些依赖关系,就可以使用第 26.3 节“ SockJS 后备”通过 WebSocket 公开 STOMP 端点,如下所示:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }
}
  • "/portfolio"是 WebSocket(或 SockJS)Client 端需要连接到的端点的 HTTP URL,以进行 WebSocket 握手。
  • 其目标 Headers 以"/app"开头的 STOMP 消息将路由到@Controller类中的@MessageMapping方法。
  • 使用内置的消息代理进行订阅和 Broadcast;将目标 Headers 以“/topic”或“/queue”开头的消息路由到代理。

XML 中的相同配置:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

Note

对于内置的简单代理,“/topic”和“/queue”前缀没有任何特殊含义。它们只是区分发布订阅消息和点对点消息传递的约定(即,许多订户与一个 Consumer)。使用外部代理时,请检查代理的 STOMP 页面以了解其支持哪种 STOMP 目标和前缀。

要从浏览器进行连接,对于 SockJS,您可以使用sockjs-client。对于 STOMP,许多应用程序都使用了jmesnil/stomp-websocket库(也称为 stomp.js),该库功能齐全,已经在 Producing 使用了多年,但不再维护。目前,JSteunou/webstomp-client是该库中最活跃,Developing 最快的继承者,下面的示例代码基于此:

var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);

stompClient.connect({}, function(frame) {
}

或者,如果通过 WebSocket 连接(没有 SockJS):

var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
}

请注意,上面的stompClient不需要指定loginpasscodeHeaders。即使这样做,它们也会在服务器端被忽略,甚至被覆盖。有关身份验证的更多信息,请参见第 26.4.9 节“连接到代理”第 26.4.11 节,“身份验证”部分。

有关更多示例代码,请参见:

26.4.4 消息流

公开 STOMP 端点后,Spring 应用程序将成为已连接 Client 端的 STOMP 代理。本节描述服务器端的消息流。

spring-messaging模块包含对源自Spring Integration的消息传递应用程序的基础支持,后来被提取并合并到 Spring 框架中,以便在许多Spring projects和应用程序场景中更广泛地使用。下面是一些可用的消息传递抽象的列表:

Java 配置(即@EnableWebSocketMessageBroker)和 XML 名称空间配置(即<websocket:message-broker>)都使用上述组件来组装消息工作流。下图显示了启用简单的内置消息代理时使用的组件:

消息流简单代理

上图中有 3 个消息通道:

  • "clientInboundChannel" —用于传递从 WebSocketClient 端收到的消息。

  • "clientOutboundChannel" —用于向 WebSocketClient 端发送服务器消息。

  • "brokerChannel"-用于从服务器端应用程序代码内将消息发送到消息代理。

下图显示了将外部代理(例如 RabbitMQ)配置为用于 Management 订阅和 Broadcast 消息时使用的组件:

消息流代理中继

上图中的主要区别是使用“代理中继”将消息通过 TCP 传递到外部 STOMP 代理,以及将消息从代理传递到订阅的 Client 端。

从 WebSocket 连接器接收到消息后,消息将被解码为 STOMP 帧,然后转换为 Spring Message表示形式,然后发送至"clientInboundChannel"进行进一步处理。例如,目标 Headers 以"/app"开头的 STOMP 消息可以路由到带注解的控制器中的@MessageMapping方法,而"/topic""/queue"消息可以直接路由到消息代理。

处理来自 Client 端的 STOMP 消息的带注解的@Controller可以通过"brokerChannel"向消息代理发送消息,并且代理将通过"clientOutboundChannel"将消息 Broadcast 给匹配的订户。相同的控制器还可以响应 HTTP 请求执行相同的操作,因此 Client 端可以执行 HTTP POST,然后@PostMapping方法可以将消息发送到消息代理,以 Broadcast 到订阅的 Client 端。

让我们通过一个简单的示例跟踪流程。给定以下服务器设置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}
  • Client 端连接到"http://localhost:8080/portfolio",并且一旦构建 WebSocket 连接,STOMP 帧就开始在其上流动。

  • Client 端发送带有目 Headers"/topic/greeting"的 SUBSCRIBE 帧。收到并解码后,该消息将发送到"clientInboundChannel",然后路由到存储 Client 端订阅的消息代理。

  • Client 端将 SEND 帧发送到"/app/greeting""/app"前缀有助于将其路由到带注解的控制器。除去"/app"前缀后,目标的其余"/greeting"部分将 Map 到GreetingController中的@MessageMapping方法。

  • GreetingController返回的值被转换为带有返回值和默认目 Headers"/topic/greeting"(从 Importing 目标中用"/app"替换为"/topic"的源目标)的有效负载的 Spring Message。结果消息将发送到“ brokerChannel”并由消息代理处理。

  • 消息代理查找所有匹配的订阅者,然后通过"clientOutboundChannel"向每个订阅者发送 MESSAGE 帧,消息从此处被编码为 STOMP 帧并通过 WebSocket 连接发送。

下一部分将提供有关带注解方法的更多详细信息,包括支持的参数类型和返回值。

26.4.5 带注解的控制器

应用程序可以使用带注解的@Controller类来处理来自 Client 端的消息。这样的类可以声明@MessageMapping@SubscribeMapping@ExceptionHandler方法,如下所述。

@MessageMapping

@MessageMapping注解可用于根据消息的目的地路由消息的方法。在方法级别和类型级别都支持它。在类型级别@MessageMapping上用于表示控制器中所有方法之间的共享 Map。

默认情况下,目标 Map 应为 Ant 样式的路径模式,例如“/foo *”,“/foo/**”。这些模式包括对模板变量的支持,例如“/foo /{id}”,可以使用@DestinationVariable个方法参数来引用。

Tip

应用程序可以选择切换到以点分隔的目标约定。参见第 26.4.10 节,“点分隔符”

@MessageMapping方法可以具有带有以下参数的灵活签名:

Method argumentDescription
Message用于访问完整的消息。
MessageHeaders用于访问Message中的标题。
MessageHeaderAccessor , SimpMessageHeaderAccessor , StompHeaderAccessor用于通过类型化访问器方法访问 Headers。
@Payload为了访问消息的有效负载,通过已配置的MessageConverter进行了转换(例如,从 JSON 转换)。


不需要此注解,因为如果没有其他参数匹配,则默认情况下会使用此 注解。
有效载荷参数可以用@javax.validation.Valid或 Spring 的@Validated进行注解,以便自动验证。
| @Header |必要时用于访问特定的 Headers 值以及使用org.springframework.core.convert.converter.Converter进行类型转换。
| @Headers |用于访问消息中的所有标题。此自变量必须可分配给java.util.Map
| @DestinationVariable |用于访问从消息目标提取的模板变量。值将根据需要转换为声明的方法参数类型。
| java.security.Principal |反映在 WebSocket HTTP 握手时登录的用户。

@MessageMapping方法返回值时,默认情况下,该值会通过已配置的MessageConverter序列化为有效负载,然后以Message的形式发送到"brokerChannel",并从那里 Broadcast 给订户。出站消息的目的地与入站消息的目的地相同,但前缀为"/topic"

您可以使用@SendTo方法注解自定义将有效负载发送到的目标。 @SendTo也可以在类级别使用,以共享默认的目标目的地以发送消息。 @SendToUser是仅向与消息关联的用户发送消息的变体。有关详情,请参见第 26.4.13 节“用户目标”

@MessageMapping方法的返回值可以用ListenableFutureCompletableFutureCompletionStage包装,以便异步产生有效负载。

作为从@MessageMapping方法返回有效载荷的替代方法,您还可以使用SimpMessagingTemplate发送消息,这也是在后台处理返回值的方式。参见第 26.4.6 节“发送消息”

@SubscribeMapping

@SubscribeMapping注解与@MessageMapping结合使用,以缩小 Map 到订阅消息的范围。在这种情况下,@MessageMapping注解指定目的地,而@SubscribeMapping表示仅对订阅消息感兴趣。

就 Map 和 Importing 参数而言,@SubscribeMapping方法通常与任何@MessageMapping方法没有区别。例如,您可以将其与类型级别的@MessageMapping组合以表示共享的目标前缀,并且可以将method arguments与任何@ MessageMapping`方法一起使用。

@SubscribeMapping的主要区别在于,该方法的返回值被序列化为有效负载,并发送到“ brokerChannel”而不是“ clientOutboundChannel”,从而有效地直接回复 Client 端,而不是通过代理进行 Broadcast。这对于实现一次性的请求-响应消息交换非常有用,并且从不保留订阅。这种模式的常见情况是必须加载和显示数据时进行应用程序初始化。

@SubscribeMapping方法也可以用@SendTo注解,在这种情况下,返回值将与明确指定的目标目的地一起发送到"brokerChannel"

@MessageExceptionHandler

应用程序可以使用@MessageExceptionHandler个方法来处理@MessageMapping个方法中的异常。感兴趣的异常可以在注解本身中声明,或者如果要访问异常实例,则可以通过方法参数声明:

@Controller
public class MyController {

    // ...

    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

@MessageExceptionHandler方法支持灵活的方法签名,并支持与@MessageMapping方法相同的方法参数类型和返回值。

通常,@MessageExceptionHandler方法适用于在@Controller类(或类层次结构)中声明的方法。如果希望此类方法在各个控制器之间更全局地应用,则可以在标有@ControllerAdvice的类中声明它们。这相当于 Spring MVC 中的similar support

26.4.6 发送消息

如果要从应用程序的任何部分向连接的 Client 端发送消息怎么办?任何应用程序组件都可以将消息发送到"brokerChannel"。最简单的方法是注入SimpMessagingTemplate,然后使用它发送消息。通常,按类型插入它应该很容易,例如:

@Controller
public class GreetingController {

    private SimpMessagingTemplate template;

    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }

    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }

}

但是,如果存在另一个相同类型的 bean,也可以使用其名称“ brokerMessagingTemplate”对其进行限定。

26.4.7 简单 broker

内置的简单消息代理处理来自 Client 端的订阅请求,将其存储在内存中,并将消息 Broadcast 到具有匹配目标的已连接 Client 端。该代理支持类似路径的目标,包括对 Ant 样式目标模式的订阅。

Note

应用程序还可以使用点分隔的目标(相对于斜杠)。参见第 26.4.10 节,“点分隔符”

26.4.8 外部 broker

简单代理非常适合入门,但仅支持 STOMP 命令的子集(例如,无门禁,收据等),依赖简单的消息发送循环,并且不适合集群。作为替代方案,应用程序可以升级为使用功能全面的消息代理。

查看 STOMP 文档以查找您选择的消息代理(例如RabbitMQActiveMQ等),安装代理,并在启用 STOMP 支持的情况下运行它。然后在 Spring 配置中启用 STOMP 代理中继,而不是简单代理。

以下是启用全功能代理的示例配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

XML 配置等效项:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

上面配置中的“ STOMP 代理中继”是 Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。为此,它构建到代理的 TCP 连接,将所有消息转发给它,然后通过它们的 WebSocket 会话将从代理收到的所有消息转发给 Client 端。从本质上讲,它充当“转发器”,可以双向转发消息。

Note

Spring 使用org.projectreactor:reactor-netio.netty:netty-all来 Management 与代理的 TCP 连接,这两者都需要作为项目依赖项添加。

Spring Framework 4.3.x 中的 STOMP 代理支持与 Reactor 的 2.0.x 代兼容。因此,它与需要 Reactor 3.x 的spring-cloud-stream-reactive模块结合使用时不受支持。

Spring Framework 5 依赖于具有独立版本控制的 Reactor 3 和 Reactor Netty,用于与 STOMP 代理的 TCP 连接,还为反应式编程模型提供广泛的支持。

此外,应用程序组件(例如 HTTP 请求处理方法,业务服务等)也可以将消息发送到代理中继,如第 26.4.6 节“发送消息”中所述,以便将消息 Broadcast 到订阅的 WebSocketClient 端。

实际上,代理中继可实现健壮且可伸缩的消息 Broadcast。

26.4.9 连接到代理

STOMP 代理中继器维护与代理的单个“系统” TCP 连接。此连接仅用于源自服务器端应用程序的消息,而不用于接收消息。您可以为此连接配置 STOMP 凭据,即 STOMP 帧loginpasscodeHeaders。这在 XML 名称空间和 Java 配置中都以systemLogin/systemPasscode属性(默认值为guest/guest)公开。

STOMP 代理中继还为每个连接的 WebSocketClient 端创建一个单独的 TCP 连接。您可以配置 STOMP 凭据以用于代表 Client 端创建的所有 TCP 连接。这在 XML 名称空间和 Java 配置中都公开为clientLogin/clientPasscode属性,默认值为guest/guest

Note

STOMP 代理中继始终在代表 Client 端转发给代理的每个CONNECT帧上设置loginpasscodeHeaders。因此,WebSocketClient 端无需设置这些 Headers。他们将被忽略。如第 26.4.11 节,“身份验证”所述,WebSocketClient 端应该依靠 HTTP 身份验证来保护 WebSocket 端点并构建 Client 端身份。

STOMP 代理中继还通过“系统” TCP 连接向消息代理发送和从消息代理接收心跳。您可以配置发送和接收心跳的间隔(默认情况下,每个间隔为 10 秒)。如果与代理的连接断开,则代理中继将每 5 秒 continue 尝试重新连接,直到成功。

任何与代理之间的“系统”连接丢失并重新构建时,任何 Spring bean 都可以实现ApplicationListener<BrokerAvailabilityEvent>以便接收通知。例如,当没有活动的“系统”连接时,Broadcast 股票报价的股票报价服务可以停止尝试发送消息。

默认情况下,STOMP 代理中继始终连接到同一主机和端口,如果连接断开,则根据需要重新连接。如果希望提供多个地址,则在每次尝试连接时,都可以配置地址供应商,而不是固定的主机和端口。例如:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

	// ...

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
		registry.setApplicationDestinationPrefixes("/app");
	}

	private Reactor2TcpClient<byte[]> createTcpClient() {

		Supplier<InetSocketAddress> addressSupplier = new Supplier<InetSocketAddress>() {
			@Override
			public InetSocketAddress get() {
				// Select address to connect to ...
			}
		};

		StompDecoder decoder = new StompDecoder();
		Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
		return new Reactor2TcpClient<>(addressSupplier, codec);
	}

}

STOMP 代理中继也可以配置为virtualHost属性。此属性的值将设置为每个CONNECT帧的hostHeaders,并且例如在云环境中很有用,在该云环境中构建 TCP 连接的实际主机与提供基于云的 STOMP 服务的主机不同。

26.4.10 点作为分隔符

将邮件路由到@MessageMapping方法时,它们与AntPathMatcher匹配,并且默认情况下,模式应使用斜杠“ /”作为分隔符。这是 Web 应用程序中的一个良好约定,类似于 HTTP URL。但是,如果您更习惯于消息传递约定,则可以切换到使用点“”。作为分隔符。

在 Java 配置中:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // ...

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

In XML:

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:websocket="http://www.springframework.org/schema/websocket"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

    
    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>
    

</beans>

之后,控制器可以使用点“。”作为@MessageMapping方法中的分隔符:

@Controller
@MessageMapping("foo")
public class FooController {

    @MessageMapping("bar.{baz}")
    public void handleBaz(@DestinationVariable String baz) {
        // ...
    }
}

Client 端现在可以向"/app/foo.bar.baz123"发送消息。

在上面的示例中,我们没有更改“代理中继”上的前缀,因为这些前缀完全取决于外部消息代理。查看您所使用的代理的 STOMP 文档页面,以了解其对目标 Headers 支持的约定。

另一方面,“简单代理”确实依赖于已配置的PathMatcher,因此,如果切换也将应用于代理的分隔符,并且将目的地从消息匹配到订阅中的模式,则该分隔符将适用于代理。

26.4.11 Authentication

每个通过 WebSocket 进行的 STOMP 消息传递会话均以 HTTP 请求开始,HTTP 请求可以是升级到 WebSocket 的请求(即 WebSocket 握手),如果是 SockJS 后备,则可以是一系列 SockJS HTTP 传输请求。

Web 应用程序已经具有身份验证和授权来保护 HTTP 请求。通常,使用某种机制(例如登录页面,HTTP 基本认证或其他)通过 Spring Security 对用户进行认证。经过身份验证的用户的安全上下文保存在 HTTP 会话中,并与同一基于 cookie 的会话中的后续请求关联。

因此,对于 WebSocket 握手或 SockJS HTTP 传输请求,通常已经有一个可以通过HttpServletRequest#getUserPrincipal()访问的经过身份验证的用户。 Spring 会自动将该用户与为其创建的 WebSocket 或 SockJS 会话相关联,并随后与通过该用户 Headers 在该会话上传输的所有 STOMP 消息相关联。

简而言之,典型的 Web 应用程序除了已经具有的安全性之外,不需要做任何特别的事情。通过基于 cookie 的 HTTP 会话维护的安全上下文在 HTTP 请求级别对用户进行身份验证,然后将其与为该用户创建的 WebSocket 或 SockJS 会话相关联,并在通过应用程序的每个Message上标记一个用户 Headers。

请注意,STOMP 协议在CONNECT帧上确实具有“登录”和“密码”Headers。这些最初是设计用于并且仍然需要的,例如,基于 TCP 的 STOMP。但是,对于默认情况下,基于 WebSocket 的 STOMP,Spring 会忽略 STOMP 协议级别的授权 Headers,并假定该用户已经在 HTTP 传输级别进行了身份验证,并期望 WebSocket 或 SockJS 会话包含经过身份验证的用户。

Note

Spring Security 提供了WebSocket 子协议授权,该WebSocket 子协议授权使用ChannelInterceptor来基于消息中的用户 Headers 授权消息。此外,Spring Session 还提供WebSocket integration来确保 WebSocket 会话仍处于活动状态时用户 HTTP 会话不会过期。

26.4.12 令牌认证

Spring Security OAuth支持基于令牌的安全性,包括 JSON Web 令牌(JWT)。如上一节所述,它可以用作 Web 应用程序中的身份验证机制,包括通过 WebSocket 进行 STOMP 交互,即通过基于 cookie 的会话维护身份。

同时,基于 cookie 的会话并不总是最合适的,例如,在根本不希望维护服务器端会话的应用程序中或在通常使用 Headers 进行身份验证的移动应用程序中。

WebSocket 协议 RFC 6455“没有规定服务器在 WebSocket 握手期间可以对 Client 端进行身份验证的任何特定方式。”但是实际上,浏览器 Client 端只能使用标准身份验证 Headers(即基本 HTTP 身份验证)或 cookie,而不能提供例如自定义 Headers。同样,SockJS JavaScriptClient 端也不提供通过 SockJS 传输请求发送 HTTPHeaders 的方法,请参阅sockjs-Client 端问题 196。相反,它确实允许发送查询参数,该参数可用于发送令牌,但有其自身的缺点,例如,令牌可能会无意中与服务器日志中的 URL 一起记录。

Note

以上限制适用于基于浏览器的 Client 端,不适用于基于 Spring Java 的 STOMPClient 端,该 Client 端支持通过 WebSocket 和 SockJS 请求发送 Headers。

因此,希望避免使用 cookie 的应用程序可能在 HTTP 协议级别上没有任何好的身份验证替代方案。他们可能不喜欢使用 Cookie,而是更喜欢在 STOMP 消息传递协议级别使用 Headers 进行身份验证,可以通过以下两个简单步骤进行操作:

  • 使用 STOMPClient 端在连接时传递身份验证 Headers。

  • 使用ChannelInterceptor处理身份验证 Headers。

下面是注册自定义身份验证拦截器的示例服务器端配置。请注意,拦截器仅需要认证并在 CONNECT Message上设置用户 Headers。 Spring 将记录并保存经过身份验证的用户,并将其与同一会话上的后续 STOMP 消息相关联:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}

还要注意,在使用 Spring Security 的消息授权时,目前,您需要确保在 Spring Security 之前 Order 身份验证ChannelInterceptor config。最好通过在其自己的带有@Order(Ordered.HIGHEST_PRECEDENCE + 99)标记的AbstractWebSocketMessageBrokerConfigurer子类中声明自定义拦截器来完成此操作。

26.4.13 用户目的地

应用程序可以发送针对特定用户的消息,Spring 的 STOMP 支持为此识别出带有"/user/"前缀的目的地。例如,Client 端可能订阅了目的地"/user/queue/position-updates"。该目的地将由UserDestinationMessageHandler处理,并转换为用户会话唯一的目的地,例如"/queue/position-updates-user123"。这提供了订阅通用命名目的地的便利,同时确保与订阅同一目的地的其他用户不发生冲突,以便每个用户都可以接收唯一的库存头寸更新。

在发送方,可以将消息发送到诸如"/user/{username}/queue/position-updates"之类的目的地,而该目的地又将由UserDestinationMessageHandler转换为一个或多个目的地,每个与用户相关联的会话都将一个目的地。这允许应用程序内的任何组件发送针对特定用户的消息,而不必知道他们的姓名和通用目的地。注解和消息传递模板也支持此功能。

例如,一种消息处理方法可以通过@SendToUser注解将消息发送给与正在处理的消息相关联的用户(在类级别上也支持共享公共目的地):

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

如果用户有多个会话,则默认情况下,所有订阅给定目标的会话都是目标。但是有时候,可能有必要仅将发送正在处理的消息的会话作为目标。可以通过将broadcast属性设置为 false 来完成,例如:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

Note

虽然用户目的地通常意味着已通过身份验证的用户,但并非严格要求。不与已认证用户关联的 WebSocket 会话可以订阅用户目的地。在这种情况下,@SendToUser注解的行为与broadcast=false完全相同,即仅针对发送正在处理的消息的会话。

例如,还可以通过注入由 Java 配置或 XML 名称空间创建的SimpMessagingTemplate从任何应用程序组件向用户目标发送消息(如果需要使用@Qualifier进行限定,则 Bean 名称为"brokerMessagingTemplate"):

@Service
public class TradeServiceImpl implements TradeService {

	private final SimpMessagingTemplate messagingTemplate;

	@Autowired
	public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
		this.messagingTemplate = messagingTemplate;
	}

	// ...

	public void afterTradeExecuted(Trade trade) {
		this.messagingTemplate.convertAndSendToUser(
				trade.getUserName(), "/queue/position-updates", trade.getResult());
	}
}

Note

将用户目标与外部消息代理一起使用时,请查看代理文档以了解如何 Management 非活动队列,以便在用户会话结束时,将删除所有唯一的用户队列。例如,当使用诸如/exchange/amq.direct/position-updates之类的目的地时,RabbitMQ 会创建自动删除队列。因此,在这种情况下,Client 端可以订阅/user/exchange/amq.direct/position-updates。同样,ActiveMQ 具有configuration options用于清除非活动目标。

在多应用程序服务器方案中,由于用户连接到其他服务器,因此用户目的地可能无法解析。在这种情况下,您可以将目标配置为向其 Broadcast 未解析的消息,以便其他服务器可以尝试。这可以通过 Java 配置中MessageBrokerRegistryuserDestinationBroadcast属性和 XML 中message-broker元素的user-destination-broadcast属性来完成。

26.4.14 事件和拦截

几个ApplicationContext事件(下面列出)已发布,可以通过实现 Spring 的ApplicationListener接口来接收。

  • BrokerAvailabilityEvent-指示代理何时可用/不可用。当“简单”代理在启动时立即可用并保持运行状态时,STOMP“代理中继”可能会失去与全功能代理的连接,例如,如果代理重新启动。代理中继具有重新连接逻辑,并且在返回时将重新构建与代理的“系统”连接,因此,只要状态从已连接变为断开,反之亦然,将发布此事件。使用SimpMessagingTemplate的组件应订阅此事件,并避免在代理不可用时避免发送消息。无论如何,他们应该准备在发送消息时处理MessageDeliveryException

  • SessionConnectEvent —在接收到新的 STOMP CONNECT 时表示已开始新的 Client 端会话,发布。该事件包含表示连接的消息,包括会话 ID,用户信息(如果有)以及 Client 端可能已发送的任何自定义 Headers。这对于跟踪 Client 端会话很有用。预订此事件的组件可以使用SimpMessageHeaderAccessorStompMessageHeaderAccessor包装包含的消息。

  • SessionConnectedEvent —在代理响应 CONNECT 发送 STOMP CONNECTED 帧后,在SessionConnectEvent之后不久发布。此时,可以认为 STOMP 会话已完全构建。

  • SessionSubscribeEvent —在收到新的 STOMP SUBSCRIBE 时发布。

  • SessionUnsubscribeEvent —在收到新的 STOMP UNSUBSCRIBE 时发布。

  • SessionDisconnectEvent —在 STOMP 会话结束时发布。 DISCONNECT 可能已经从 Client 端发送,或者在关闭 WebSocket 会话时也可能自动生成。在某些情况下,每个会话可能多次发布此事件。组件应与多个断开事件有关。

Note

使用功能齐全的代理时,如果代理暂时不可用,STOMP“代理中继”会自动重新连接“系统”连接。但是,Client 端连接不会自动重新连接。假设启用了心跳,Client 端通常会注意到代理在 10 秒内没有响应。Client 端需要实现自己的重新连接逻辑。

以上事件反映了 STOMP 连接生命周期中的点。它们并不是要为 Client 端发送的每条消息提供通知。相反,应用程序可以注册ChannelInterceptor来拦截每个传入和传出的 STOMP 消息。例如,拦截入站消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new MyChannelInterceptor());
    }
}

定制ChannelInterceptor可以扩展空方法 Base ClassChannelInterceptorAdapter,并使用StompHeaderAccessorSimpMessageHeaderAccessor访问有关消息的信息。

public class MyChannelInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

请注意,就像上面的SesionDisconnectEvent一样,DISCONNECT 消息可能是从 Client 端发送的,或者在关闭 WebSocket 会话时也可能自动生成。在某些情况下,拦截器可能在每个会话中多次拦截此消息。组件应与多个断开事件有关。

26.4.15 STOMPClient 端

Spring 提供了一个基于 WebSocket 的 STOMPClient 端和一个基于 TCP 的 STOMPClient 端。

开始创建和配置WebSocketStompClient

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的示例中,StandardWebSocketClient可以替换为SockJsClient,因为它也是WebSocketClient的实现。 SockJsClient可以使用 WebSocket 或基于 HTTP 的传输作为后备。有关更多详细信息,请参见第 26.3.7 节“ SockJsClient”

接下来构建连接并为 STOMP 会话提供处理程序:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备好使用时,将通知处理程序:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

构建会话后,可以发送任何有效负载,并将其与配置的MessageConverter进行序列化:

session.send("/topic/foo", "payload");

您还可以订阅目的地。 subscribe方法需要用于订阅消息的处理程序,并返回可用于取消订阅的Subscription处理。对于每个收到的消息,处理程序可以指定目标对象类型,有效负载应反序列化为:

session.subscribe("/topic/foo", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,请为WebSocketStompClient配置TaskScheduler并自定义心跳间隔,写不活动导致发送心跳的时间为 10 秒,读不活动导致连接的时间为 10 秒。

Note

当使用WebSocketStompClient进行性能测试以模拟同一台计算机上的数千个 Client 端时,请考虑关闭心跳 signal,因为每个连接都调度自己的心跳任务,并且尚未针对同一台计算机上运行的大量 Client 端进行优化。

STOMP 协议还支持回执,在该回执中,Client 端必须添加一个“ receipt”Headers,服务器在处理完发送或订阅后将以 RECEIPT 帧响应该 Headers。为此,StompSession提供setAutoReceipt(boolean)会导致在每个后续发送或订阅中添加一个“ receipt”Headers。或者,您也可以手动将_receiptHeaders 添加到StompHeaders。发送和订阅都返回Receiptable的实例,该实例可用于注册接收成功和失败的回调。要使用此功能,必须为 Client 端配置TaskScheduler以及收据过期之前的时间(默认为 15 秒)。

请注意,StompSessionHandler本身是StompFrameHandler,除了handleException回调(用于处理消息的异常)和handleTransportError(用于传输级别的错误,包括ConnectionLostException)之外,它还可以处理 ERROR 帧。

26.4.16 WebSocket 范围

每个 WebSocket 会话都有一个属性 Map。该 Map 作为 Headers 附加到入站 Client 端消息,并且可以从控制器方法进行访问,例如:

@Controller
public class MyController {

	@MessageMapping("/action")
	public void handle(SimpMessageHeaderAccessor headerAccessor) {
		Map<String, Object> attrs = headerAccessor.getSessionAttributes();
		// ...
	}
}

也可以在websocket范围内声明一个 SpringManagement 的 bean。可以将 WebSocket 范围的 bean 注入到控制器以及在“ clientInboundChannel”上注册的任何通道拦截器中。这些通常是单例,并且比任何单独的 WebSocket 会话寿命更长。因此,您将需要对 WebSocket 范围的 bean 使用作用域代理模式:

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // Invoked after dependencies injected
    }

    // ...

    @PreDestroy
    public void destroy() {
        // Invoked when the WebSocket session ends
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // this.myBean from the current WebSocket session
    }
}

与任何自定义范围一样,Spring 首次从控制器访问它时会初始化一个新的MyBean实例,并将该实例存储在 WebSocket 会话属性中。随后将返回相同的实例,直到会话结束。 WebSocket 范围的 bean 将具有所有 Spring 生命周期方法,如上面的示例所示。

26.4.17 Performance

关于性能,没有万灵药。许多因素可能会影响它,包括消息的大小,数量,应用程序方法是否执行需要阻塞的工作以及诸如网络速度之类的外部因素。本部分的目的是提供可用配置选项的概述,以及有关如何进行扩展的一些想法。

在消息传递应用程序中,消息通过通道传递,以进行线程池支持的异步执行。配置这样的应用程序需要对通道和消息流有充分的了解。因此,建议查看第 26.4.4 节“消息流”

最明显的开始是配置支持"clientInboundChannel""clientOutboundChannel"的线程池。默认情况下,两者都配置为可用处理器数量的两倍。

如果注解方法中的消息处理主要受 CPU 限制,则"clientInboundChannel"的线程数应保持与处理器数接近。如果他们所做的工作受到更多 IO 的限制,并且需要阻塞或 await 数据库或其他外部系统,则将需要增加线程池的大小。

Note

ThreadPoolExecutor具有 3 个重要属性。这些是核心和最大线程池大小,以及队列存储没有可用线程的任务的容量。

常见的混淆点是配置核心池大小(例如 10)和最大池大小(例如 20)会导致线程池具有 10 到 20 个线程。实际上,如果将容量保留为其默认值 Integer.MAX_VALUE,则线程池将永远不会增加到超出核心池大小的范围,因为所有其他任务都将排队。

请查看ThreadPoolExecutor的 Javadoc,以了解这些属性的工作原理并了解各种排队策略。

"clientOutboundChannel"方面,都是关于向 WebSocketClient 端发送消息。如果 Client 端在快速网络上,则线程数应保持接近可用处理器数。如果它们很慢或带宽很低,它们将花费更长的时间来消耗消息并给线程池增加负担。因此,必须增加线程池的大小。

尽管“ clientInboundChannel”的工作量可以预测-毕竟这是基于应用程序的工作-但是配置“ clientOutboundChannel”的方法比较困难,因为它基于应用程序无法控制的因素。因此,有两个与消息发送有关的其他属性。这些是"sendTimeLimit""sendBufferSizeLimit"。这些用于配置发送消息到 Client 端时允许发送多长时间以及可以缓冲多少数据。

一般的想法是,在任何给定的时间,只有一个线程可以用于发送给 Client 端。同时,所有其他消息都将被缓冲,您可以使用这些属性来决定允许发送消息花费多长时间以及在此期间可以缓冲多少数据。请查看该配置的 Javadoc 和 XML 模式的文档,以获取重要的其他详细信息。

这是示例配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

上面显示的 WebSocket 传输配置还可以用于配置传入 STOMP 消息的最大允许大小。尽管从理论上讲 WebSocket 消息的大小几乎是无限的,但实际上 WebSocket 服务器会施加限制-例如,Tomcat 上为 8K,Jetty 上为 64K。因此,诸如 stomp.js 之类的 STOMPClient 端会在 16K 边界处拆分较大的 STOMP 消息,并将其作为多个 WebSocket 消息发送,因此需要服务器进行缓冲和重新组装。

Spring 的 STOMP over WebSocket 支持可以做到这一点,因此应用程序可以为 STOMP 消息配置最大大小,而与 WebSocket 服务器特定的消息大小无关。请记住,如有必要,将自动调整 WebSocket 消息的大小,以确保它们最多可以承载 16K WebSocket 消息。

这是示例配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

关于扩展的重要一点是使用多个应用程序实例。目前,使用简单代理程序是不可能的。但是,当使用功能全面的代理(例如 RabbitMQ)时,每个应用程序实例都连接到代理,并且从一个应用程序实例 Broadcast 的消息可以通过代理 Broadcast 到通过任何其他应用程序实例连接的 WebSocketClient 端。

26.4.18 Monitoring

使用@EnableWebSocketMessageBroker<websocket:message-broker>关键基础结构组件时,会自动收集统计信息和计数器,以提供对应用程序内部状态的重要了解。该配置还声明了一个类型为WebSocketMessageBrokerStats的 bean,该 bean 会将所有可用信息收集到一个位置,并且默认情况下每 30 分钟在INFO级别记录一次。该 bean 可以通过 Spring 的MBeanExporter导出到 JMX 以便在运行时查看,例如通过 JDK 的jconsole。以下是可用信息的摘要。

  • Client 端 WebSocket 会话

      • Current

      • 指示当前有多少个 Client 端会话,并且通过 WebSocket vs HTTP 流和轮询 SockJS 会话进一步细分该会话。

    • Total

      • 指示已构建的会话总数。
    • Abnormally Closed

        • Connect Failures

        • 这些是已构建但在 60 秒内未收到任何消息后已关闭的会话。这通常表示代理或网络问题。

      • 超过发送限制

        • 超过配置的发送超时或发送缓冲区限制(慢速 Client 端可能发生)后,会话将关闭(请参阅上一节)。
      • Transport Errors

        • 传输错误(例如无法读取或写入 WebSocket 连接或 HTTP 请求/响应)后,会话关闭。
    • STOMP Frames

      • 已处理的 CONNECT,CONNECTED 和 DISCONNECT 帧的总数,指示在 STOMP 级别上连接了多少个 Client 端。请注意,当会话异常关闭或 Client 端未发送 DISCONNECT 帧而关闭时,DISCONNECT 计数可能会降低。
  • STOMPbroker 接力

      • TCP Connections

      • 指示到代理构建了代表 Client 端 WebSocket 会话的 TCP 连接数。这应该等于 Client 端 WebSocket 会话的数量 1 个用于从应用程序内部发送消息的附加共享“系统”连接。

    • STOMP Frames

      • 代表 Client 转发给代理或从代理接收的 CONNECT,CONNECTED 和 DISCONNECT 帧的总数。请注意,无论 Client 端 WebSocket 会话如何关闭,DISCONNECT 帧都会发送到代理。因此,较低的 DISCONNECT 帧计数表示代理正在主动关闭连接,这可能是由于未及时到达的心跳,无效的 Importing 帧或其他原因所致。
  • Client 入站通道

    • 支持“ clientInboundChannel”的线程池统计信息提供对传入消息处理运行状况的深入了解。此处排队的任务表明应用程序可能太慢而无法处理消息。如果存在 I/O 绑定的任务(例如,缓慢的数据库查询,对第三方 REST API 的 HTTP 请求等),请考虑增加线程池大小。
  • Client 出站通道

    • 支持“ clientOutboundChannel”的线程池中的统计信息提供了对向 Client 端 Broadcast 消息的运行状况的深入了解。此处排队的任务表明 Client 端太慢而无法使用消息。解决此问题的一种方法是增加线程池大小,以容纳预期的并发慢速 Client 端数。另一个选择是减少发送超时和发送缓冲区大小限制(请参阅上一节)。
  • SockJS 任务计划程序

    • SockJS 任务调度程序的线程池中的统计信息,用于发送心跳。请注意,如果在 STOMP 级别协商了心跳,则会禁用 SockJS 心跳。

26.4.19 Testing

使用基于 WebSocket 的 Spring 的 STOMP 支持两种测试应用程序的主要方法。首先是编写服务器端测试,以验证控制器的功能及其带 注解的消息处理方法。第二种是编写涉及运行 Client 端和服务器的完整的端到端测试。

两种方法不是互斥的。相反,每个人在整体测试策略中都占有一席之地。服务器端测试更加集中,更易于编写和维护。另一方面,端到端集成测试更完整,测试更多,但它们也涉及编写和维护。

服务器端测试的最简单形式是编写控制器单元测试。但是,这并不是足够有用,因为控制器的大部分工作都取决于其注解。纯单元测试根本无法测试。

理想情况下,应像在运行时一样调用被测控制器,就像使用 Spring MVC Test 框架测试处理 HTTP 请求的控制器的方法一样。即不运行 Servlet 容器而是依靠 Spring 框架来调用带注解的控制器。就像这里的 Spring MVC Test 一样,有两种可能的选择,或者使用“基于上下文的”或“独立的”设置:

  • 在 Spring TestContext 框架的帮助下加载实际的 Spring 配置,将“ clientInboundChannel”注入为测试字段,并使用它发送消息以由控制器方法处理。

  • 手动设置调用控制器(即SimpAnnotationMethodMessageHandler)并将控制器消息直接传递给它所需的最低 Spring 框架基础结构。

股票投资组合测试示例应用程序中演示了这两种设置方案。

第二种方法是创建端到端集成测试。为此,您将需要以嵌入式模式运行 WebSocket 服务器,并作为 WebSocketClient 端连接到该服务器,以发送包含 STOMP 帧的 WebSocket 消息。 股票投资组合测试示例应用程序还演示了如何使用 Tomcat 作为嵌入式 WebSocket 服务器和用于测试目的的简单 STOMPClient 端。