30. JMS(Java消息服务)

30.1 简介

Spring提供了一个JMS集成框架,它简化了JMS API的使用,就像Spring对JDBC API的集成一样。

JMS可以大致分为两个功能区域,即消息的生成和消费。 JmsTemplate 类用于消息生成和同步消息接收。对于类似于Java EE的消息驱动bean样式的异步接收,Spring提供了许多消息监听器容器,用于创建消息驱动的POJO(MDP)。 Spring还提供了一种创建消息监听器的声明方式。

org.springframework.jms.core 提供了使用JMS的核心功能。它包含JMS模板类,通过处理资源的创建和释放来简化JMS的使用,就像 JdbcTemplate 对JDBC一样。 Spring模板类常见的设计原则是提供辅助方法来执行常见操作,并且为了更复杂的用法,将处理任务的本质委托给用户实现的回调接口。 JMS模板遵循相同的设计。这些类为发送消息,同步使用消息以及向用户公开JMS会话和消息生成器提供了各种便捷方法。

org.springframework.jms.support 提供 JMSException 转换功能。转换将已检查的 JMSException 层次结构转换为未经检查的异常的镜像层次结构。如果已检查的 javax.jms.JMSException 有任何特定于提供者的子类,则此异常将包含在未选中的 UncategorizedJmsException 中。

org.springframework.jms.support.converter 提供了一个 MessageConverter 抽象,用于在Java对象和JMS消息之间进行转换。

org.springframework.jms.support.destination 提供了各种管理JMS目标的策略,例如为存储在JNDI中的目标提供服务定位器。

org.springframework.jms.annotation 提供了必要的基础结构,以使用 @JmsListener 支持注释驱动的侦听器 endpoints 。

org.springframework.jms.config 提供了 jms 名称空间的解析器实现,以及java配置支持,用于配置监听器容器和创建监听器 endpoints 。

最后,包 org.springframework.jms.connection 提供了适用于独立应用程序的 ConnectionFactory 的实现。它还包含Spring的 PlatformTransactionManager for JMS(狡猾地命名为 JmsTransactionManager )的实现。这允许将JMS作为事务资源无缝集成到Spring的事务管理机制中。

30.2 使用Spring JMS

30.2.1 JmsTemplate

JmsTemplate 类是JMS核心包中的中心类。它简化了JMS的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

使用 JmsTemplate 的代码只需要实现回调接口,为它们提供明确定义的高级 Contract 。 MessageCreator 回调接口创建一条消息,给出 JmsTemplate 中调用代码提供的 Session 。为了允许更复杂地使用JMS API,callback SessionCallback 为用户提供JMS会话,回调 ProducerCallback 公开 SessionMessageProducer 对。

JMS API公开了两种类型的发送方法,一种采用传递模式,优先级和生存时间作为服务质量(QOS)参数,另一种采用不使用默认值的QOS参数。由于 JmsTemplate 中有许多发送方法,因此QOS参数的设置已作为bean属性公开,以避免重复发送方法的数量。同样,使用属性 setReceiveTimeout 设置同步接收调用的超时值。

某些JMS提供程序允许通过 ConnectionFactory 的配置以管理方式设置默认QOS值。这会导致对 MessageProducer 的send方法 send(Destination destination, Message message) 的调用将使用与JMS规范中指定的不同的QOS默认值。因此,为了提供对QOS值的一致管理,必须通过将布尔属性 isExplicitQosEnabled 设置为 true 来专门启用 JmsTemplate 以使用其自己的QOS值。

为方便起见, JmsTemplate 还公开了一个基本的请求 - 回复操作,该操作允许发送消息并等待作为操作的一部分创建的临时队列的回复。

配置后,JmsTemplate类的实例是线程安全的。这很重要,因为这意味着您可以配置JmsTemplate的单个实例,然后将此共享引用安全地注入多个协作者。要清楚,JmsTemplate是有状态的,因为它维护对ConnectionFactory的引用,但是这个状态不是会话状态。

从Spring Framework 4.1开始, JmsMessagingTemplate 构建于 JmsTemplate 之上,并提供与消息传递抽象的集成,即 org.springframework.messaging.Message 。这允许您创建要以通用方式发送的消息。

30.2.2 连接

JmsTemplate 需要引用 ConnectionFactoryConnectionFactory 是JMS规范的一部分,并作为使用JMS的入口点。客户端应用程序将其用作工厂以创建与JMS提供程序的连接,并封装各种配置参数,其中许多是供应商特定的,例如SSL配置选项。

在EJB中使用JMS时,供应商提供JMS接口的实现,以便它们可以参与声明式事务管理并执行连接和会话池。为了使用此实现,Java EE容器通常要求您在EJB或servlet部署描述符中将JMS连接工厂声明为 resource-ref 。为确保在EJB内部使用 JmsTemplate 这些功能,客户端应用程序应确保它引用 ConnectionFactory 的托管实现。

缓存消息传递资源

