Spring Framework 中文文档

4.3.21.RELEASE

26. WebSocket 支持

reference 文档的这一部分涵盖了 Spring Framework 对 web applications 中 WebSocket-style 消息的支持,包括使用 STOMP 作为 application level WebSocket sub-protocol。

第 26.1 节,“简介”建立了思考 WebSocket 的思维框架,涵盖了采用挑战,设计考虑因素以及何时适合它们的想法。

第 26.2 节,“WebSocket API”审查 server-side 上的 Spring WebSocket API,而第 26.3 节,“SockJS 后备”解释了 SockJS 协议并说明了如何配置和使用它。

第 26.4.1 节,“概述”引入了 STOMP 消息传递协议。 第 26.4.3 节,“启用 STOMP”演示了如何在 Spring 中配置 STOMP 支持。 第 26.4.5 节,“带注释的控制器”以及以下部分解释了如何编写带注释的消息处理方法,发送消息,选择消息 broker 选项,以及如何使用特殊的“用户”目标。最后,第 26.4.19 节,“测试” lists 三种方法来测试 STOMP/WebSocket applications。

26.1 简介

WebSocket 协议RFC 6455为 web applications 定义了一个重要的新功能:full-duplex,two-way client 和服务器之间的通信。这是一个激动人心的新功能,紧随着 web 技术的历史,使 web 更具互动性,包括 Java Applets,XMLHttpRequest,Adobe Flash,ActiveXObject,各种 Comet 技术,server-sent events 等。

WebSocket 协议的正确介绍超出了本文档的范围。但至少重要的是要理解 HTTP 仅用于初始握手,它依赖于 HTTP 内置的机制来请求协议升级(或者在这种情况下是协议交换机)服务器可以使用 HTTP 状态响应 101 (切换协议)如果同意的话。假设握手成功,HTTP 升级请求所基于的 TCP socket 保持打开状态,client 和服务器都可以使用它来相互发送消息。

Spring Framework 4 包含一个新的spring-websocket模块,具有全面的 WebSocket 支持。它与 Java WebSocket API 标准(JSR-356)兼容,并且还提供了额外的 value-add,如引言的 rest 中所述。

26.1.1 WebSocket 后备选项

采用的一个重要挑战是在某些浏览器中缺乏对 WebSocket 的支持。值得注意的是,支持 WebSocket 的第一个 Internet Explorer version 是 version 10(请参阅http://caniuse.com/websockets以获得浏览器版本的支持)。此外,某些限制性代理可能会以某种方式进行配置,这些方式要么阻止尝试进行 HTTP 升级,要么在某些 time 之后以其他方式 break 连接,因为它仍然打开了太长__。 InfoQ 文章“HTML5 Web 套接字如何与代理服务器交互”中提供了有关 Peter Lubbers 关于此主题的详细概述。

因此,要今天 build 一个 WebSocket application,order 中需要回退选项来在必要时模拟 WebSocket API。 Spring Framework 提供基于SockJS 协议的透明回退选项。可以通过 configuration 启用这些选项,否则不需要修改 application。

26.1.2 消息传递架构

除了 short-to-midterm 采用挑战之外,使用 WebSocket 提出了重要的设计考虑因素,这些考虑因素在早期就很重要,特别是与我们今天对 building web applications 的了解形成鲜明对比。

今天 REST 是 building web applications 的广泛接受,理解和支持的 architecture。它是一个 architecture,依赖于拥有许多 URL(名词),一些 HTTP 方法(动词)和其他原则,如使用超媒体(链接),保持 stateless 等。

相比之下,WebSocket application 可能仅将单个 URL 用于初始 HTTP 握手。此后,所有消息在同一 TCP 连接上共享和流动。这指向完全不同的异步 event-driven 消息传递 architecture。一个更接近传统消息传递应用程序(e.g. JMS,AMQP)。

Spring Framework 4 包含一个带有来自Spring Integration项目的 key 抽象的新spring-messaging模块,例如MessageMessageChannelMessageHandler和其他可以作为此类消息传递 architecture 的基础的模块。该模块还包括一组 annotations,用于将消息映射到方法,类似于基于 Spring MVC annotation 的编程 model。

26.1.3 Sub-Protocol 支持 WebSocket

WebSocket 确实暗示了消息传递 architecture,但并未强制要求使用任何特定的消息传递协议。它是 TCP 上的一个非常薄的层,它将字节流转换为消息流(文本或二进制),而不是更多。应用程序来解释消息的含义。

与 HTTP(一种 application-level 协议)不同,在 WebSocket 协议中,传入消息中的信息不足以让 framework 或容器知道如何对其进行 route 处理或处理它。因此,除了非常简单的应用程序之外,WebSocket 可以说太低了。它可以完成,但它可能会导致在顶部创建一个 framework。这与今天大多数 web applications 是使用 web framework 而不仅仅是 Servlet API 编写的方式相当。

因此,WebSocket RFC 定义了sub-protocols的使用。在握手期间,client 和服务器可以使用标头Sec-WebSocket-Protocol来同意 sub-protocol,i.e。要使用的更高的 application-level 协议。不需要使用 sub-protocol,但即使没有使用,applications 仍然需要选择 client 和服务器都能理解的消息格式。该格式可以是自定义的,framework-specific 或标准消息传递协议。

Spring Framework 支持使用STOMP - 一种简单的消息传递协议,最初创建用于脚本语言,其框架受 HTTP 启发。 STOMP 得到广泛支持,非常适合在 WebSocket 和 web 上使用。

26.1.4 我应该使用 WebSocket 吗?

考虑到使用 WebSocket 的所有设计考虑因素,可以合理地问“何时使用?”。

最适合 WebSocket 的是 web applications,其中 client 和服务器需要以高频率和低延迟交换 events。主要候选人包括但不限于金融,游戏,合作等应用。这些应用程序对 time 延迟非常敏感,并且还需要以高频率交换各种消息。

但是,对于其他 application 类型,情况可能并非如此。例如,显示突发新闻的新闻或社交 Feed 可能完全可以通过每隔几分钟进行一次简单的轮询。延迟很重要,但如果新闻需要几分钟才会出现,这是可以接受的。

即使在延迟是至关重要的情况下,如果消息量相对较低(e.g. 监视网络故障),long 民意调查的使用应被视为一种相对简单的替代方案,可靠地工作并且在效率方面具有可比性(同样假设音量)消息相对较低)。

它是低延迟和高频率消息的组合,可以使 WebSocket 协议的使用变得至关重要。即使在这样的应用程序中,仍然选择是否应该通过 WebSocket 消息完成所有 client-server 通信,而不是使用 HTTP 和 REST。答案将因 application 而异;但是,很可能某些功能可以通过 WebSocket 和 REST API 在 order 中公开,以便为客户提供替代方案。此外,REST API 调用可能需要 broadcast 消息给通过 WebSocket 连接的感兴趣的客户端。

Spring Framework 允许@Controller@RestController classes 同时具有 HTTP 请求处理和 WebSocket 消息处理方法。此外,Spring MVC 请求处理方法或任何 application 方法可以轻松地向所有感兴趣的 WebSocket 客户端或特定用户广播消息。

26.2 WebSocket API

Spring Framework 提供了一个 WebSocket API,旨在适应各种 WebSocket 引擎。目前,该列表包括 WebSocket 运行时,例如 Tomcat 7.0.47,Jetty 9.1,GlassFish 4.1,WebLogic 12.1.3 和 Undertow 1.0(以及 WildFly 8.0)。随着更多 WebSocket 运行时变得可用,可以添加额外的支持。

正如在介绍中所解释的那样,直接使用 WebSocket API 对于 applications 来说太低了 - 直到对消息的格式做出假设,framework 几乎无法解释消息或通过 annotations 将它们路由出来。这就是为什么 applications 应该考虑使用 sub-protocol 和 Spring 的STOMP over WebSocket支持。

当使用更高的 level 协议时,WebSocket API 的细节变得不那么相关,就像使用 HTTP 时 TCP 通信的细节不会暴露给 applications 一样。然而,本节将介绍直接使用 WebSocket 的详细信息。

26.2.1 WebSocketHandler

创建 WebSocket 服务器就像实现WebSocketHandler或更可能扩展TextWebSocketHandlerBinaryWebSocketHandler一样简单:

import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.TextMessage;

public class MyHandler extends TextWebSocketHandler {

    @Override
    public void handleTextMessage(WebSocketSession session, TextMessage message) {
        // ...
    }

}

有专门的 WebSocket Java-config 和 XML 命名空间支持,用于将上述 WebSocket 处理程序映射到特定的 URL:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

XML configuration 等效:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

以上内容适用于 Spring MVC applications,应该包含在DispatcherServlet的 configuration 中。但是,Spring 的 WebSocket 支持不依赖于 Spring MVC。在WebSocketHttpRequestHandler的帮助下将WebSocketHandler集成到其他 HTTP 服务环境中相对简单。

26.2.2 WebSocket 握手

自定义初始 HTTP WebSocket 握手请求的最简单方法是通过HandshakeInterceptor,它将握手方法“之前”和“之后”。这样的拦截器可用于排除握手或使WebSocketSession可用的任何属性。对于 example,有一个 built-in 拦截器,用于将 HTTP session 属性传递给 WebSocket session:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(new MyHandler(), "/myHandler")
            .addInterceptors(new HttpSessionHandshakeInterceptor());
    }

}