标准API涉及创建许多中间对象。要发送消息,请执行以下“API”步骤

ConnectionFactory->Connection->Session->MessageProducer->send

在ConnectionFactory和Send操作之间,有三个中间对象被创建和销毁。为了优化资源使用并提高性能,提供了两个 ConnectionFactory 的实现。

SingleConnectionFactory

Spring提供了 ConnectionFactory 接口 SingleConnectionFactory 的实现,它将在所有 createConnection() 调用上返回相同的 Connection 并忽略对 close() 的调用。这对于测试和独立环境非常有用,因此可以将相同的连接用于可能跨越任意数量事务的多个 JmsTemplate 调用。 SingleConnectionFactory 引用了通常来自JNDI的标准 ConnectionFactory

CachingConnectionFactory

CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并添加了Sessions,MessageProducers和MessageConsumers的缓存。初始高速缓存大小设置为1,使用属性 sessionCacheSize 来增加高速缓存的会话数。请注意,当会话根据其确认模式进行缓存时,实际缓存会话的数量将大于该数量,因此当 sessionCacheSize 设置为1时,最多可以有4个缓存会话实例,每个确认模式一个。 MessageProducers和MessageConsumers在其拥有的会话中缓存,并在缓存时考虑 生产环境 者和使用者的独特属性。 MessageProducers根据其目标进行缓存。 MessageConsumers基于由destination,selector,noLocal delivery标志和持久订阅名称组成的键进行缓存(如果创建持久消费者)。

30.2.3 目的地管理

目标,如ConnectionFactories,是可以在JNDI中存储和检索的JMS管理对象。配置Spring应用程序上下文时,可以使用JNDI工厂类 JndiObjectFactoryBean / <jee:jndi-lookup> 对对象对JMS目标的引用执行依赖项注入。但是,如果存在大量策略,通常这种策略很麻烦应用程序中的目标数量,或者是否存在JMS提供程序特有的高级目标管理功能。这种高级目标管理的示例将是创建动态目的地或支持目的地的分层命名空间。 JmsTemplate 将目标名称的解析委托给JMS目标对象,以实现接口 DestinationResolver 的实现。 DynamicDestinationResolverJmsTemplate 使用的默认实现,适用于解析动态目标。还提供了 JndiDestinationResolver ,它充当JNDI中包含的目标的服务定位器,并且可选地回退到 DynamicDestinationResolver 中包含的行为。

通常,JMS应用程序中使用的目标仅在运行时已知,因此在部署应用程序时无法通过管理方式创建。这通常是因为在交互系统组件之间存在共享应用程序逻辑,这些组件根据众所周知的命名约定在运行时创建目标。尽管动态目标的创建不是JMS规范的一部分,但大多数供应商都提供了此功能。使用用户定义的名称创建动态目标,这些名称将它们与临时目标区分开来,并且通常不在JNDI中注册。用于创建动态目标的API因提供商而异,因为与目标关联的属性是特定于供应商的。但是,供应商有时会做出一个简单的实现选择,即忽略JMS规范中的警告,并使用 TopicSession 方法 createTopic(String topicName)QueueSession 方法 createQueue(String queueName) 创建具有默认目标属性的新目标。根据供应商的实现, DynamicDestinationResolver 也可以创建一个物理目的地而不是仅解析一个。

布尔属性 pubSubDomain 用于配置 JmsTemplate ,了解正在使用的JMS域。默认情况下,此属性的值为false,表示将使用点对点域Queues。 JmsTemplate 使用的此属性通过 DestinationResolver 接口的实现确定动态目标解析的行为。

您还可以通过属性 defaultDestinationJmsTemplate 配置为默认目标。默认目标将用于不引用特定目标的发送和接收操作。

30.2.4 消息侦听器容器

EJB世界中JMS消息的最常见用途之一是驱动消息驱动的bean(MDB)。 Spring提供了一种解决方案,用于以不将用户绑定到EJB容器的方式创建消息驱动的POJO(MDP)。 (有关Spring的MDP支持的详细介绍,请参阅 Section 30.4.2, “Asynchronous reception: Message-Driven POJOs” 。)从Spring Framework 4.1开始,可以使用 @JmsListener 简单地注释 endpoints 方法,有关详细信息,请参阅 Section 30.6, “Annotation-driven listener endpoints”

消息侦听器容器用于从JMS消息队列接收消息并驱动注入其中的 MessageListener 。侦听器容器负责消息接收的所有线程并将其分派到侦听器中进行处理。消息监听器容器是MDP和消息传递提供者之间的中介,负责注册接收消息,参与事务,资源获取和释放,异常转换等。这允许您作为应用程序开发人员编写与接收消息(并可能响应消息)相关联的(可能是复杂的)业务逻辑,并将样板JMS基础结构关注委托给框架。

Spring包含两个标准的JMS消息监听器容器,每个容器都有其专用的功能集。

SimpleMessageListenerContainer

此消息侦听器容器更简单的两种标准风格。它在启动时创建固定数量的JMS会话和使用者,使用标准JMS MessageConsumer.setMessageListener() 方法注册侦听器,并将其保留在JMS提供程序中以执行侦听器回调。此变体不允许动态适应运行时需求或参与外部管理的事务。兼容性方面,它非常接近独立JMS规范的精神 - 但通常与Java EE的JMS限制不兼容。

虽然SimpleMessageListenerContainer不允许参与外部托管事务,但它确实支持本机JMS事务:只需将'sessionTransacted'标志切换为'true',或者在命名空间中将'acknowledge'属性设置为'transacted':例外从您的侦听器抛出将导致回滚,然后重新传递消息。或者,考虑使用'CLIENT_ACKNOWLEDGE'模式,该模式在异常情况下也提供重新传递,但不使用事务处理会话,因此不包括任何其他会话操作(如发送响应消息) Transaction 协议。默认的“AUTO_ACKNOWLEDGE”模式不提供适当的可靠性保证。当侦听器执行失败时,消息可能会丢失(因为提供程序将在侦听器调用之后自动确认每条消息,没有异常传播到提供程序)或侦听器容器关闭时(可以通过'acceptMessagesWhileStopping'标志配置) 。确保在可靠性需求的情况下使用事务处理会话,例如用于可靠的队列处理和持久的主题订阅。

DefaultMessageListenerContainer

此消息侦听器容器是大多数情况下使用的容器。与 SimpleMessageListenerContainer 相反,此容器变体允许动态适应运行时需求,并且能够参与外部管理的事务。当配置 JtaTransactionManager 时,每个收到的消息都在XA事务中注册;因此处理可以利用XA事务语义。此侦听器容器在JMS提供程序的低要求,参与外部管理事务的高级功能以及与Java EE环境的兼容性之间取得了良好的 balancer 。

可以自定义容器的缓存级别。请注意,如果未启用缓存,则会为每个邮件接收创建新连接和新会话。将此与高负载的非持久订阅相结合可能会导致邮件丢失。在这种情况下,请确保使用适当的缓存级别。

当代理发生故障时,此容器还具有可恢复的功能。默认情况下,一个简单的 BackOff 实现每5秒重试一次。可以为更细粒度的恢复选项指定自定义 BackOff 实现,有关示例,请参阅 ExponentialBackOff

与其兄弟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer支持本机JMS事务,也允许自定义确认模式。如果您的方案可行,强烈建议使用外部管理的事务:即,如果JVM死亡,您可以偶尔使用重复的消息。业务逻辑中的自定义重复邮件检测步骤可能会涵盖此类情况,例如:以业务实体存在检查或协议表检查的形式。任何此类安排都将比替代方案更有效:使用XA事务包装整个处理(通过使用JtaTransactionManager配置DefaultMessageListenerContainer),覆盖JMS消息的接收以及消息侦听器中业务逻辑的执行(包括数据库操作等)。默认的“AUTO_ACKNOWLEDGE”模式不提供适当的可靠性保证。当侦听器执行失败时(因为提供程序将在侦听器调用之前自动确认每条消息)或侦听器容器关闭时(可以通过“acceptMessagesWhileStopping”标志配置),消息可能会丢失。确保在可靠性需求的情况下使用事务处理会话,例如用于可靠的队列处理和持久的主题订阅。

30.2.5 Transaction 管理

Spring提供了 JmsTransactionManager 来管理单个JMS ConnectionFactory 的事务。这允许JMS应用程序利用Spring的托管事务功能,如 Chapter 17, Transaction Management 中所述。 JmsTransactionManager 执行本地资源事务,将JMS连接/会话对从指定的 ConnectionFactory 绑定到线程。 JmsTemplate 自动检测此类事务资源并相应地对其进行操作。

在Java EE环境中, ConnectionFactory 将池连接和会话,因此这些资源可以跨事务有效地重用。在独立环境中,使用Spring的 SingleConnectionFactory 将导致共享JMS Connection ,每个事务都有自己独立的 Session 。或者,考虑使用特定于提供者的池适配器,例如ActiveMQ的 PooledConnectionFactory 类。

JmsTemplate 也可以与 JtaTransactionManager 和支持XA的JMS ConnectionFactory 一起使用,以执行分布式事务。请注意,这需要使用JTA事务管理器以及正确配置XA的ConnectionFactory! (查看Java EE服务器的/ JMS提供程序的文档。)

使用JMS API从 Connection 创建 Session 时,在托管和非托管事务环境中重用代码可能会造成混淆。这是因为JMS API只有一个工厂方法来创建 Session ,它需要事务和确认模式的值。在托管环境中,设置这些值是环境的事务基础结构的责任,因此供应商的JMS连接包装器会忽略这些值。在非托管环境中使用 JmsTemplate 时,可以通过使用属性 sessionTransactedsessionAcknowledgeMode 指定这些值。使用时 PlatformTransactionManagerJmsTemplate ,模板将始终被赋予事务性JMS Session