和 XML configuration 相当:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:handshake-interceptors>
            <bean class="org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor"/>
        </websocket:handshake-interceptors>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

更高级的选项是扩展执行 WebSocket 握手步骤的DefaultHandshakeHandler,包括验证 client 原点,协商 sub-protocol 等。如果需要在 order 中配置自定义RequestUpgradeStrategy以适应 WebSocket 服务器引擎和尚未支持的 version,则 application 可能还需要使用此选项(有关此主题的更多信息,请参阅第 26.2.4 节,“部署”)。 Java-config 和 XML 命名空间都可以配置自定义HandshakeHandler

26.2.3 WebSocketHandler 装饰

Spring 提供了一个WebSocketHandlerDecorator base class,可以用来装饰带有附加行为的WebSocketHandler。使用 WebSocket Java-config 或 XML 命名空间时,默认情况下会提供并添加 Logging 和 exception 处理 implementations。 ExceptionWebSocketHandlerDecorator捕获由任何 WebSocketHandler 方法引起的所有未捕获的 exceptions,并关闭指示服务器错误的状态1011的 WebSocket session。

26.2.4 部署

Spring WebSocket API 易于集成到 Spring MVC application 中,其中DispatcherServlet既可以提供 HTTP WebSocket 握手,也可以提供其他 HTTP 请求。通过调用WebSocketHttpRequestHandler也可以轻松地集成到其他 HTTP 处理场景中。这很方便易懂。但是,有关 JSR-356 运行时的特殊注意事项。

Java WebSocket API(JSR-356)提供了两种部署机制。第一个涉及启动时的 Servlet 容器 classpath 扫描(Servlet 3 feature);另一个是在 Servlet 容器初始化时使用的注册 API。这些机制都不能使用单个“前端控制器”进行所有 HTTP 处理 - 包括 WebSocket 握手和所有其他 HTTP 请求 - 例如 Spring MVC 的DispatcherServlet

这是 JSR-356 的一个重要限制 Spring 的 WebSocket 支持地址,即使在 JSR-356 运行时 running 时也提供 server-specific RequestUpgradeStrategy

已经创建了一个克服 Java WebSocket API 中的上述限制的请求,可以在WEBSOCKET_SPEC-211处遵循。另请注意,Tomcat 和 Jetty 已经提供了本机 API 替代方案,可以轻松克服限制。我们希望更多的服务器将遵循它们的 example,无论它们何时在 Java WebSocket API 中得到解决。

第二个考虑因素是具有 JSR-356 支持的 Servlet 容器应该执行ServletContainerInitializer(SCI)扫描,这可能会减慢 application 启动速度,在某些情况下会显着降低。如果在使用 JSR-356 支持升级到 Servlet 容器 version 后观察到重大影响,则应该可以通过使用web.xml中的<absolute-ordering />元素有选择地启用或禁用 web 片段(和 SCI 扫描):

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering/>

</web-app>

然后,您可以通过 name 有选择地启用 web 片段,例如 Spring 自己的SpringServletContainerInitializer,如果需要,它提供对 Servlet 3 Java 初始化 API 的支持:

<web-app xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
        http://java.sun.com/xml/ns/javaee
        http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    version="3.0">

    <absolute-ordering>
        <name>spring_web</name>
    </absolute-ordering>

</web-app>

26.2.5 配置 WebSocket 引擎

每个底层 WebSocket 引擎都公开 configuration properties,它们控制运行时特性,例如消息缓冲区大小,idle 超时等等。

对于 Tomcat,WildFly 和 GlassFish,将ServletServerContainerFactoryBean添加到 WebSocket Java 配置中:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192);
        container.setMaxBinaryMessageBufferSize(8192);
        return container;
    }

}

或 WebSocket XML 命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <bean class="org.springframework...ServletServerContainerFactoryBean">
        <property name="maxTextMessageBufferSize" value="8192"/>
        <property name="maxBinaryMessageBufferSize" value="8192"/>
    </bean>

</beans>

对于 client 端 WebSocket configuration,您应该使用WebSocketContainerFactoryBean(XML)或ContainerProvider.getWebSocketContainer()(Java 配置)。

对于 Jetty,您需要提供 pre-configured Jetty WebSocketServerFactory并通过 WebSocket Java 配置将其插入 Spring 的DefaultHandshakeHandler

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(echoWebSocketHandler(),
            "/echo").setHandshakeHandler(handshakeHandler());
    }

    @Bean
    public DefaultHandshakeHandler handshakeHandler() {

        WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
        policy.setInputBufferSize(8192);
        policy.setIdleTimeout(600000);

        return new DefaultHandshakeHandler(
                new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
    }

}

或 WebSocket XML 命名空间:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/echo" handler="echoHandler"/>
        <websocket:handshake-handler ref="handshakeHandler"/>
    </websocket:handlers>

    <bean id="handshakeHandler" class="org.springframework...DefaultHandshakeHandler">
        <constructor-arg ref="upgradeStrategy"/>
    </bean>

    <bean id="upgradeStrategy" class="org.springframework...JettyRequestUpgradeStrategy">
        <constructor-arg ref="serverFactory"/>
    </bean>

    <bean id="serverFactory" class="org.eclipse.jetty...WebSocketServerFactory">
        <constructor-arg>
            <bean class="org.eclipse.jetty...WebSocketPolicy">
                <constructor-arg value="SERVER"/>
                <property name="inputBufferSize" value="8092"/>
                <property name="idleTimeout" value="600000"/>
            </bean>
        </constructor-arg>
    </bean>

</beans>

26.2.6 配置允许的来源

从 Spring Framework 4.1.5 开始,WebSocket 和 SockJS 的默认行为是仅接受相同的原始请求。也可以允许所有或指定的起源列表。此检查主要是为浏览器客户端设计的。没有什么可以阻止其他类型的 clients 修改Origin标头 value(有关更多详细信息,请参阅RFC 6454:Web Origin 概念)。

3 种可能的行为是:

  • 仅允许相同的原始请求(默认):在此模式下,启用 SockJS 时,Iframe HTTP 响应头X-Frame-Options设置为SAMEORIGIN,并禁用 JSONP 传输,因为它不允许检查请求的来源。因此,启用此模式时不支持 IE6 和 IE7。

  • 允许指定的原始列表:每个提供的允许来源必须以http://https://开头。在此模式下,启用 SockJS 时,将禁用基于 IFrame 和 JSONP 的传输。因此,启用此模式时,不支持 IE6 到 IE9。

  • 允许所有来源:要启用此模式,您应该提供*作为允许的原始 value。在此模式下,所有传输都可用。

WebSocket 和 SockJS 允许的起源可以配置如下所示:

import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").setAllowedOrigins("http://mydomain.com");
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

XML configuration 等效:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers allowed-origins="http://mydomain.com">
        <websocket:mapping path="/myHandler" handler="myHandler" />
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

26.3 SockJS 后备

介绍中所述,WebSocket 尚未在所有浏览器中受支持,并且可能被限制性网络代理排除。这就是为什么 Spring 提供了基于SockJS 协议(version 0.3.3)尽可能接近地模拟 WebSocket API 的回退选项。

26.3.1 概述

SockJS 的目标是让 applications 使用 WebSocket API,但在运行时必要时回退到 non-WebSocket 替代,i.e。无需更改 application code。

SockJS 包括:

  • SockJS 协议以可执行文件叙述测试的形式定义。

  • SockJS JavaScript client - 用于浏览器的 client library。

  • SockJS 服务器 implementations 包括 Spring Framework spring-websocket模块中的一个。

  • 截至 4.1 spring-websocket还提供了一个 SockJS Java client。

SockJS 专为在浏览器中使用而设计。它竭尽全力使用各种技术支持各种浏览器版本。有关 SockJS 传输类型和浏览器的完整列表,请参阅SockJS client页面。传输分为 3 大类:WebSocket,HTTP Streaming 和 HTTP Long Polling。有关这些类别的概述,请参阅这篇博文

SockJS client 首先发送"GET /info"以从服务器获取基本信息。之后,它必须决定使用什么传输。如果可能,使用 WebSocket。如果没有,在大多数浏览器中至少有一个 HTTP 流选项,如果没有,则使用 HTTP(long)轮询。

所有传输请求都具有以下 URL 结构:

http://host:port/myApp/myEndpoint/{server-id}/{session-id}/{transport}
  • {server-id} - 用于在 cluster 中路由请求但不使用其他方式。

  • {session-id} - 关联属于 SockJS session 的 HTTP 请求。

  • {transport} - 表示传输类型,e.g. “websocket”,“xhr-streaming”等

WebSocket 传输只需要一个 HTTP 请求即可进行 WebSocket 握手。之后的所有消息都在该 socket 上交换。

HTTP 传输需要更多请求。 Ajax/XHR streaming for example 依赖于 server-to-client 消息的 long-running 请求和 client-to-server 消息的其他 HTTP POST 请求。 Long 轮询类似,除了它_在每次 server-to-client 发送后结束当前请求。

SockJS 增加了最小的消息框架。对于 example,服务器最初发送字母 o(“打开”框架),消息作为[135](JSON-encoded array)发送,如果没有消息默认流动 25 秒,则发送字母 h(“心跳”帧),并且字母 c(“关闭”框架)关闭 session。

要了解更多信息,请在浏览器中运行 example 并查看 HTTP 请求。 SockJS client 允许修复传输列表,因此可以在 time 时查看每个传输。 SockJS client 还提供了一个 debug flag,它可以在浏览器 console 中启用有用的消息。在服务器端为org.springframework.web.socket启用TRACE logging。有关更多详细信息,请参阅 SockJS 协议叙述测试

26.3.2 启用 SockJS

通过 Java configuration 很容易启用 SockJS:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler").withSockJS();
    }

    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }

}

和 XML configuration 等价物:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:handlers>
        <websocket:mapping path="/myHandler" handler="myHandler"/>
        <websocket:sockjs/>
    </websocket:handlers>

    <bean id="myHandler" class="org.springframework.samples.MyHandler"/>

</beans>

以上内容适用于 Spring MVC applications,应该包含在DispatcherServlet的 configuration 中。但是,Spring 的 WebSocket 和 SockJS 支持并不依赖于 Spring MVC。在SockJsHttpRequestHandler的帮助下集成到其他 HTTP 服务环境相对简单。

在浏览器端,applications 可以使用模拟 W3C WebSocket API 的sockjs-client(version 1.0.x)并与服务器通信,以根据其运行的浏览器选择最佳传输选项。查看sockjs-client页面和支持的传输类型列表通过浏览器。 client 还为 example 提供了几个 configuration 选项,用于指定要包含的传输。

26.3.3 IE 8,9

Internet Explorer 8 和 9 对于某些 time 来说仍然是 common。他们是拥有 SockJS 的关键原因。本节介绍有关在这些浏览器中运行的重要注意事项。

SockJS client 通过微软的XDomainRequest支持 IE 8 和 9 中的 Ajax/XHR 流媒体。这适用于跨域但不支持发送 cookies。 Cookies 经常是 Java applications 必不可少的。但是,由于 SockJS client 可以与许多服务器类型(不仅仅是 Java 类型)一起使用,因此需要知道 cookies 是否重要。如果是这样,SockJS client 更喜欢 Ajax/XHR 用于流式传输,否则它依赖于 iframe-based 技术。

来自 SockJS client 的第一个"/info"请求是对可以影响客户端传输选择的信息的请求。其中一个细节是 server application 是否依赖于 cookies,e.g. 用于身份验证或使用粘性会话进行群集。 Spring 的 SockJS 支持包括一个名为sessionCookieNeeded的 property。它默认启用,因为大多数 Java applications 依赖于JSESSIONID cookie。如果您的 application 不需要它,您可以关闭此选项,SockJS client 应该在 IE 8 和 9 中选择xdr-streaming

如果您确实使用 iframe-based 传输,并且在任何情况下,通过将 HTTP 响应头X-Frame-Options设置为DENYSAMEORIGINALLOW-FROM <origin>,可以指示可以指示浏览器阻止在给定页面上使用 IFrame。这用于防止点击劫持

Spring Security 3.2 支持在每个响应上设置X-Frame-Options。默认情况下,Spring Security Java 配置将其设置为DENY。在 3.2 中,Spring Security XML 命名空间默认情况下不设置该标头,但可以配置为执行此操作,并且将来可以默认设置它。

有关如何配置X-Frame-Options标头设置的详细信息,请参阅 Spring Security 文档的第 7.1 节。 “默认安全 Headers”。您也可以查看或 watch SEC-2501以获取更多背景信息。

如果你的 application 添加了X-Frame-Options响应头(因为它应该!)并依赖于 iframe-based 传输,你需要将头 value 设置为SAMEORIGINALLOW-FROM <origin>。除此之外,Spring SockJS 支持还需要知道 SockJS client 的位置,因为它是从 iframe 加载的。默认情况下,iframe 设置为从 CDN 位置下载 SockJS client。将此选项配置到与 application 相同的来源是一个很好的 idea。

在 Java 配置中,这可以如下所示完成。 XML 命名空间通过<websocket:sockjs>元素提供类似的选项:

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS()
                .setClientLibraryUrl("http://localhost:8080/myapp/js/sockjs-client.js");
    }

    // ...

}

在初始开发期间,请启用 SockJS client devel模式,以防止浏览器缓存否则将被缓存的 SockJS 请求(如 iframe)。有关如何启用它的详细信息,请参阅SockJS client页面。

26.3.4 心跳

SockJS 协议要求服务器发送心跳消息以阻止代理断定连接挂起。 Spring SockJS configuration 有一个名为heartbeatTime的 property,可用于自定义频率。默认情况下,假设在该连接上没有发送其他消息,则在 25 秒后发送心跳。对于公共 Internet 应用,这 25 秒 value 在 line 中,以下IETF 推荐

当使用 STOMP 而不是 WebSocket/SockJS 时,如果 STOMP client 和服务器协商要交换的心跳,则禁用 SockJS 心跳。

Spring SockJS 支持还允许配置TaskScheduler以用于调度心跳任务。任务计划程序由线程池支持,默认设置基于可用处理器的数量。 Applications 应考虑根据自己的特定需求自定义设置。

26.3.5 Client 断开连接

HTTP 流和 HTTP long 轮询 SockJS 传输要求连接保持打开时间比平时长。有关这些技术的概述,请参阅这篇博文

在 Servlet 容器中,这是通过 Servlet 3 异步支持完成的,该支持允许退出 Servlet 容器线程处理请求并继续写入来自另一个线程的响应。

一个特定的问题是 Servlet API 不为已经消失的客户端提供通知,请参阅SERVLET_SPEC-44。但是,Servlet 容器会在后续尝试写入响应时引发 exception。由于 Spring 的 SockJS 服务支持 sever-sent 心跳(默认情况下每 25 秒),这意味着如果更频繁地发送消息,通常会在_time 期间或更早的时间内检测到 client 断开连接。

因此,网络 IO 故障可能仅仅因为 client 已断开连接而发生,这可能会在 log 中填充不必要的堆栈跟踪。 Spring 尽最大努力识别代表 client 断开连接(特定于每个服务器)的网络故障,并使用AbstractSockJsSession中定义的专用 log 类别DISCONNECTED_CLIENT_LOG_CATEGORY来记录最小消息。如果需要查看堆栈跟踪,请将 log 类别设置为 TRACE。

26.3.6 SockJS 和 CORS

如果允许 cross-origin 请求(请参阅第 26.2.6 节,“配置允许的原点”),则 SockJS 协议在 XHR 流和轮询传输中使用 CORS 进行 cross-domain 支持。因此,除非检测到响应中存在 CORS headers,否则会自动添加 CORS headers。因此,如果已经将 application 配置为提供 CORS 支持,e.g. 通过 Servlet 过滤器,Spring 的 SockJsService 将跳过这一部分。

也可以通过 Spring 的 SockJsService 中的suppressCors property 禁用这些 CORS _header 的添加。

以下是 SockJS 预期的 headers 和值列表:

  • "Access-Control-Allow-Origin" - 从“Origin”请求标头的 value 初始化。

  • "Access-Control-Allow-Credentials" - 始终设置为true

  • "Access-Control-Request-Headers" - 从等效请求标头中的值初始化。

  • "Access-Control-Allow-Methods" - 传输支持的 HTTP 方法(请参阅TransportType enum)。

  • "Access-Control-Max-Age" - 设置为 31536000(1 年)。

对于确切的 implementation,请参阅AbstractSockJsService中的addCorsHeaders以及 source code 中的TransportType enum。

或者,如果 CORS configuration 允许它考虑使用 SockJS 端点前缀排除 URL,从而让 Spring 的SockJsService处理它。

26.3.7 SockJsClient

order 中提供了 SockJS Java client,用于在不使用浏览器的情况下连接到 remote SockJS endpoints。当需要通过公共网络在 2 个服务器之间进行双向通信时,这尤其有用。网络代理可能会阻止使用 WebSocket 协议。 SockJS Java client 对于测试目的也非常有用,例如,可以模拟大量并发用户。

SockJS Java client 支持“websocket”,“xhr-streaming”和“xhr-polling”传输。其余的仅适用于浏览器。

WebSocketTransport可配置为:

  • 在 JSR-356 运行时StandardWebSocketClient

  • JettyWebSocketClient使用 Jetty 9 本机 WebSocket API

  • Spring 的所有 implementation WebSocketClient

根据定义,XhrTransport同时支持“xhr-streaming”和“xhr-polling”,因为从 client 角度来看,除了用于连接服务器的 URL 之外没有其他区别。目前有两个_Implempleations:

  • RestTemplateXhrTransport对 HTTP 请求使用 Spring 的RestTemplate

  • JettyXhrTransport使用 Jetty 的HttpClient进行 HTTP 请求。

下面的 example 显示了如何创建 SockJS client 并连接到 SockJS 端点:

List<Transport> transports = new ArrayList<>(2);
transports.add(new WebSocketTransport(new StandardWebSocketClient()));
transports.add(new RestTemplateXhrTransport());