30.3 发送消息

JmsTemplate 包含许多发送消息的便捷方法。有一些发送方法使用 javax.jms.Destination 对象指定目标,并使用字符串指定目标,以便在JNDI查找中使用。不带目标参数的send方法使用默认目标。

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void simpleSend() {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello queue world");
            }
        });
    }
}

此示例使用 MessageCreator 回调从提供的 Session 对象创建文本消息。通过将引用传递给 ConnectionFactory 来构造 JmsTemplate 。作为替代方案,提供零参数构造函数和 connectionFactory ,可用于以JavaBean样式构造实例(使用BeanFactory或纯Java代码)。或者,考虑从Spring的 JmsGatewaySupport 便利基类派生,它为JMS配置提供预构建的bean属性。

方法 send(String destinationName, MessageCreator creator) 允许您使用目标的字符串名称发送消息。如果这些名称在JNDI中注册,则应将模板的 destinationResolver 属性设置为 JndiDestinationResolver 的实例。

如果您创建了 JmsTemplate 并指定了默认目标,则 send(MessageCreator c) 会向该目标发送一条消息。

30.3.1 使用消息转换器

为了便于发送域模型对象, JmsTemplate 具有各种发送方法,这些方法将Java对象作为消息数据内容的参数。 JmsTemplate 中的重载方法 convertAndSend()receiveAndConvert() 将转换过程委托给 MessageConverter 接口的实例。此接口定义了一个简单的合约,用于在Java对象和JMS消息之间进行转换。默认实现 SimpleMessageConverter 支持 StringTextMessagebyte[]BytesMesssagejava.util.MapMapMessage 之间的转换。通过使用转换器,您和您的应用程序代码可以专注于通过JMS发送或接收的业务对象,而不关心它如何表示为JMS消息的详细信息。

沙箱当前包含 MapMessageConverter ,它使用反射在JavaBean和 MapMessage 之间进行转换。您可能自己实现的其他流行实现选择是使用现有XML编组软件包(如JAXB,Castor,XMLBeans或XStream)的转换器来创建表示对象的 TextMessage

为了适应消息的属性, Headers 和正文的设置,这些属性, Headers 和正文无法一般地封装在转换器类中, MessagePostProcessor 接口允许您在转换消息之后但在发送之前访问消息。下面的示例演示了如何在 java.util.Map 转换为消息后修改消息头和属性。

public void sendWithConversion() {
    Map map = new HashMap();
    map.put("Name", "Mark");
    map.put("Age", new Integer(47));
    jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws JMSException {
            message.setIntProperty("AccountID", 1234);
            message.setJMSCorrelationID("123-00001");
            return message;
        }
    });
}

这会产生以下形式的消息:

MapMessage={
	Header={
		... standard headers ...
		CorrelationID={123-00001}
	}
	Properties={
		AccountID={Integer:1234}
	}
	Fields={
		Name={String:Mark}
		Age={Integer:47}
	}
}

30.3.2 SessionCallback和ProducerCallback

虽然发送操作涵盖了许多常见的使用场景,但有时您希望在JMS SessionMessageProducer 上执行多个操作。 SessionCallbackProducerCallback 分别公开JMS SessionSession / MessageProducer 对。 JmsTemplate 上的 execute() 方法执行这些回调方法。

30.4 正在接收消息

30.4.1 同步接收

虽然JMS通常与异步处理相关联,但可以同步使用消息。重载的 receive(..) 方法提供此功能。在同步接收期间,调用线程将阻塞,直到消息可用。这可能是一个危险的操作,因为调用线程可能会被无限期地阻塞。属性 receiveTimeout 指定接收方在放弃等待消息之前应等待多长时间。

30.4.2 异步接收:消息驱动的POJO

Spring还通过使用@JmsListener注释来支持带注释的侦听器 endpoints ,并提供了一个以编程方式注册 endpoints 的开放式基础结构。这是设置异步接收器的最便捷方法,有关更多详细信息,请参见第30.6.1节“启用侦听器 endpoints 注释”。

以类似于EJB世界中的消息驱动Bean(MDB)的方式,消息驱动的POJO(MDP)充当JMS消息的接收器。 MDP上的一个限制(但也参见下面对 MessageListenerAdapter 类的讨论)是它必须实现 javax.jms.MessageListener 接口。另请注意,如果您的POJO将在多个线程上接收消息,请务必确保您的实现是线程安全的。

以下是MDP的简单实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

一旦实现了 MessageListener ,就可以创建一个消息监听器容器了。

下面是一个如何定义和配置Spring附带的消息监听器容器(在本例中为 DefaultMessageListenerContainer )的示例。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

请参考各种消息的Spring javadocs监听器容器,用于完整描述每个实现支持的功能。

30.4.3 SessionAwareMessageListener接口

SessionAwareMessageListener 接口是一个特定于Spring的接口,它提供与JMS MessageListener 接口类似的协议,但也提供了消息处理方法,可以访问从中接收 Message 的JMS Session

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果您希望MDP能够响应任何收到的消息(使用 onMessage(Message, Session) 方法中提供的 Session ),您可以选择让MDP实现此接口(优先于标准JMS MessageListener 接口)。 Spring附带的所有消息侦听器容器实现都支持实现 MessageListenerSessionAwareMessageListener 接口的MDP。实现 SessionAwareMessageListener 的类带有警告,然后通过接口将它们绑定到Spring。是否使用它的选择完全取决于您作为应用程序开发人员或架构师。

请注意 SessionAwareMessageListener 接口的 'onMessage(..)' 方法抛出 JMSException 。与标准JMS MessageListener 接口相比,使用 SessionAwareMessageListener 接口时,客户端代码负责处理抛出的任何异常。

30.4.4 MessageListenerAdapter

MessageListenerAdapter 类是Spring异步消息传递支持的最后一个组件:简而言之,它允许您将几乎任何类暴露为MDP(当然有一些约束)。

请考虑以下接口定义。请注意,虽然接口既不扩展 MessageListener 也不扩展 SessionAwareMessageListener 接口,但它仍然可以通过使用 MessageListenerAdapter 类用作MDP。还要注意各种消息处理方法是如何根据它们可以接收和处理的各种类型的内容强类型化的。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特别要注意的是 MessageDelegate 接口(上面的 DefaultMessageDelegate 类)的上述实现根本没有JMS依赖关系。它确实是一个POJO,我们将通过以下配置进入MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

下面是另一个MDP的示例,它只能处理JMS TextMessage 消息的接收。请注意消息处理方法实际上是如何调用的 'receive'MessageListenerAdapter 中的消息处理方法的名称默认为 'handleMessage' ),但它是可配置的(如下所示)。另请注意 'receive(..)' 方法如何强类型化以接收和响应JMS TextMessage 消息。

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

话务员 MessageListenerAdapter 的配置如下所示:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

请注意,如果上述 'messageListener' 收到 TextMessage 以外的类型的JMS Message ,则会抛出 IllegalStateException (并随后吞下)。 MessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非void值,则能够自动发回响应 Message 。考虑接口和类:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 一起使用,那么从执行 'receive(..)' 方法返回的任何非空值都将(在默认配置中)转换为 TextMessage 。结果 TextMessage 将被发送到原始 Message 的JMS Reply-To属性中定义的 Destination (如果存在),或 MessageListenerAdapter 上设置的默认 Destination (如果已配置);如果没有找到 Destination ,那么将抛出 InvalidDestinationException (并且请注意,不会吞下此异常并将向上传播调用堆栈)。

30.4.5 处理 Transaction 中的消息

在事务中调用消息侦听器只需要重新配置侦听器容器。

可以通过侦听器容器定义上的 sessionTransacted 标志简单地激活本地资源事务。然后,每个消息侦听器调用将在活动的JMS事务中运行,并在侦听器执行失败的情况下回滚消息。发送响应消息(通过 SessionAwareMessageListener )将是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)将独立运行。这通常需要在侦听器实现中检测重复的消息,包括数据库处理已提交但消息处理无法提交的情况。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

要参与外部托管事务,您需要配置事务管理器并使用支持外部托管事务的侦听器容器:通常为 DefaultMessageListenerContainer

要为XA事务参与配置消息侦听器容器,您需要配置 JtaTransactionManager (默认情况下,委托给Java EE服务器的事务子系统)。请注意,底层JMS ConnectionFactory需要具备XA功能并且已正确注册到JTA事务协调器! (检查您的Java EE服务器JNDI资源的配置。)这允许消息接收以及例如数据库访问是同一事务的一部分(使用统一提交语义,代价是XA事务日志开销)。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后你只需要将它添加到我们早期的容器配置中。容器将负责其余部分。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>

30.5 支持JCA消息 endpoints

从版本2.5开始,Spring还支持基于JCA的 MessageListener 容器。 JmsMessageEndpointManager 将尝试从提供程序的 ResourceAdapter 类名称中自动确定 ActivationSpec 类名称。因此,通常可以只提供Spring的通用 JmsActivationSpecConfig ,如以下示例所示。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecConfig">
        <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
            <property name="destinationName" value="myQueue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

或者,您可以使用给定的 ActivationSpec 对象设置 JmsMessageEndpointManagerActivationSpec 对象也可能来自JNDI查找(使用 <jee:jndi-lookup> )。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="myQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

使用Spring的 ResourceAdapterFactoryBean ,可以在本地配置目标 ResourceAdapter ,如以下示例所示。

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
    <property name="resourceAdapter">
        <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
            <property name="serverUrl" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="workManager">
        <bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
    </property>
</bean>

指定的 WorkManager 也可能指向特定于环境的线程池 - 通常通过 SimpleTaskWorkManager’s "asyncTaskExecutor" 属性。如果碰巧使用多个适配器,请考虑为所有 ResourceAdapter 实例定义共享线程池。