SockJsClient sockJsClient = new SockJsClient(transports);
sockJsClient.doHandshake(new MyWebSocketHandler(), "ws://example.com:8080/sockjs");

SockJS 使用 JSON 格式的数组进行消息传递。默认使用 Jackson 2 并且需要在 classpath 上。或者,您可以配置SockJsMessageCodec的自定义 implementation 并在SockJsClient上配置它。

要使用 SockJsClient 模拟大量并发用户,您需要配置基础 HTTP client(用于 XHR 传输)以允许足够数量的连接和线程。对于使用 Jetty 的 example:

HttpClient jettyHttpClient = new HttpClient();
jettyHttpClient.setMaxConnectionsPerDestination(1000);
jettyHttpClient.setExecutor(new QueuedThreadPool(1000));

还要考虑自定义这些 server-side 与 SockJS 相关的 properties(有关详细信息,请参阅 Javadoc):

@Configuration
public class WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/sockjs").withSockJS()
            .setStreamBytesLimit(512 * 1024)
            .setHttpMessageCacheSize(1000)
            .setDisconnectDelay(30 * 1000);
    }

    // ...
}

26.4 STOMP

WebSocket 协议定义了两种类型的消息,文本和二进制,但它们的内容是未定义的。定义了 client 和服务器协商 sub-protocol - i.e 的机制。一个更高的 level 消息传递协议,在 WebSocket 之上使用来定义每个消息可以发送什么类型的消息,每个消息的格式和内容是什么,等等。使用 sub-protocol 是可选的,但无论是 client 还是服务器,都需要就定义消息内容的某些协议达成一致。

26.4.1 概述

STOMP是一个简单的 text-oriented 消息传递协议,最初是为 Ruby,Python 和 Perl 等脚本语言创建的,用于连接企业消息代理。它旨在解决常用消息传递模式的子集。 STOMP 可用于任何可靠的 2-way 流媒体网络协议,如 TCP 和 WebSocket。虽然 STOMP 是 text-oriented 协议,但消息的有效负载可以是文本或二进制。

STOMP 是一种基于帧的协议,其帧在 HTTP 上建模。 STOMP 框架的结构:

COMMAND
header1:value1
header2:value2

Body^@

Clients 可以使用 SEND 或 SUBSCRIBE 命令发送或订阅消息以及描述消息内容和接收消息的“目标”标头。这启用了一个简单的 publish-subscribe 机制,可用于通过 broker 将消息发送到其他连接的客户端,或者向服务器发送消息以请求执行某些工作。

使用 Spring 的 STOMP 支持时,Spring WebSocket application 充当客户端的 STOMP broker。消息被路由到@Controller message-handling 方法或简单的 in-memory broker,它跟踪订阅并向订阅用户广播消息。您还可以将 Spring 配置为使用专用的 STOMP broker(e.g. RabbitMQ,ActiveMQ 等)来实现消息的实际 broadcasting。在这种情况下,Spring 维护与 broker 的 TCP 连接,向其中继消息,并将消息从它传递给连接的 WebSocket clients。因此 Spring web applications 可以依赖统一的 HTTP-based 安全性,common 验证和熟悉的编程 model message-handling 工作。

这是 client 订阅接收股票报价的示例,服务器可以定期发出 e.g. 通过计划任务通过SimpMessagingTemplate向 broker 发送消息:

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

这是一个发送交易请求的客户端的示例,服务器可以通过@MessageMapping方法处理,之后执行,broadcast 交易确认消息和详细信息到 client:

SEND
destination:/queue/trade
content-type:application/json
content-length:44

{"action":"BUY","ticker":"MMM","shares",44}^@

目的地的含义在 STOMP 规范中故意保持不透明。它可以是任何 string,并且完全由 STOMP 服务器来定义它们支持的目标的语义和语法。然而,非常常见的是,目标是 path-like strings,其中"/topic/.."暗示 publish-subscribe(one-to-many),"/queue/"暗示 point-to-point(one-to-one)消息交换。

STOMP 服务器可以使用 MESSAGE 命令向所有订户 broadcast 消息。这是服务器向订阅的 client 发送股票报价的示例:

MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM

{"ticker":"MMM","price":129.45}^@

知道服务器无法发送未经请求的消息非常重要。来自服务器的所有消息必须响应特定的 client 订阅,并且服务器消息的“subscription-id”标头必须 match client 订阅的“id”标头。

以上概述旨在提供对 STOMP 协议的最基本的了解。建议完整地审查协议规格

26.4.2 好处

使用 STOMP 作为 sub-protocol 使 Spring Framework 和 Spring Security 能够提供更丰富的编程模型与使用原始 WebSockets。关于 HTTP 与原始 TCP 的关系以及它如何使 Spring MVC 和其他 web 框架能够提供丰富的功能,可以做出同样的观点。以下是一系列好处:

  • 无需发明自定义消息传递协议和消息格式。

  • STOMP clients 可用,包括 Spring Framework 中的Java client

  • 可以使用诸如 RabbitMQ,ActiveMQ 等消息代理(可选)来管理订阅和 broadcast 消息。

  • Application 逻辑可以组织成任意数量的@Controller和基于 STOMP 目标标头路由到它们的消息与处理给定连接的单个WebSocketHandler的原始 WebSocket 消息。

  • 使用 Spring Security 来保护基于 STOMP 目标和消息类型的消息。

26.4.3 启用 STOMP

spring-messagingspring-websocket模块中提供了对 WebSocket 支持的 STOMP。一旦拥有了这些依赖项,就可以通过 WebSocket 使用第 26.3 节,“SockJS 后备”公开 STOMP endpoints,如下所示:

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }
}

"/portfolio"是 WebSocket(或 SockJS)客户端为了进行 WebSocket 握手而需要连接的端点的 HTTP URL。

目标头以"/app"开头的 STOMP 消息将路由到@Controller classes 中的@MessageMapping方法。

使用 built-in,消息 broker 进行订阅和 broadcasting; 将目的地标题以“/topic”或“/queue”开头的邮件转到 broker。
XML 中的相同 configuration:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio">
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/topic, /queue"/>
    </websocket:message-broker>

</beans>

对于 built-in,简单的 broker,“/topic”和“/queue”前缀没有任何特殊含义。它们只是区分 pub-sub 与 point-to-point 消息传递的惯例(i.e.许多订阅者与一个消费者)。使用外部 broker 时,请检查 broker 的 STOMP 页面,以了解它支持的 STOMP 目标和前缀类型。

要从浏览器连接,对于 SockJS,您可以使用sockjs-client。对于 STOMP,许多应用程序使用了jmesnil/stomp-websocket library(也称为 stomp.js),这是 feature 完整的并且已在 production 中使用多年但不再维护。目前JSteunou/webstomp-client是该图书馆中最积极维护和不断发展的继承者,下面的 example code 基于它:

var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);

stompClient.connect({}, function(frame) {
}

或者如果通过 WebSocket 连接(没有 SockJS):

var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);

stompClient.connect({}, function(frame) {
}

请注意,上面的stompClient不需要指定loginpasscode headers。即使它确实如此,它们也会在服务器端被忽略或被覆盖。有关身份验证的详细信息,请参阅第 26.4.9 节,“连接到 Broker”第 26.4.11 节,“身份验证”部分。

有关更多 example code 的信息,请参阅:

26.4.4 消息流

一旦暴露了 STOMP 端点,Spring application 就成为连接的 clients 的 STOMP broker。本节介绍服务器端的消息流。

spring-messaging模块包含对源自Spring Integration的消息传递应用程序的基础支持,后来被提取并合并到 Spring Framework 中,以便在许多Spring 项目和 application 场景中得到更广泛的使用。下面列出了一些可用的消息传递抽象:

Java 配置(i.e .@EnableWebSocketMessageBroker)和 XML 命名空间配置(i.e.<websocket:message-broker>)都使用上述组件来组合消息工作流。下图显示了启用简单的 built-in 消息 broker 时使用的组件:

消息流简单 broker

上图中有 3 个消息 channels:

  • "clientInboundChannel" - 用于传递从 WebSocket clients 收到的消息。

  • "clientOutboundChannel" - 用于将服务器消息发送到 WebSocket clients。

  • "brokerChannel" - 用于从 server-side, applicationcode 中发送消息 broker。

下图显示了配置外部 broker(e.g. RabbitMQ)以管理订阅和 broadcasting 消息时使用的组件:

消息流 broker relay

上图中的主要区别是使用“broker relay”通过 TCP 将消息传递到外部 STOMP broker,以及将消息从 broker 传递到订阅的 clients。

当从 WebSocket connectin 接收消息时,它们被解码为 STOMP 帧,然后变成 Spring Message表示,并发送到"clientInboundChannel"进行进一步处理。对于 example STOMP 消息,其目标头以"/app"开头,可以路由到带注释的控制器中的@MessageMapping方法,而"/topic""/queue"消息可以直接路由到消息 broker。

处理来自 client 的 STOMP 消息的带注释的@Controller可以通过"brokerChannel"向消息 broker 发送消息,broker 将通过"clientOutboundChannel"将消息_b 广播到匹配的订阅者。同一个控制器也可以响应 HTTP 请求执行相同的操作,因此 client 可以执行 HTTP POST,然后@PostMapping方法可以向消息 broker 发送消息以 broadcast 发送给订阅的 clients。

让我们通过一个简单的 example 跟踪流程。鉴于以下服务器设置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app");
        registry.enableSimpleBroker("/topic");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}
  • Client 连接到"http://localhost:8080/portfolio",一旦建立了 WebSocket 连接,STOMP 帧就开始在其上流动。

  • Client 发送带有目标头"/topic/greeting"的 SUBSCRIBE 帧。收到并解码后,消息将发送到"clientInboundChannel",然后路由到_stker logient 订阅的消息 broker。

  • Client 将 SEND 帧发送到"/app/greeting""/app"前缀有助于将其路由到带注释的控制器。删除"/app"前缀后,目标的剩余"/greeting"部分将映射到GreetingController中的@MessageMapping方法。

  • GreetingController返回的 value 变为 Spring Message,其有效负载基于 return value 和"/topic/greeting"的默认目标头(从输入目标派生,"/app"替换为"/topic")。生成的消息将发送到“brokerChannel”并由消息 broker 处理。

  • 消息 broker 找到所有匹配的订阅者,并通过"clientOutboundChannel"向每个订阅者发送一个 MESSAGE 帧,消息被编码为 STOMP 帧并在 WebSocket 连接上发送。