在某些环境(例如WebLogic 9或更高版本)中,可以从JNDI获取整个 ResourceAdapter 对象(使用 <jee:jndi-lookup> )。然后,基于Spring的消息侦听器可以与服务器托管的 ResourceAdapter 交互,也可以使用服务器的内置 WorkManager

有关更多详细信息,请参阅javadoc以获取 JmsMessageEndpointManagerJmsActivationSpecConfigResourceAdapterFactoryBean

Spring还提供了一个通用的JCA消息 endpoints 管理器,它与JMS无关: org.springframework.jca.endpoint.GenericMessageEndpointManager 。此组件允许使用任何消息侦听器类型(例如,CCI MessageListener)和任何特定于提供程序的ActivationSpec对象。查看您的JCA提供商的文档以了解连接器的实际功能,并参考 GenericMessageEndpointManager 的javadoc以获取特定于Spring的配置详细信息。

基于JCA的消息 endpoints 管理非常类似于EJB 2.1消息驱动的Bean;它使用相同的底层资源提供者 Contract 。与EJB 2.1 MDB一样,JCA提供程序支持的任何消息侦听器接口也可以在Spring上下文中使用。然而,Spring为JMS提供了明确的“便利”支持,因为JMS是与JCA endpoints 管理 Contract 一起使用的最常见的 endpoints API。

30.6 注释驱动的侦听器 endpoints

异步接收消息的最简单方法是使用带注释的侦听器 endpoints 基础结构。简而言之,它允许您将托管bean的方法公开为JMS侦听器 endpoints 。

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

上述示例的想法是,只要 javax.jms.Destination "myDestination" 上有消息,就会相应地调用 processOrder 方法(在这种情况下,JMS消息的内容与 MessageListenerAdapter 提供的内容类似)。

带注释的 endpoints 基础结构使用 JmsListenerContainerFactory 为每个带注释的方法在后台创建消息侦听器容器。此类容器未针对应用程序上下文进行注册,但可以使用 JmsListenerEndpointRegistry bean轻松定位以进行管理。

@JmsListener是Java 8上的可重复注释,因此可以通过向其添加额外的@JmsListener声明将多个JMS目标关联到同一方法。在Java 6和7上,您可以使用@JmsListeners批注。

30.6.1 启用侦听器 endpoints 注释

要启用对 @JmsListener 注释的支持,请将 @EnableJms 添加到您的 @Configuration 类之一。

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
}

默认情况下,基础结构会查找名为 jmsListenerContainerFactory 的bean作为工厂用于创建消息侦听器容器的源。在这种情况下,忽略JMS基础结构设置,可以调用 processOrder 方法,核心轮询大小为3个线程,最大池大小为10个线程。

可以自定义侦听器容器工厂以使用每个注释,也可以通过实现 JmsListenerConfigurer 接口来配置显式默认值。只有在没有特定容器工厂的情况下注册至少一个 endpoints 时,才需要默认值。有关完整详细信息和示例,请参阅javadoc。

如果您更喜欢 XML configuration ,请使用 <jms:annotation-driven> 元素。

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrency" value="3-10"/>
</bean>

30.6.2 程序化 endpoints 注册

JmsListenerEndpoint 提供了JMS endpoints 的模型,负责为该模型配置容器。除了 JmsListener 注释检测到的 endpoints 之外,基础结构还允许您以编程方式配置 endpoints 。

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

在上面的示例中,我们使用了 SimpleJmsListenerEndpoint 来提供实际的 MessageListener 来调用,但您也可以构建自己的 endpoints 变体来描述自定义调用机制。

应该注意的是,您也可以完全跳过 @JmsListener 的使用通过 JmsListenerConfigurer 以编程方式注册您的 endpoints 。

30.6.3 带注释的 endpoints 方法签名

到目前为止,我们在 endpoints 中注入了一个简单的 String ,但它实际上可以有一个非常灵活的方法签名。让我们重写它以使用自定义标头注入 Order

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

这些是您可以在JMS侦听器 endpoints 中注入的主要元素:

  • 原始 javax.jms.Message 或其任何子类(当然提供它与传入消息类型匹配)。

  • javax.jms.Session 用于对本机JMS API的可选访问,例如用于发送自定义回复。

  • org.springframework.messaging.Message 表示传入的JMS消息。请注意,此消息包含自定义标头和标准标头(由 JmsHeaders 定义)。

  • @Header -annotated方法参数,用于提取特定的标头值,包括标准JMS标头。

  • @Headers -annotated参数,必须也可以分配给 java.util.Map 以获取对所有标头的访问权限。

  • 不是受支持类型之一(即 MessageSession )的非注释元素被视为有效负载。您可以通过使用 @Payload 注释参数来使其明确。您还可以通过添加额外的 @Valid 来启用验证。

注入Spring的 Message 抽象的能力对于从特定于传输的消息中存储的所有信息中受益特别有用,而不依赖于特定于传输的API。

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法参数的处理由 DefaultMessageHandlerMethodFactory 提供,可以进一步自定义以支持其他方法参数。转换和验证支持也可以在那里定制。

例如,如果我们想在处理之前确保 Order 有效,我们可以使用 @Valid 注释有效负载并配置必要的验证器,如下所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}

30.6.4 响应管理

MessageListenerAdapter 中的现有支持已经允许您的方法具有非 void 返回类型。在这种情况下,调用的结果封装在 javax.jms.Message 中,该 javax.jms.Message 在原始消息的 JMSReplyTo 标头中指定的目标中或在侦听器上配置的默认目标中发送。现在可以使用消息传递抽象的 @SendTo 注释来设置该默认目标。

假设我们的 processOrder 方法现在应该返回 OrderStatus ,可以按如下方式编写它以自动发送响应:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果您有多个@ JmsListener-annotated方法,您还可以在类级别放置@SendTo注释以共享默认回复目标。

如果您需要以与传输无关的方式设置其他标头,则可以返回 Message ,例如:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}

如果需要在运行时计算响应目标,可以将响应封装在 JmsResponse 实例中,该实例还提供在运行时使用的目标。上一个示例可以重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
    // order processing
    Message<OrderStatus> response = MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    return JmsResponse.forQueue(response, "status");
}

30.7 JMS命名空间支持

Spring提供了一个XML命名空间,用于简化JMS配置。要使用JMS名称空间元素,您需要引用JMS模式:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- bean definitions here -->

</beans>

命名空间由三个顶级元素组成: <annotation-driven/><listener-container/><jca-listener-container/><annotation-driven 允许使用 annotation-driven listener endpoints<listener-container/><jca-listener-container/> 定义了共享侦听器容器配置,可能包含 <listener/> 子元素。以下是两个侦听器的基本配置示例。

<jms:listener-container>

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

上面的示例等同于创建两个不同的侦听器容器bean定义和两个不同的 MessageListenerAdapter bean定义,如 Section 30.4.4, “MessageListenerAdapter” 中所示。除了上面显示的属性之外, listener 元素可能包含几个可选的元素。下表描述了所有可用属性:

Table 30.1. Attributes of the JMS element

属性描述
id托管侦听器容器的Bean名称。如果未指定,将自动生成bean名称。
destination(必需)此侦听器的目标名称,通过 DestinationResolver 策略解析。
ref(必需)处理程序对象的bean名称。
方法要调用的处理程序方法的名称。如果 ref 指向 MessageListener 或Spring SessionAwareMessageListener ,则可以省略该属性。
response-destination要向其发送响应消息的默认响应目标的名称。这将在请求消息不带有“JMSReplyTo”字段的情况下应用。此目标的类型将由侦听器容器的“response-destination-type”属性确定。注意:这仅适用于具有返回值的侦听器方法,每个结果对象都将转换为响应消息。
subscription持久订阅的名称,如果任何。
selector此侦听器的可选消息选择器。
concurrency为此侦听器启动的并发会话/使用者数。可以是表示最大数字的简单数字(例如“5”)或表示较低数字和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,可能在运行时被忽略。默认值是容器提供的值

<listener-container/> 元素也接受几个可选属性。这允许自定义各种策略(例如, taskExecutordestinationResolver )以及基本JMS设置和资源引用。使用这些属性,可以定义高度自定义的侦听器容器,同时仍然可以从命名空间的便利性中受益。

通过指定要通过 factory-id 属性公开的bean的id,可以将这些设置自动公开为 JmsListenerContainerFactory

<jms:listener-container connection-factory="myConnectionFactory"
        task-executor="myTaskExecutor"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

下表描述了所有可用属性。有关各个属性的更多详细信息,请参阅 AbstractMessageListenerContainer 的类级别javadoc及其具体子类。 javadoc还提供了有关事务选择和消息重新传递方案的讨论。

Table 30.2. Attributes of the JMS element