下一节提供了有关注释方法的更多详细信息,包括支持的 arguments 和 return 值的种类。

26.4.5 带注释的控制器

Applications 可以使用带注释的@Controller classes 来处理来自 clients 的消息。这样的 classes 可以声明@MessageMapping@SubscribeMapping@ExceptionHandler方法,如下所述。

@MessageMapping

@MessageMapping annotation 可用于根据目的地 route 消息的方法。方法 level 和 level 类型都支持它。在 level 类型@MessageMapping用于表示控制器中所有方法的共享映射。

默认情况下,目标映射应为 Ant-style,路径模式,e.g. “/foo *”,“/foo/ **”。模式包括对模板变量的支持,e.g. “/foo/ ”,可以使用@DestinationVariable方法 arguments 引用。

Applications 可以选择切换到 dot-separated 目标约定。见第 26.4.10 节,“点作为分隔符”

@MessageMapping方法可以使用以下 arguments 具有灵活的签名:

方法论证描述
Message用于访问完整的消息。
MessageHeaders用于访问Message中的 headers。
MessageHeaderAccessor , SimpMessageHeaderAccessor , StompHeaderAccessor用于通过类型化访问器方法访问 headers。
@Payload要访问消息的有效负载,请通过已配置的MessageConverter进行转换(e.g. 来自 JSON)。
不需要存在此 annotation,因为如果没有其他参数匹配,则默认情况下会假定它。
Payload arguments 可以在 order 中使用@javax.validation.Valid或 Spring @Validated进行注释,以便自动验证。
@Header如有必要,使用org.springframework.core.convert.converter.Converter访问特定标头 value 以及类型转换。
@Headers用于访问消息中的所有 headers。此参数必须可分配给java.util.Map
@DestinationVariable用于访问从消息目标中提取的模板变量。根据需要,值将转换为声明的方法参数类型。
java.security.Principal反映在 WebSocket HTTP 握手的 time 登录的用户。

@MessageMapping方法返回 value 时,默认情况下 value 通过已配置的MessageConverter序列化为有效负载,然后作为Message发送到"brokerChannel",从_b播送到订阅者。出站消息的目的地与入站消息的目的地相同,但前缀为"/topic"

您可以使用@SendTo方法 annotation 自定义要将有效内容发送到的目标。也可以在 class level 中使用@SendTo来共享发送消息的默认目标目标。 @SendToUser是仅向与消息关联的用户发送消息的变体。有关详细信息,请参阅第 26.4.13 节,“用户目的地”

来自@MessageMapping方法的 return value 可以在 order 中包含ListenableFutureCompletableFutureCompletionStage,以异步生成有效负载。

作为从@MessageMapping方法返回有效负载的替代方法,您还可以使用SimpMessagingTemplate发送消息,这也是如何在封面下处理 return 值。见第 26.4.6 节,“发送消息”

@SubscribeMapping

@SubscribeMapping annotation 与@MessageMapping in order 结合使用,以缩小到订阅消息的映射。在这种情况下,@MessageMapping annotation 指定目标,而@SubscribeMapping表示仅对订阅消息感兴趣。