属性描述
container-type此侦听器容器的类型。可用选项包括: defaultsimpledefault102simple102 (默认值为 'default' )。
container-class自定义侦听器容器实现类作为完全限定的类名。根据 "container-type" 属性,默认值为Spring的标准 DefaultMessageListenerContainerSimpleMessageListenerContainer
factory-id将此元素定义的设置公开为具有指定标识的 JmsListenerContainerFactory ,以便它们可以与其他 endpoints 一起使用。
connection-factory对JMS ConnectionFactory bean的引用(默认bean名称为 'connectionFactory' )。
task-executor对JMS侦听器调用程序的Spring TaskExecutor 的引用。
destination-resolverDestinationResolver 策略的引用,用于解析JMS Destinations
message-converterMessageConverter 策略的引用,用于将JMS消息转换为侦听器方法参数。默认值为 SimpleMessageConverter
error-handlerErrorHandler 策略的引用,用于处理在执行 MessageListener 期间可能发生的任何未捕获的异常。
destination-type此侦听器的JMS目标类型: queuetopicdurableTopicsharedTopicsharedDurableTopic 。这可能会启用容器的 pubSubDomainsubscriptionDurablesubscriptionShared 属性。默认值为 queue (即禁用这3个属性)。
response-destination-type响应的JMS目标类型:“queue”,“topic”。默认值是“destination-type”属性的值。
client-id此侦听器容器的JMS客户端ID。使用持久订阅时需要指定。
cacheJMS资源的缓存级别: noneconnectionsessionconsumerauto 。默认情况下( auto ),高速缓存级别将有效地 "consumer" ,除非已经指定了外部事务管理器 - 在这种情况下,有效的缺省值将是 none (假设的Java EE式事务管理中给定的ConnectionFactory是XA感知池)。
acknowledge本机JMS确认模式: autoclientdups-oktransacted 。值 transacted 激活本地事务 Session 。或者,指定下面描述的 transaction-manager 属性。默认值为 auto
transaction-manager对外部 PlatformTransactionManager 的引用(通常是基于XA的事务协调器,例如Spring的 JtaTransactionManager )。如果未指定,将使用本机确认(请参阅 "acknowledge" 属性)。
concurrency为每个侦听器启动的并发会话/使用者数。可以是表示最大数字的简单数字(例如“5”)或表示较低数字和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,可能在运行时被忽略。默认值为1;在主题监听器或队列排序很重要的情况下,将并发性限制为1;考虑将其提升为一般队列。
prefetch要加载到单个会话中的最大消息数。请注意,提高此数字可能会导致并发消费者的饥饿!
receive-timeout用于接收呼叫的超时(以毫秒为单位)。默认值为 1000 ms(1秒); -1 表示根本没有超时。
退避指定 BackOff 实例用于计算恢复尝试之间的间隔。如果 BackOffExecution 实现返回 BackOffExecution#STOP ,则侦听器容器将不再进一步尝试恢复。设置此属性时,将忽略 recovery-interval 值。默认值为 FixedBackOff ,间隔为5000毫秒,即5秒。
recovery-interval指定恢复尝试之间的间隔(以毫秒为单位)。以指定的间隔创建 FixedBackOff 的便捷方式。有关更多恢复选项,请考虑指定BackOff实例。默认值为5000毫秒,即5秒。
phase此容器应开始和停止的生命周期阶段。该容器越早开始的值越低,它将停止越晚。默认值为 Integer.MAX_VALUE ,表示容器将尽可能晚地启动并尽快停止。

使用“jms”模式支持配置基于JCA的侦听器容器非常相似。

<jms:jca-listener-container resource-adapter="myResourceAdapter"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

下表描述了JCA变体的可用配置选项:

Table 30.3. Attributes of the JMS element

属性描述
factory-id将此元素定义的设置公开为具有指定标识的 JmsListenerContainerFactory ,以便它们可以与其他 endpoints 一起使用。
resource-adapter对JCA ResourceAdapter bean的引用(默认bean名称为 'resourceAdapter' )。
activation-spec-factoryJmsActivationSpecFactory 的引用。默认是自动检测JMS提供程序及其 ActivationSpec 类(请参阅 DefaultJmsActivationSpecFactory
destination-resolverDestinationResolver 策略的引用,用于解析JMS Destinations
message-converterMessageConverter 策略的引用,用于将JMS消息转换为侦听器方法参数。默认值为 SimpleMessageConverter
destination-type此侦听器的JMS目标类型: queuetopicdurableTopicsharedTopicsharedDurableTopic 。这可能会启用容器的 pubSubDomainsubscriptionDurablesubscriptionShared 属性。默认值为 queue (即禁用这3个属性)。
response-destination-type响应的JMS目标类型:“queue”,“topic”。默认值是“destination-type”属性的值。
client-id此侦听器容器的JMS客户端ID。使用持久订阅时需要指定。
acknowledge本机JMS确认模式: autoclientdups-oktransacted 。值 transacted 激活本地事务 Session 。作为替代方法,请指定下面描述的 transaction-manager 属性。默认值为 auto
transaction-manager对Spring JtaTransactionManagerjavax.transaction.TransactionManager 的引用,用于为每个传入消息启动XA事务。如果未指定,将使用本机确认(请参阅 "acknowledge" 属性)。
concurrency为每个侦听器启动的并发会话/使用者数。可以是表示最大数字的简单数字(例如“5”)或表示较低数字和上限的范围(例如“3-5”)。请注意,指定的最小值只是一个提示,在使用JCA侦听器容器时通常会在运行时被忽略。默认值为1.
prefetch要加载到单个会话中的最大消息数。请注意,提高此数字可能会导致并发消费者的饥饿!
Updated at: 5 months ago
29.3.1. EJB 3注入拦截器Table of content31. JMX
Comment
You are not logged in.

There are no comments.