对于映射和输入 arguments,@SubscribeMapping方法通常与任何@MessageMapping方法没有区别。对于 example,您可以将它与 type-level @MessageMapping组合以表示共享目标前缀,并且您可以使用与任何@MessageMapping`方法相同的方法 arguments

@SubscribeMapping的 key 区别在于方法的 return value 被序列化为有效负载而不是发送到“brokerChannel”而是发送到“clientOutboundChannel”,直接有效地回复客户端而不是通过 broker 进行 broadcasting。这对于实现 one-off,request-reply 消息交换非常有用,并且永远不会保留订阅。当必须加载和显示数据时,此 pattern 的 common 场景是 application 初始化。

@SubscribeMapping方法也可以使用@SendTo进行注释,在这种情况下,return value 将使用显式指定的目标目标发送到"brokerChannel"

@MessageExceptionHandler

application 可以使用@MessageExceptionHandler方法来处理@MessageMapping方法的 exceptions。 感兴趣的异常可以在 annotation 本身中声明,或者如果你想要访问 exception 实例,可以通过方法参数声明:

@Controller
public class MyController {

    // ...

    @MessageExceptionHandler
    public ApplicationError handleException(MyException exception) {
        // ...
        return appError;
    }
}

@MessageExceptionHandler方法支持灵活的方法签名,并支持与@MessageMapping方法相同的方法参数类型和 return 值。

通常,@MessageExceptionHandler方法适用于声明它们的@Controller class(或 class 层次结构)。如果您希望这些方法在控制器之间全局应用更多,则可以在标有@ControllerAdvice的 class 中声明它们。这与 Spring MVC 中的类似的支持相当。

26.4.6 发送消息

如果要从 application 的任何部分向连接的 clients 发送消息,该怎么办?任何 application component 都可以向"brokerChannel"发送消息。最简单的方法是注入SimpMessagingTemplate,并使用它来发送消息。通常,它应该很容易按类型注入,例如:

@Controller
public class GreetingController {

    private SimpMessagingTemplate template;

    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
        this.template = template;
    }

    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }

}

但是如果存在另一个相同类型的 bean,它也可以通过其 name“brokerMessagingTemplate”进行限定。

26.4.7 简单 Broker

built-in,简单消息 broker 处理来自 clients 的订阅请求,将它们存储在 memory 中,并将消息广播到具有匹配目标的已连接客户端。 broker 支持 path-like 目的地,包括对 Ant-style 目的地模式的订阅。

Applications 也可以使用 dot-separated 目的地(vs 斜杠)。见第 26.4.10 节,“点作为分隔符”

26.4.8 外部 Broker

简单的 broker 非常适合入门,但仅支持 STOMP 命令的子集(e.g. 没有 acks,收据,etc.),依赖于简单的消息发送循环,并且不适合群集。作为替代方案,applications 可以升级到使用 full-featured 消息 broker。

检查 STOMP 文档以获取您选择的消息 broker(e.g. RabbitMQActiveMQ,etc.),安装 broker,并在启用 STOMP 支持的情况下运行它。然后在 Spring configuration 中启用 STOMP broker 中继而不是简单的 broker。

下面是启用 full-featured broker 的 example configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

XML configuration 等效:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/portfolio" />
            <websocket:sockjs/>
        </websocket:stomp-endpoint>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

</beans>

上述 configuration 中的“STOMP broker relay”是一个 Spring MessageHandler,它通过将消息转发给外部消息 broker 来处理消息。为此,它建立到 broker 的 TCP 连接,将所有消息转发给它,然后通过其 WebSocket 会话将从 broker 接收的所有消息转发给 clients。从本质上讲,它充当“转发”,可以在两个方向上转发消息。

Spring 使用org.projectreactor:reactor-netio.netty:netty-all来管理与 broker 的 TCP 连接,这两者都需要作为项目依赖项添加。

Spring Framework 4.3.x 中的 STOMP broker 支持与 2.0.x 代 Reactor 兼容。因此,不需要与需要 Reactor 3.x 的spring-cloud-stream-reactive模块结合使用。

Spring Framework 5 依赖于 Reactor 3 和 Reactor Netty,它具有独立的版本控制,用于与 STOMP broker 的 TCP 连接,但也为 reactive 编程模型提供广泛的支持。

此外,application 组件(e.g. HTTP 请求处理方法,业务服务,etc.)也可以向 broker 中继发送消息,如第 26.4.6 节,“发送消息”中所述,在 broadcast 消息中订阅 WebSocket clients。

实际上,broker 中继实现了健壮且可扩展的消息广播。

26.4.9 连接到 Broker

STOMP broker 中继维护与 broker 的单个“系统”TCP 连接。此连接仅用于源自 server-side application 的消息,而不用于接收消息。您可以为此连接配置 STOMP 凭据 i.e。 STOMP 框架loginpasscode headers。这在 XML 命名空间和 Java 配置中都显示为systemLogin/systemPasscode properties,默认值为guest/guest

STOMP broker 中继还为每个连接的 WebSocket client 创建单独的 TCP 连接。您可以配置 STOMP 凭据以用于代表 clients 创建的所有 TCP 连接。这在 XML 命名空间和 Java 配置中都显示为clientLogin/clientPasscode properties,默认值为guest/guest

STOMP broker 中继始终在每个CONNECT帧上设置loginpasscode headers,它们代表 clients 转发给 broker。因此 WebSocket clients 不需要设置那些 headers;他们会被忽略。正如第 26.4.11 节,“身份验证”解释的那样,WebSocket clients 应该依赖 HTTP 身份验证来保护 WebSocket 端点并建立 client 身份。

STOMP broker 中继还通过“系统”TCP 连接向消息 broker 发送和接收心跳。您可以配置发送和接收心跳的间隔(默认情况下每个 10 秒)。如果与 broker 的连接丢失,broker 中继将继续尝试每 5 秒重新连接一次,直到成功为止。

任何 Spring bean 都可以在 order 中实现ApplicationListener<BrokerAvailabilityEvent>,以便在与 broker 的“系统”连接丢失和 re-established 时接收通知。对于示例股票报价服务 broadcasting 股票报价可以在没有 active“系统”连接时停止尝试发送消息。

默认情况下,STOMP broker 中继始终连接,并在连接丢失时根据需要重新连接到同一个 host 和 port。如果您希望提供多个地址,则在每次尝试连接时,您都可以配置地址供应商,而不是固定的 host 和 port。例如:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

	// ...

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
		registry.setApplicationDestinationPrefixes("/app");
	}

	private Reactor2TcpClient<byte[]> createTcpClient() {

		Supplier<InetSocketAddress> addressSupplier = new Supplier<InetSocketAddress>() {
			@Override
			public InetSocketAddress get() {
				// Select address to connect to ...
			}
		};

		StompDecoder decoder = new StompDecoder();
		Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
		return new Reactor2TcpClient<>(addressSupplier, codec);
	}

}

STOMP broker 中继也可以配置virtualHost property。此 property 的 value 将被设置为每个CONNECT帧的host标头,并且对于云环境中的 example 非常有用,其中建立 TCP 连接的实际 host 与提供 cloud-basedSTOMP 服务的 host 不同。

26.4.10 点作为分隔符

当消息路由到@MessageMapping方法时,它们与AntPathMatcher匹配,默认情况下,模式应使用斜杠“/”作为分隔符。这是 web applications 中的一个很好的约定,类似于 HTTP URL。但是,如果您更习惯于消息传递约定,则可以切换到使用点“。”作为分隔符。

在 Java 配置中:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    // ...

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setPathMatcher(new AntPathMatcher("."));
        registry.enableStompBrokerRelay("/queue", "/topic");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

在 XML 中:

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:websocket="http://www.springframework.org/schema/websocket"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
                http://www.springframework.org/schema/websocket
                http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
        <websocket:stomp-endpoint path="/stomp"/>
        <websocket:stomp-broker-relay prefix="/topic,/queue" />
    </websocket:message-broker>

    
    <bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
        <constructor-arg index="0" value="."/>
    </bean>
    

</beans>

之后,控制器可以使用点“。”作为@MessageMapping方法中的分隔符:

@Controller
@MessageMapping("foo")
public class FooController {

    @MessageMapping("bar.{baz}")
    public void handleBaz(@DestinationVariable String baz) {
        // ...
    }
}

client 现在可以向"/app/foo.bar.baz123"发送消息。

在上面的 example 中,我们没有更改“broker relay”上的前缀,因为它们完全依赖于外部消息 broker。检查您正在使用的 broker 的 STOMP 文档页面,以查看它为目标标头支持的约定。

另一方面,“简单的 broker”确实依赖于配置的PathMatcher,所以如果你切换也适用于 broker 的分隔符,并且方式将消息中的目的地与订阅中的模式匹配。

26.4.11 身份验证

每个 STOMP over WebSocket messaging session 都以 HTTP 请求开始 - 可以是升级到 WebSockets 的请求(i.e.一个 WebSocket 握手),或者在 SockJS 回退一系列 SockJS HTTP 传输请求的情况下。

Web applications 已经具有用于保护 HTTP 请求的身份验证和授权。通常,用户通过 Spring Security 使用某种机制(如登录页面,HTTP 基本身份验证或其他)进行身份验证。经过身份验证的用户的 security context 保存在 HTTP session 中,并与同一 cookie-based session 中的后续请求相关联。

因此,对于 WebSocket 握手或 SockJS HTTP 传输请求,通常会有通过HttpServletRequest#getUserPrincipal()可访问的经过身份验证的用户。 Spring 会自动将该用户与为其创建的 WebSocket 或 SockJS session 相关联,然后通过用户标头将该用户与通过该 session 传输的所有 STOMP 消息相关联。

简而言之,典型的 web application 除了它已经为安全做的事情之外,还没有什么特别的。用户在 HTTP 请求 level 上进行身份验证,并通过 cookie-basedHTTPsession 维护安全 context,然后将其与为该用户创建的 WebSocket 或 SockJS 会话相关联,并在每个Message上流过 application 标记用户标头。

请注意,STOMP 协议在CONNECT帧上确实有“登录”和“密码”headers。这些最初是针对 STOMP over TCP 的示例而设计的,并且仍然需要。但是,对于 STOMP over WebSocket,默认情况下 Spring 会忽略 STOMP 协议 level 上的授权 headers,并假定用户已经在 HTTP 传输 level 上进行了身份验证,并期望 WebSocket 或 SockJS session 包含经过身份验证的用户。

Spring Security 提供了WebSocket sub-protocol 授权,它使用ChannelInterceptor根据其中的用户头来授权消息。此外,Spring Session 提供了一个WebSocket integration,确保当 WebSocket session 仍处于 active 时,用户 HTTP session 不会过期。

26.4.12 令牌认证

Spring Security OAuth提供对基于令牌的安全性的支持,包括 JSON Web Token(JWT)。这可以用作 Web applications 中的身份验证机制,包括 STOMP over WebSocket 交互,就像上一节 i.e 中所述。通过 cookie-basedsession 保持身份。

在同一 time__11会话并不总是最适合_appample 中的 example,它们根本不希望保持 server-side session 或者在移动应用程序中,其中 common 使用 headers 进行身份验证。

WebSocket 协议 RFC 6455“没有规定服务器在 WebSocket 握手期间可以对 clients 进行身份验证的任何特定方式。”实际上,浏览器客户端只能使用标准身份验证 headers(i.e.基本 HTTP 身份验证)或 cookies,而不能用 example 提供自定义 headers。同样,SockJS JavaScript client 没有提供使用 SockJS 传输请求发送 HTTP headers 的方法,请参阅sockjs-client issue 196。相反,它确实允许发送可用于发送令牌但具有其自身缺点的查询参数,例如,因为令牌可能无意中使用服务器日志中的 URL 进行了记录。

上述限制适用于 browser-based clients,不适用于 Spring Java-based STOMP client,它支持使用 WebSocket 和 SockJS 请求发送 headers。

因此,希望避免使用 cookies 的应用程序在 HTTP 协议 level 上可能没有任何良好的身份验证选择。他们可能更喜欢在 STOMP 消息传递协议中使用 headers 进行身份验证,而不是使用 cookies。有两个简单的步骤:

  • 使用 STOMP client 在 connect time 传递身份验证 header(s)。

  • 使用ChannelInterceptor处理身份验证 header(s)。

下面是注册自定义身份验证拦截器的 example server-side configuration。请注意,拦截器只需要在 CONNECT Message上进行身份验证并设置用户头。 Spring 将记录并保存经过身份验证的用户,并将其与同一 session 上的后续 STOMP 消息相关联:

@Configuration
@EnableWebSocketMessageBroker
public class MyConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                if (StompCommand.CONNECT.equals(accessor.getCommand())) {
                    Authentication user = ... ; // access authentication header(s)
                    accessor.setUser(user);
                }
                return message;
            }
        });
    }
}

另请注意,当使用 Spring Security 的消息授权时,目前您需要确保在 Spring Security 之前排序认证ChannelInterceptor config。最好通过在自己的 sub-class AbstractWebSocketMessageBrokerConfigurer中标记自定义拦截器来标记@Order(Ordered.HIGHEST_PRECEDENCE + 99)

26.4.13 用户目的地

应用程序可以发送针对特定用户的邮件,Spring 的 STOMP 支持可识别以"/user/"为前缀的目标。对于 example,client 可能订阅目标"/user/queue/position-updates"。此目标将由UserDestinationMessageHandler处理并转换为用户 session 独有的目标 e.g. "/queue/position-updates-user123"。这提供了在同一时间订阅一般命名的目的地的便利,确保不与订阅相同目的地的其他用户发生冲突,使得每个用户可以接收唯一的库存位置更新。

在发送方,消息可以被发送到诸如"/user/{username}/queue/position-updates"之类的目的地,而"/user/{username}/queue/position-updates"又将由UserDestinationMessageHandler转换成一个或多个目的地,每个目的地对应于与用户相关联的一个目的地。这允许 application 中的任何 component 发送针对特定用户的消息,而不必知道除 name 和通用目标之外的任何内容。通过 annotation 和消息传递模板也支持此功能。

对于 example,message-handling 方法可以向与通过@SendToUser annotation 处理的消息关联的用户发送消息(class-level 也支持共享 common 目标):

@Controller
public class PortfolioController {

    @MessageMapping("/trade")
    @SendToUser("/queue/position-updates")
    public TradeResult executeTrade(Trade trade, Principal principal) {
        // ...
        return tradeResult;
    }
}

如果用户具有多个 session,则默认情况下,所有订阅给定目标的会话都是目标。但有时,可能只需要定位发送正在处理的消息的 session。对于 example,可以通过将broadcast属性设置为 false 来完成此操作:

@Controller
public class MyController {

    @MessageMapping("/action")
    public void handleAction() throws Exception{
        // raise MyBusinessException here
    }

    @MessageExceptionHandler
    @SendToUser(destinations="/queue/errors", broadcast=false)
    public ApplicationError handleException(MyBusinessException exception) {
        // ...
        return appError;
    }
}

虽然用户目的地通常意味着经过身份验证的用户,但并不严格要求。与经过身份验证的用户无关的 WebSocket session 可以订阅用户目标。在这种情况下,@SendToUser annotation 的行为与broadcast=false,i.e 完全相同。仅定位发送正在处理的邮件的 session。

也可以通过注入由 Java 配置或 XML 命名空间创建的SimpMessagingTemplate来从任何 application component 向用户目的地发送消息,用于 example(bean name 是"brokerMessagingTemplate",如果需要用@Qualifier进行限定):

@Service
public class TradeServiceImpl implements TradeService {

	private final SimpMessagingTemplate messagingTemplate;

	@Autowired
	public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
		this.messagingTemplate = messagingTemplate;
	}

	// ...

	public void afterTradeExecuted(Trade trade) {
		this.messagingTemplate.convertAndSendToUser(
				trade.getUserName(), "/queue/position-updates", trade.getResult());
	}
}

将用户目标与外部消息 broker 一起使用时,请查看 broker 文档,了解如何管理非活动队列,以便在用户 session 结束时删除所有唯一用户队列。例如,当使用像/exchange/amq.direct/position-updates这样的目的地时,RabbitMQ 会创建 auto-delete 队列。所以在这种情况下 client 可以订阅/user/exchange/amq.direct/position-updates。同样,ActiveMQ 有configuration 选项用于清除非活动目标。

在 multi-application 服务器方案中,用户目标可能仍未解析,因为用户已连接到其他服务器。在这种情况下,您可以将目标配置为 broadcast 未解析的消息,以便其他服务器有机会尝试。这可以通过 Java 配置中MessageBrokerRegistryuserDestinationBroadcast property 和 XML 中message-broker元素的user-destination-broadcast属性来完成。

26.4.14 事件和拦截

发布了几个ApplicationContext events(下面列出),可以通过实现 Spring 的ApplicationListener接口来接收。

  • BrokerAvailabilityEvent - 表示 broker 何时变为 available/unavailable。虽然“简单”broker 在启动时立即可用,并且在 application 运行时仍然如此,但 STOMP“broker relay”可能会失去与全功能 broker 的连接,例如,如果 broker 重新启动,则为 example。 broker 中继具有重新连接逻辑,并且当它返回时将与 broker 的“系统”连接,因此每当 state 从连接变为断开连接时都会发布 event,反之亦然。使用SimpMessagingTemplate的组件应订阅此 event,并避免在 broker 不可用时发送消息。在任何情况下,他们都应该准备好在发送消息时处理MessageDeliveryException

  • SessionConnectEvent - 在收到新的 STOMP CONNECT 时发布,表明新 client session 的开始。 event 包含表示连接的消息,包括 session id,用户信息(如果有)以及 client 可能发送的任何自定义 headers。这对于跟踪 client 会话非常有用。订阅此 event 的组件可以使用SimpMessageHeaderAccessorStompMessageHeaderAccessor包装所包含的消息。

  • SessionConnectedEvent - 后不久发布,当 broker 发送 STOMP CONNECTED 帧以响应 CONNECT 时。此时,可以认为 STOMP session 已完全建立。

  • SessionSubscribeEvent - 在收到新的 STOMP SUBSCRIBE 时发布。

  • SessionUnsubscribeEvent - 在收到新的 STOMP UNSUBSCRIBE 时发布。

  • SessionDisconnectEvent - 在 STOMP session ends 时发布。 DISCONNECT 可能已从 client 发送,或者也可能在 WebSocket session 关闭时自动生成。在某些情况下,每个 session 可能会多次发布此 event。对于多个 disconnect events,组件应该是幂等的。

当使用 full-featured broker 时,STOMP“broker relay”会自动重新连接“system”连接,以防 broker 暂时不可用。但是,客户端连接不会自动重新连接。假设启用了心跳,客户端通常会注意到 broker 在 10 秒内没有响应。 Clients 需要实现自己的重新连接逻辑。

上述 events 反映了 STOMP 连接生命周期中的点。它们并不意味着为 client 发送的每条消息提供通知。相反,application 可以注册ChannelInterceptor来拦截每个传入和传出的 STOMP 消息。对于 example 来拦截入站消息:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(new MyChannelInterceptor());
    }
}

自定义ChannelInterceptor可以扩展空方法 base class ChannelInterceptorAdapter并使用StompHeaderAccessorSimpMessageHeaderAccessor来访问有关该消息的信息。

public class MyChannelInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        StompCommand command = accessor.getStompCommand();
        // ...
        return message;
    }
}

请注意,就像上面的SesionDisconnectEvent一样,可能已经从 client 发送了 DISCONNECT 消息,或者也可能在 WebSocket session 关闭时自动生成。在某些情况下,拦截器可以每次 session 多次拦截此消息。对于多个 disconnect events,组件应该是幂等的。

26.4.15 STOMP Client

Spring 通过 WebSocket client 提供 STOMP,通过 TCP client 提供 STOMP。

要开始创建和配置WebSocketStompClient

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的 example 中,StandardWebSocketClient可以替换为SockJsClient,因为它也是WebSocketClient的 implementation。 SockJsClient可以使用 WebSocket 或 HTTP-based transport 作为后备。有关详细信息,请参阅第 26.3.7 节,“SockJsClient”

接下来建立连接并为 STOMP session 提供处理程序:

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当 session 准备好使用时,处理程序会收到通知:

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

一旦建立了 session,就可以发送任何有效负载,并使用配置的MessageConverter进行序列化:

session.send("/topic/foo", "payload");

您也可以订阅目的地。 subscribe方法需要处理订阅消息的处理程序,并 return 一个Subscription句柄,可用于取消订阅。对于每个收到的消息,处理程序可以指定目标 Object 类型,有效负载应该反序列化为:

session.subscribe("/topic/foo", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,请使用TaskScheduler配置WebSocketStompClient,并可选择自定义心跳间隔,写入不活动 10 秒,导致心跳发送,读取不活动 10 秒,关闭连接。

当使用WebSocketStompClient进行 performance 测试来模拟来自同一台机器的数千个 clients 时,请考虑关闭心跳,因为每个连接都会调度自己的心跳任务,并且没有针对同一台机器上的大量客户端进行优化。

STOMP 协议还支持收据,其中 client 必须添加“收据”标头,服务器在处理发送或订阅后用 RECEIPT 帧响应。为了支持这一点,StompSession提供setAutoReceipt(boolean),导致在每个后续发送或订阅时添加“收据”标题。或者,您也可以手动将“收据”标题添加到StompHeaders。发送和订阅 return 都可以用于注册接收成功和失败回调的Receiptable实例。对于此 feature,client 必须配置TaskScheduler并且收据到期前 time 的数量(默认为 15 秒)。

注意StompSessionHandler本身是一个StompFrameHandler,它允许它处理 ERROR 帧以及处理消息的 exceptions 的handleException回调,以及handleTransportError表示包含ConnectionLostException的 transport-level 错误。

26.4.16 WebSocket 范围

每个 WebSocket session 都有一个属性 map。 map 作为标题附加到入站 client 消息,可以从控制器方法访问,例如:

@Controller
public class MyController {

	@MessageMapping("/action")
	public void handle(SimpMessageHeaderAccessor headerAccessor) {
		Map<String, Object> attrs = headerAccessor.getSessionAttributes();
		// ...
	}
}

也可以在websocket范围内声明 Spring-managed bean。可以将 WebSocket-scoped beans 注入控制器和“clientInboundChannel”上注册的任何 channel 拦截器。这些通常是单身,比任何单独的 WebSocket session 都更长寿。因此,您需要为 WebSocket-scoped beans 使用范围代理模式:

@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {

    @PostConstruct
    public void init() {
        // Invoked after dependencies injected
    }

    // ...

    @PreDestroy
    public void destroy() {
        // Invoked when the WebSocket session ends
    }
}

@Controller
public class MyController {

    private final MyBean myBean;

    @Autowired
    public MyController(MyBean myBean) {
        this.myBean = myBean;
    }

    @MessageMapping("/action")
    public void handle() {
        // this.myBean from the current WebSocket session
    }
}

与任何自定义作用域一样,Spring 在从控制器访问第一个 time 时初始化一个新的MyBean实例,并在 WebSocket session 属性中存储实例。随后返回相同的实例,直到 session ends。 WebSocket-scoped beans 将调用所有 Spring 生命周期方法,如上面的示例所示。

26.4.17 表现

在绩效方面没有灵丹妙药。许多因素可能会影响它,包括消息的大小,数量,application 方法是否执行需要阻塞的工作,以及外部因素,如网络速度等。本节的目标是提供可用 configuration 选项的概述以及有关如何推理扩展的一些想法。

在消息传递 application 中,消息通过 channels 传递,用于由线程池支持的异步执行。配置这样的 application 需要充分了解 channels 和消息流。因此,建议审核第 26.4.4 节,“消息流”

显而易见的起点是配置支持"clientInboundChannel""clientOutboundChannel"的线程池。默认情况下,两者都配置为可用处理器数量的两倍。

如果注释方法中的消息处理主要是 CPU 绑定的,那么"clientInboundChannel"的线程数应该保持接近处理器的数量。如果他们所做的工作更多是 IO 绑定并且需要阻塞或等待数据库或其他外部系统,则需要增加线程池大小。

ThreadPoolExecutor有 3 个重要的 properties。这些是核心和最大线程池大小以及队列到 store 任务没有可用线程的任务的容量。

一个常见的混淆点是配置核心池大小(e.g.10)和最大池大小(e.g. 20)会导致线程池有 10 到 20 个线程。实际上,如果容量保留在 Integer.MAXVALUE 的默认值_,那么线程池将永远不会超出核心池大小,因为所有其他任务都将排队。

请查看ThreadPoolExecutor的 Javadoc,了解这些 properties 如何工作并理解各种排队策略。

"clientOutboundChannel"方面,它是关于向 WebSocket clients 发送消息的全部内容。如果 clients 位于快速网络上,则线程数应保持接近可用处理器的数量。如果它们很慢或带宽较低,则消耗消息所需的时间会更长,并给线程池带来负担。因此,增加线程池大小是必要的。

虽然“clientInboundChannel”的工作负载可以预测 - 毕竟它基于 application 的作用 - 如何配置“clientOutboundChannel”更难,因为它基于 application 控制之外的因素。因此,有两个与发送消息相关的其他 properties。那些是"sendTimeLimit""sendBufferSizeLimit"。这些用于配置如何允许发送 long 以及在向 client 发送消息时可以缓冲多少数据。

一般 idea 是在任何给定的 time 时,只有一个线程可用于发送给 client。所有其他消息同时得到缓冲,您可以使用这些 properties 来决定如何允许发送消息 long 以及平均 time 可以缓冲多少数据。有关重要的其他详细信息,请查看此 configuration 的 Javadoc 和 XML schema 文档。

这是 example configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport send-timeout="15000" send-buffer-size="524288" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

上面显示的 WebSocket 传输 configuration 还可用于配置传入 STOMP 消息的最大允许大小。虽然理论上 WebSocket 消息的大小几乎是无限的,但实际上 WebSocket 服务器强加了限制 - 例如,Tomcat 上的 8K 和 Jetty 上的 64K。因此,诸如 stomp.js 的 STOMP clients 在 16K 边界处拆分较大的 STOMP 消息,并将它们作为多个 WebSocket 消息发送,因此需要服务器缓冲和 re-assemble。

Spring 的 STOMP over WebSocket 支持这样做,因此 applications 可以配置 STOMP 消息的最大大小,而不管 WebSocket 服务器特定的消息大小。请记住,必要时将自动调整 WebSocket 消息大小,以确保它们至少可以携带 16K WebSocket 消息。

这是 example configuration:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration.setMessageSizeLimit(128 * 1024);
    }

    // ...

}
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/websocket
        http://www.springframework.org/schema/websocket/spring-websocket.xsd">

    <websocket:message-broker>
        <websocket:transport message-size="131072" />
        <!-- ... -->
    </websocket:message-broker>

</beans>

关于扩展的一个重点是使用多个 application 实例。目前,使用简单的 broker 无法做到这一点。但是当使用诸如 RabbitMQ 之类的 full-featured broker 时,每个 application 实例都连接到 broker,而来自一个 application 实例的消息 broadcast 可以通过 broker broadcast 到通过任何其他 application 实例连接的 WebSocket clients。

26.4.18 监测

使用@EnableWebSocketMessageBroker<websocket:message-broker> key 基础架构组件时,会自动收集统计信息和计数器,以便深入了解 application 的内部 state。 configuration 还声明了一个类型的 bean,它在一个地方收集所有可用信息,默认情况下每 30 分钟将其记录在INFO level 一次。这个 bean 可以通过 Spring 的MBeanExporter导出到 JMX,以便在运行时查看,例如通过 JDK 的jconsole。以下是可用信息的摘要。

  • Client WebSocket 会话

      • 当前
    • 表示当前有多少 client 会话,其中计数进一步按 WebSocket 与 HTTP 流式传输和轮询 SockJS 会话进行细分。
    • 表示已建立的会话总数。
  • 异常关闭

      • 连接失败
    • 这些会话已经建立,但在 60 秒内没有收到任何消息后关闭。这通常表示代理或网络问题。
  • 超出发送限制

    • 超过配置的发送超时或缓慢客户端可能发生的发送缓冲区限制后会话关闭(请参阅上一节)。
  • 运输错误

    • 在传输错误(例如无法读取或写入 WebSocket 连接或 HTTP request/response)之后会话已关闭。
  • STOMP 框架

    • 处理的 CONNECT,CONNECTED 和 DISCONNECT 帧的总数,表示 STOMP level 上连接了多少个 clients。请注意,当会话异常关闭或 clients 关闭而不发送 DISCONNECT 帧时,DISCONNECT 计数可能会更低。
  • STOMP Broker Relay

      • TCP 连接
    • 表示为 broker 建立了代表 client WebSocket 会话的 TCP 连接数。这应该等于 client WebSocket 会话的数量 1 个额外的共享“系统”连接,用于从 application 中发送消息。
  • STOMP 框架

    • 代表 clients 转发到 broker 或从 broker 接收的 CONNECT,CONNECTED 和 DISCONNECT 帧的总数。请注意,无论 client WebSocket session 如何关闭,都会向 broker 发送 DISCONNECT 帧。因此,较低的 DISCONNECT 帧计数表示 broker 是 pro-actively 关闭连接,可能是因为没有在 time 到达的心跳,无效的输入帧或其他。
  • Client Inbound Channel

    • 来自线程池的统计信息支持“clientInboundChannel”,提供对传入消息处理的运行状况的深入了解。在此排队的任务表明 application 可能太慢而无法处理消息。如果有 I/O 绑定任务(e.g. 慢数据库查询,HTTP 请求到第三方 REST API 等),请考虑增加线程池大小。
  • Client Outbound Channel

    • 支持“clientOutboundChannel”的线程池中的统计信息,提供对_bients 的 broadcasting 消息运行状况的深入了解。在这里排队的任务表明客户端太慢而无法使用消息。解决此问题的一种方法是增加线程池大小以适应预期的并发慢客户端数。另一种选择是减少发送超时和发送缓冲区大小限制(参见上一节)。
  • SockJS 任务计划程序

    • 来自 SockJS 任务调度程序的线程池的 stats,用于发送心跳。请注意,当在 STOMP level 上协商心跳时,将禁用 SockJS 心跳。

26.4.19 测试

使用 Spring 的 STOMP 而不是 WebSocket 支持测试 applications 有两种主要方法。第一种是编写 server-side 测试来验证控制器的功能及其带注释的消息处理方法。第二种是编写完整的 end-to-end 测试,涉及运行 client 和服务器。

这两种方法并不相互排斥。相反,每个人都在整体测试策略中占有一席之地。 Server-side 测试更集中,更容易编写和维护。另一方面,End-to-end integration 测试更完整,测试更多,但它们也更多地参与编写和维护。

最简单的 server-side 测试形式是编写控制器单元测试。然而,由于控制器的大部分功能取决于其注释,因此这没有用。纯单元测试根本无法测试。

理想情况下,测试中的控制器应该在运行时调用,就像测试使用 Spring MVC Test framework 处理 HTTP 请求的控制器的方法一样。 i.e。没有 running Servlet 容器,但依赖于 Spring Framework 来调用带注释的控制器。就像 Spring MVC Test 一样,有两种可能的选择,使用“context-based”或“standalone”设置:

  • 在 Spring TestContext framework,inject“clientInboundChannel”作为测试字段的帮助下加载实际的 Spring configuration,并使用它来发送要由控制器方法处理的消息。

  • 手动设置调用控制器(即SimpAnnotationMethodMessageHandler)所需的最小 Spring framework 基础结构,并将控制器的消息直接传递给它。

这两种设置方案都在测试股票投资组合 sample application 中进行了演示。

第二种方法是创建 end-to-end integration 测试。为此,您需要以嵌入模式运行 WebSocket 服务器并将其作为 WebSocket client 连接到它,发送包含 STOMP 帧的 WebSocket 消息。 测试股票投资组合 sample application 还演示了这种方法,使用 Tomcat 作为嵌入式 WebSocket 服务器,并使用简单的 STOMP client 进行测试。

Updated at: 5 months ago
25.10. Portlet application 部署Table of content27. CORS 支持