30. JMS(Java 消息服务)

30.1 简介

Spring 提供了一个 JMS integration framework,它简化了 JMS API 的使用,就像 Spring 的 integration 对 JDBC API 一样。

JMS 可以大致分为两个功能区域,即消息的生产和消费。 JmsTemplate class 用于消息 production 和同步消息接收。对于类似于 Java EE 的 message-driven bean 样式的异步接收,Spring 提供了许多用于创建 Message-Driven POJO(MDP)的消息 listener 容器。 Spring 还提供了 creating message listeners 的声明方式。

org.springframework.jms.core提供了使用 JMS 的核心功能。它包含 JMS 模板 classes,它通过处理资源的创建和释放来简化 JMS 的使用,就像JdbcTemplate对 JDBC 一样。 Spring 模板 classes 的设计原则 common 是提供辅助方法来执行 common 操作,并且为了更复杂的用法,将处理任务的本质委托给用户实现的回调接口。 JMS 模板遵循相同的设计。 classes 为发送消息提供了各种便利方法,同步消息,并向用户公开 JMS session 和 messageproducer。

org.springframework.jms.support提供JMSException转换功能。转换将已检查的JMSException层次结构转换为未检查的 exceptions 的镜像层次结构。如果已检查的javax.jms.JMSException有任何提供者特定的子类,则此 exception 将包含在未选中的UncategorizedJmsException中。

org.springframework.jms.support.converter提供MessageConverter抽象以在 Java objects 和 JMS 消息之间进行转换。

org.springframework.jms.support.destination提供了用于管理 JMS 目标的各种策略,例如为存储在 JNDI 中的目标提供服务定位器。

org.springframework.jms.annotation提供了必要的基础结构,以使用@JmsListener支持 annotation-driven listener endpoints。

org.springframework.jms.configjms名称空间提供了解析器 implementation,并为 java 配置提供了配置 listener 容器和创建 listener endpoints 的支持。

最后,包org.springframework.jms.connection提供ConnectionFactory的_impleration,适用于独立的 applications。它还包含一个 Spring 的PlatformTransactionManager实现 JMS(狡猾地命名为JmsTransactionManager)。这允许将 JMS 作为 transactional 资源无缝整合到 Spring 的 transaction management 机制中。

30.2 使用 Spring JMS

30.2.1 JmsTemplate

JmsTemplate class 是 JMS 核心包中的中心 class。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

使用JmsTemplate的 Code 只需要实现回调接口,为它们提供明确定义的高 level contract。 MessageCreator回调接口创建一条消息,该消息由JmsTemplate中的调用 code 提供Session。在 order 中允许更复杂地使用 JMS API,回调SessionCallback为用户提供 JMS session,回调ProducerCallback暴露SessionMessageProducer对。

JMS API 公开了两种类型的发送方法,一种采用传递模式,优先级和 time-to-live 作为服务质量(QOS)参数,另一种采用不使用默认值的 QOS 参数。由于JmsTemplate中有许多发送方法,因此 QOS 参数的设置已作为 bean properties 公开,以避免重复发送方法的数量。同样,使用 property setReceiveTimeout设置同步接收 calls 的超时值。

一些 JMS 提供程序允许通过ConnectionFactory的 configuration 以管理方式设置默认 QOS 值。这会导致对MessageProducer的 send 方法send(Destination destination, Message message)的调用将使用与 JMS 规范中指定的值不同的 QOS 默认值。在 order 中提供 QOS 值的一致管理,因此必须通过将 boolean property isExplicitQosEnabled设置为true来专门启用JmsTemplate以使用其自己的 QOS 值。

为方便起见,JmsTemplate还公开了一个基本的 request-reply 操作,该操作允许发送消息并等待作为操作的一部分创建的临时队列的回复。

一旦配置,JmsTemplate class 的实例就是 thread-safe。这很重要,因为这意味着您可以配置JmsTemplate的单个实例,然后安全地将此共享 reference 注入多个协作者。要清楚,JmsTemplate是有状态的,因为它维护的 reference,但是 state 不是会话 state。

从 Spring Framework 4.1 开始,JmsMessagingTemplate构建在JmsTemplate之上,并提供与消息抽象的 integration。 org.springframework.messaging.Message。这允许您创建要以通用方式发送的消息。

30.2.2 连接

JmsTemplate需要参考ConnectionFactoryConnectionFactory是 JMS 规范的一部分,并作为使用 JMS 的入口点。它由 client application 用作工厂来创建与 JMS 提供程序的连接,并封装各种 configuration 参数,其中许多参数是特定于供应商的,例如 SSL configuration 选项。

在 EJB 中使用 JMS 时,供应商提供 JMS 接口的 implementations,以便它们可以参与声明式 transaction management 并执行连接和会话池。在 order 中使用此 implementation,Java EE 容器通常要求您在 EJB 或 servlet 部署描述符中将 JMS 连接工厂声明为resource-ref。为了确保在 EJB 内部使用JmsTemplate这些 features,client application 应该确保它__seferences ConnectionFactory的托管 implementation。

缓存消息传递资源

标准 API 涉及创建许多中间 objects。要发送消息,请执行以下“API”步骤

ConnectionFactory->Connection->Session->MessageProducer->send

在 ConnectionFactory 和 Send 操作之间,有三个中间 objects 被创建和销毁。为了优化资源使用并增加 performance,提供了ConnectionFactory的两个 implementations。

SingleConnectionFactory

Spring 提供ConnectionFactory接口的_imple 实现SingleConnectionFactory,它将在所有createConnection() calls 上返回相同的Connection,并忽略 calls 到close()。这对于测试和独立环境非常有用,因此可以将相同的连接用于多个可以 span 任意数量的 transactions 的JmsTemplate calls。 SingleConnectionFactory对标准ConnectionFactory进行 reference,通常来自 JNDI。

CachingConnectionFactory

CachingConnectionFactory扩展了SingleConnectionFactory的功能,并添加了 Sessions,MessageProducers 和 MessageConsumers 的缓存。初始高速缓存大小设置为 1,使用 property sessionCacheSize增加高速缓存的会话数。请注意,当会话根据其确认模式进行缓存时,实际缓存会话的数量将大于该数量,因此当sessionCacheSize设置为 1 时,最多可以有 4 个缓存的 session 实例,每个确认模式一个。 MessageProducers 和 MessageConsumers 缓存在他们自己的 session 中,并在缓存时考虑生产者和消费者的唯一 properties。 MessageProducers 根据其目标进行缓存。 MessageConsumers 基于由目标,选择器,noLocal delivery flag 和持久订阅 name(如果 creating 持久消费者)组成的 key 进行缓存。

30.2.3 目的地管理

目标,如 ConnectionFactories,是 JMS 管理的 objects,可以在 JNDI 中存储和检索。配置 Spring application context 时,可以使用 JNDI 工厂 class JndiObjectFactoryBean/<jee:jndi-lookup>对 object 的 references 到 JMS 目标执行依赖项注入。但是,如果 application 中存在大量目标,或者存在 JMS 提供程序特有的高级目标 management features,则此策略通常很麻烦。此类高级目标 management 的示例将是创建动态目标或支持目标的分层命名空间。 JmsTemplate将目标 name 的解析委托给 JMS 目标 object,以实现接口DestinationResolver的 implementation。 DynamicDestinationResolverJmsTemplate使用的默认 implementation,适用于解析动态目标。还提供了JndiDestinationResolver,它充当 JNDI 中包含的目标的服务定位器,并且可选地回退到DynamicDestinationResolver中包含的行为。

通常,JMS application 中使用的目标仅在运行时已知,因此在部署 application 时无法通过管理方式创建。这通常是因为根据 well-known 命名约定在运行时创建目标的交互系统组件之间存在共享的 application 逻辑。尽管动态目标的创建不是 JMS 规范的一部分,但大多数供应商都提供了此功能。使用用户定义的 name 创建动态目标,这些目标将它们与临时目标区分开来,并且通常不在 JNDI 中注册。用于创建动态目标的 API 因提供商而异,因为与目标关联的 properties 是特定于供应商的。但是,供应商有时会做出一个简单的 implementation 选择,即忽略 JMS 规范中的警告,并使用TopicSession方法createTopic(String topicName)QueueSession方法createQueue(String queueName)创建具有默认目标 properties 的新目标。根据供应商 implementation,DynamicDestinationResolver可能还会创建物理目标,而不是仅解析一个。

boolean property pubSubDomain用于配置JmsTemplate,了解正在使用的 JMS 域。默认情况下,此 property 的 value 为 false,表示将使用 point-to-point 域 Queues。 JmsTemplate使用的 property 通过DestinationResolver接口的 implementations 确定动态目标解析的行为。

您还可以通过 property defaultDestinationJmsTemplate配置为默认目标。默认目标将用于不引用特定目标的发送和接收操作。

30.2.4 消息 Listener 容器

EJB 世界中 JMS 消息最常见的用途之一是驱动 message-driven beans(MDB)。 Spring 提供了一种解决方案,可以以不将用户绑定到 EJB 容器的方式创建 message-driven POJO(MDP)。 (有关 Spring 的 MDP 的详细报道,请参阅第 30.4.2 节,“异步接收:Message-Driven POJOs” support.)从 Spring Framework 4.1 开始,可以使用@JmsListener简单地注释端点方法,有关详细信息,请参阅第 30.6 节,“Annotation-driven listener endpoints”

消息 listener 容器用于从 JMS 消息队列接收消息并驱动注入其中的MessageListener。 listener 容器负责消息接收的所有 threading 并将其分派到 listener 进行处理。消息 listener 容器是 MDP 和消息传递提供程序之间的中介,负责注册接收消息,参与 transactions,资源获取和释放,exception 转换等。这允许您作为 application 开发人员编写与接收消息(并可能响应消息)相关联的(可能是复杂的)业务逻辑,并将样板 JMS 基础结构关注委托给 framework。

有两个标准的 JMS 消息 listener 容器与 Spring 一起打包,每个容器都有其专门的 feature 集。

SimpleMessageListenerContainer

此消息 listener 容器是两种标准风格中较简单的一种。它在启动时创建固定数量的 JMS 会话和使用者,使用标准 JMS MessageConsumer.setMessageListener()方法注册 listener,并将其保留在 JMS 提供程序中以执行 listener 回调。此变体不允许动态调整运行时需求或参与外部管理的 transactions。 Compatibility-wise,它非常接近独立 JMS 规范的精神 - 但通常与 Java EE 的 JMS 限制不兼容。

虽然SimpleMessageListenerContainer不允许参与外部管理的 transactions,但它确实支持本机 JMS transactions:只需将'sessionTransacted'flag 切换为'true',或者在命名空间中将'acknowledge'属性设置为'transacted':抛出 Exceptions 从你的 listener 将导致回滚,然后重新传递消息。或者,考虑使用'CLIENT_ACKNOWLEDGE'模式,该模式在 exception 的情况下也提供重新传递,但不使用事务处理的 Sessions,因此在 transaction 协议中不包括任何其他 Session 操作(例如发送响应消息)。

默认的'AUTO_ACKNOWLEDGE'模式不提供适当的可靠性保证. listener 执行失败时消息可能会丢失(因为提供程序将在 listener 调用后自动确认每条消息,没有 exceptions 传播给提供者)或者 listener 容器关闭(这可以通过'acceptMessagesWhileStopping'flag 配置)。确保在可靠性需求的情况下使用事务处理会话 e.g. 用于可靠的队列处理和持久的 topic 订阅。

DefaultMessageListenerContainer

此消息 listener 容器是大多数情况下使用的容器。与SimpleMessageListenerContainer相比,此容器变体允许动态适应运行时需求,并且能够参与外部管理的 transactions。当配置JtaTransactionManager时,每个收到的消息都使用 XA transaction 注册;因此处理可以利用 XA transaction 语义。这个 listener 容器在 JMS 提供程序的低要求,参与外部管理的 transactions 等高级功能以及与 Java EE 环境的兼容性之间取得了良好的平衡。

可以自定义容器的 cache level。请注意,如果未启用缓存,则会为每个邮件接收创建新连接和新的 session。将此与高负载的非持久订阅相结合可能会导致邮件丢失。在这种情况下,请确保使用正确的缓存 level。

当 broker 关闭时,此容器还具有可恢复的功能。默认情况下,简单的BackOff implementation 每 5 秒重试一次。可以为更多 fine-grained 恢复选项指定自定义BackOff implementation,有关 example,请参阅ExponentialBackOff

与其兄弟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer支持本机 JMS transactions,也允许自定义确认模式。如果对于您的方案可行,强烈建议使用外部管理的 transactions:即,如果 JVM 死亡,您可以偶尔使用重复的消息。业务逻辑中的自定义重复消息检测步骤可能涵盖此类情况,e.g. 以业务实体存在检查或协议 table 检查的形式。任何此类安排都将比替代方案更有效:使用 XA transaction(通过使用JtaTransactionManager配置DefaultMessageListenerContainer)来包装整个处理,覆盖 JMS 消息的接收以及消息 listener 中业务逻辑的执行(包括数据库操作等)。

默认的'AUTO_ACKNOWLEDGE'模式不提供适当的可靠性保证. listener 执行失败时(由于提供程序将在 listener 调用之前自动确认每条消息)或 listener 容器关闭时(可能已配置),消息可能会丢失通过'acceptMessagesWhileStopping'flag)。确保在可靠性需求的情况下使用事务处理会话 e.g. 用于可靠的队列处理和持久的 topic 订阅。

30.2.5 Transaction management

Spring 提供JmsTransactionManager来管理单个 JMS ConnectionFactory的 transactions。这允许 JMS 应用程序利用 Spring 的托管 transaction features,如第 17 章,交易管理中所述。 JmsTransactionManager执行本地资源 transactions,_绑定从指定的ConnectionFactory到线程的 JMS Connection/Session 对。 JmsTemplate自动检测此类 transactional 资源并相应地对它们进行操作。

在 Java EE 环境中,ConnectionFactory将汇集 Connections 和 Sessions,因此这些资源可以跨 transactions 有效地重用。在独立环境中,使用 Spring 的SingleConnectionFactory将导致共享 JMS Connection,每个 transaction 都有自己独立的Session。或者,考虑使用 provider-specific 池绑定适配器,例如 ActiveMQ 的PooledConnectionFactory class。

JmsTemplate也可以与JtaTransactionManager和 XA-capable JMS ConnectionFactory一起用于执行分布式 transactions。请注意,这需要使用 JTA transaction manager 以及正确的 XA-configured ConnectionFactory! (检查 Java EE 服务器的/ JMS 提供程序的 documentation.)

使用 JMS API 从Connection创建Session时,在托管和非托管 transactional 环境中重用 code 可能会造成混淆。这是因为 JMS API 只有一个工厂方法来创建Session,它需要 transaction 和 acknowledgement 模式的值。在托管环境中,设置这些值是环境的 transactional 基础结构的责任,因此供应商的 wrapper 会忽略这些值到 JMS 连接。在非托管环境中使用JmsTemplate时,可以通过使用 properties sessionTransactedsessionAcknowledgeMode指定这些值。当使用PlatformTransactionManagerJmsTemplate时,模板将始终被赋予 transactional JMS Session

30.3 发送消息

JmsTemplate包含许多发送消息的便捷方法。有一些发送方法使用javax.jms.Destination object 指定目标,并使用 string 指定目标,以便在 JNDI 查找中使用。不带目标参数的 send 方法使用默认目标。

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void simpleSend() {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello queue world");
            }
        });
    }
}

此 example 使用MessageCreator回调从提供的Session object 创建文本消息。通过将 reference 传递给ConnectionFactory来构造JmsTemplate。作为替代,提供零参数构造函数和connectionFactory,并且可以用于以 JavaBean 样式构造实例(使用 BeanFactory 或普通 Java code)。或者,考虑从 Spring 的JmsGatewaySupport convenience base class 派生,它为 JMS configuration 提供 pre-built bean properties。

方法send(String destinationName, MessageCreator creator)允许您使用目标的 string name 发送消息。如果这些名称在 JNDI 中注册,则应将模板的destinationResolver property 设置为JndiDestinationResolver的实例。

如果您创建了JmsTemplate并指定了默认目标,则send(MessageCreator c)会向该目标发送一条消息。

30.3.1 使用消息转换器

在 order 以便于发送域 model objects,JmsTemplate有各种发送方法,它们将 Java object 作为消息数据内容的参数。 JmsTemplate中的重载方法convertAndSend()receiveAndConvert()将转换 process 委托给MessageConverter接口的实例。此接口定义了一个简单的 contract,用于在 Java objects 和 JMS 消息之间进行转换。默认的 implementation SimpleMessageConverter支持StringTextMessagebyte[]BytesMesssage以及java.util.MapMapMessage之间的转换。通过使用转换器,您和您的 application code 可以专注于通过 JMS 发送或接收的业务 object,而不关心它如何表示为 JMS 消息的细节。

沙箱当前包含MapMessageConverter,它使用反射在 JavaBean 和MapMessage之间进行转换。您可能自己实现的其他流行的 implementation 选择是转换器,它们使用现有的 XML 编组软件包(如 JAXB,Castor,XMLBeans 或 XStream)来创建表示 object 的TextMessage

为了适应无法一般封装在转换器 class 中的消息的 properties,headers 和 body 的设置,MessagePostProcessor接口允许您在转换后但在发送之前访问该消息。下面的 example 演示了如何在java.util.Map转换为消息后修改消息头和 property。

public void sendWithConversion() {
    Map map = new HashMap();
    map.put("Name", "Mark");
    map.put("Age", new Integer(47));
    jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws JMSException {
            message.setIntProperty("AccountID", 1234);
            message.setJMSCorrelationID("123-00001");
            return message;
        }
    });
}

这会产生以下形式的消息:

MapMessage={
	Header={
		... standard headers ...
		CorrelationID={123-00001}
	}
	Properties={
		AccountID={Integer:1234}
	}
	Fields={
		Name={String:Mark}
		Age={Integer:47}
	}
}

30.3.2 SessionCallback 和 ProducerCallback

虽然发送操作涵盖了许多 common 使用场景,但有时您希望在 JMS SessionMessageProducer上执行多个操作。 SessionCallbackProducerCallback分别公开 JMS SessionSession/MessageProducer对。 JmsTemplate上的execute()方法执行这些回调方法。

30.4 接收消息

30.4.1 同步接收

虽然 JMS 通常与异步处理相关联,但可以同步使用消息。重载的receive(..)方法提供此功能。在同步接收期间,调用线程将阻塞,直到消息可用。这可能是一个危险的操作,因为调用线程可能会被无限期地阻塞。 property receiveTimeout指定接收方在放弃等待消息之前应该等待多长时间。

30.4.2 异步接收:Message-Driven POJO

Spring 还通过使用@JmsListener annotation 支持 annotated-listener endpoints,并提供了一个开放的基础结构来以编程方式注册 endpoints。这是设置异步接收器最方便的方法,有关详细信息,请参阅Section 30.6.1,“启用 listener endpoint annotations”

在 EJB 世界中类似于 Message-Driven Bean(MDB)的方式中,Message-Driven POJO(MDP)充当 JMS 消息的接收者。 MDP 上的一个限制(但也参见下面对MessageListenerAdapter class 的讨论)是它必须实现javax.jms.MessageListener接口。另请注意,如果您的 POJO 将在多个线程上接收消息,请务必确保 implementation 为 thread-safe。

下面是一个简单的 MDP 实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

一旦实现了MessageListener,就可以 time 来创建一个消息 listener 容器。

在下面找到如何定义和配置 Spring 附带的消息 listener 容器之一的示例(在本例中为DefaultMessageListenerContainer)。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

请参阅各种消息 listener 容器的 Spring javadocs,以获取每个 implementation 支持的 features 的完整描述。

30.4.3 SessionAwareMessageListener 接口

SessionAwareMessageListener接口是一个 Spring-specific 接口,它为 JMS MessageListener接口提供类似的 contract,但也提供了消息处理方法,可以访问从中接收Message的 JMS Session

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果您希望 MDP 能够响应任何收到的消息(使用onMessage(Message, Session)方法中提供的Session),您可以选择让 MDP 实现此接口(优先于标准 JMS MessageListener接口)。 Spring 附带的所有消息 listener 容器 implementations 都支持实现MessageListenerSessionAwareMessageListener接口的 MDP。 实现SessionAwareMessageListener的类带有警告,然后通过接口将它们绑定到 Spring。是否使用它的选择完全取决于您作为 application 开发人员或架构师。

请注意SessionAwareMessageListener接口的'onMessage(..)'方法抛出JMSException。与标准 JMS MessageListener接口相比,使用SessionAwareMessageListener接口时,client code 负责处理抛出的任何 exceptions。

30.4.4 MessageListenerAdapter

MessageListenerAdapter class 是 Spring 异步消息支持中的最终 component:简而言之,它允许您将几乎任何 class 暴露为 MDP(当然有一些约束)。

请考虑以下接口定义。请注意,虽然接口既不扩展MessageListener也不扩展SessionAwareMessageListener接口,但它仍然可以通过使用MessageListenerAdapter class 用作 MDP。还要注意各种消息处理方法如何根据它们可以接收和处理的各种Message类型的内容进行强类型化。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特别要注意MessageDelegate接口的上述 implementation(上面的DefaultMessageDelegate class)根本没有 JMS 依赖关系。它确实是一个 POJO,我们将通过以下 configuration 进入 MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

下面是另一个 MDP 的示例,它只能处理 JMS TextMessage消息的接收。请注意消息处理方法实际上是如何调用'receive'(MessageListenerAdapter中的消息处理方法的 name 默认为'handleMessage'),但它是可配置的(如下所示)。另请注意'receive(..)'方法如何强类型化以接收和响应 JMS TextMessage消息。

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

服务员MessageListenerAdapter的 configuration 看起来像这样:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

请注意,如果上面的'messageListener'收到TextMessage以外的类型的 JMS Message,则会抛出IllegalStateException(并随后吞下)。 MessageListenerAdapter class 的另一个功能是,如果处理程序方法返回 non-void value,则能够自动发回响应Message。考虑接口和 class:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的DefaultResponsiveTextMessageDelegateMessageListenerAdapter一起使用,那么从执行'receive(..)'方法返回的任何 non-null value 将(在默认的 configuration 中)转换为TextMessage。结果TextMessage将被发送到原始Message的 JMS Reply-To property 中定义的Destination(如果存在),或MessageListenerAdapter上的默认Destination(如果已配置);如果没有找到Destination,那么将抛出InvalidDestinationException(请注意,这个 exception 不会被吞下并将向上传播调用堆栈)。

30.4.5 处理 transactions 中的消息

在 transaction 中调用消息 listener 只需要重新配置 listener 容器。

可以通过 listener 容器定义上的sessionTransacted flag 简单地激活本地资源 transactions。然后,每个消息 listener 调用将在 active JMS transaction 中运行,并在 listener 执行失败的情况下回滚消息。发送响应消息(通过SessionAwareMessageListener)将是同一本地 transaction 的一部分,但任何其他资源操作(如数据库访问)将独立运行。这通常需要 listener implementation 中的重复消息检测,涵盖数据库处理已提交但消息处理无法提交的情况。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理的 transaction,您需要配置 transaction manager 并使用支持外部管理 transactions 的 listener 容器:通常为DefaultMessageListenerContainer

要为 XA transaction 参与配置消息 listener 容器,您需要配置JtaTransactionManager(默认情况下,委托给 Java EE 服务器的 transaction 子系统)。请注意,底层 JMS ConnectionFactory 需要 XA-capable 并正确注册到您的 JTA transaction 协调器! (检查您的 Java EE 服务器的 JNDI 的 configuration resources.)这允许消息接收以及 e.g. 数据库访问是同一 transaction 的一部分(具有统一提交语义,代价是 XA transaction log 开销)。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后你只需要将它添加到我们早期的容器 configuration 中。容器将照顾 rest。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>

30.5 支持 JCA 消息 Endpoints

从 version 2.5 开始,Spring 还为 JCA-based MessageListener容器提供支持。 JmsMessageEndpointManager将尝试从提供程序的ResourceAdapter class name 自动确定ActivationSpec class name。因此,通常可以只提供 Spring 的通用JmsActivationSpecConfig,如下面的示例所示。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecConfig">
        <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
            <property name="destinationName" value="myQueue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

或者,您可以使用给定的ActivationSpec object 设置JmsMessageEndpointManagerActivationSpec object 也可能来自 JNDI 查找(使用<jee:jndi-lookup>)。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="myQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

使用 Spring 的ResourceAdapterFactoryBean,可以在本地配置目标ResourceAdapter,如下面的 example 所示。

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
    <property name="resourceAdapter">
        <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
            <property name="serverUrl" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="workManager">
        <bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
    </property>
</bean>

指定的WorkManager也可能指向 environment-specific 线程池 - 通常通过SimpleTaskWorkManager's“asyncTaskExecutor”property。如果碰巧使用多个适配器,请考虑为所有ResourceAdapter实例定义共享线程池。

在某些环境(e.g. WebLogic 9 或更高版本)中,可以从 JNDI 获取整个ResourceAdapter object(使用<jee:jndi-lookup>)。然后 Spring-based 消息 listeners 可以与 server-hosted ResourceAdapter交互,也可以使用服务器的 built-in WorkManager

有关更多详细信息,请参阅 javadoc 以获取JmsMessageEndpointManagerJmsActivationSpecConfigResourceAdapterFactoryBean

Spring 还提供了一个通用的 JCA 消息端点 manager,它与 JMS 无关:org.springframework.jca.endpoint.GenericMessageEndpointManager。此 component 允许使用任何消息 listener 类型(e.g. 一个 CCI MessageListener)和任何 provider-specific ActivationSpec object。查看您的 JCA 提供商的文档以了解连接器的实际功能,并参考GenericMessageEndpointManager的 javadoc 获取 Spring-specific configuration 详细信息。

JCA-based 消息端点 management 与 EJB 2.1 Message-Driven Beans 非常类似;它使用相同的底层资源提供程序 contract。与 EJB 2.1 MDB 一样,JCA 提供程序支持的任何消息 listener 接口也可以在 Spring context 中使用。尽管如此,Spring 仍然为 JMS 提供了明确的“便利”支持,因为 JMS 是与 JCA 端点 management contract 一起使用的最常见的端点 API。

30.6 Annotation-driven listener endpoints

异步接收消息的最简单方法是使用带注释的 listener 端点基础结构。简而言之,它允许您将托管 bean 的方法公开为 JMS listener 端点。

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

上面 example 的 idea 是每当javax.jms.Destination“myDestination”上有消息时,相应地调用processOrder方法(在这种情况下,JMS 消息的内容与MessageListenerAdapter提供的内容类似)。

带注释的端点基础结构使用JmsListenerContainerFactory为每个带注释的方法在后台创建消息 listener 容器。这样的容器没有针对 application context 注册,但可以使用JmsListenerEndpointRegistry bean 轻松定位以便管理。

@JmsListener是 Java 8 上的可重复 annotation,因此可以通过向其添加额外的@JmsListener声明将多个 JMS 目标关联到同一方法。在 Java 6 和 7 上,您可以使用@JmsListeners annotation。

30.6.1 启用 listener endpoint annotations

要启用对@JmsListener 注释的支持,请将@EnableJms添加到@Configuration classes 中的一个。

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
}

默认情况下,基础结构会查找名为jmsListenerContainerFactory的 bean 作为工厂用于创建消息 listener 容器的源。在这种情况下,忽略 JMS 基础结构设置,可以调用processOrder方法,核心轮询大小为 3 个线程,最大池大小为 10 个线程。

可以自定义 listener 容器工厂以使用每个 annotation,也可以通过实现JmsListenerConfigurer接口来配置显式默认值。只有在没有特定容器工厂的情况下注册至少一个端点时,才需要默认值。有关完整详细信息和示例,请参阅 javadoc。

如果您更喜欢XML configuration,请使用<jms:annotation-driven>元素。

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrency" value="3-10"/>
</bean>

30.6.2 程序化端点注册

JmsListenerEndpoint提供 JMS 端点的 model,负责为该 model 配置容器。除了JmsListener annotation 检测到的那些之外,基础结构还允许您以编程方式配置 endpoints。

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

在上面的 example 中,我们使用了SimpleJmsListenerEndpoint来提供实际的MessageListener来调用,但你也可以构建自己的端点变量来描述自定义调用机制。

应该注意的是,您也可以完全跳过@JmsListener的使用,并且只能通过JmsListenerConfigurer以编程方式注册 endpoints。

30.6.3 带注释的端点方法签名

到目前为止,我们在端点中注入了一个简单的String,但它实际上可以有一个非常灵活的方法签名。让我们使用自定义标题_注入Order

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

这些是您可以在 JMS listener endpoints 中注入的主要元素:

  • 原始javax.jms.Message或其任何子类(当然提供它与传入消息类型匹配)。

  • javax.jms.Session用于对本机 JMS API 的可选访问 e.g. 用于发送自定义回复。

  • org.springframework.messaging.Message表示传入的 JMS 消息。请注意,此消息包含自定义和标准 headers(由JmsHeaders定义)。

  • @Header -annotated 方法 arguments 提取特定的头 value,包括标准的 JMS headers。

  • @Headers -annotated 参数也必须可分配给java.util.Map才能访问所有 headers。

  • non-annotated 元素不是受支持的类型之一(i.e.MessageSession)被视为有效负载。您可以通过使用@Payload注释参数来使其明确。您还可以通过添加额外的@Valid来启用验证。

inject Spring Message抽象的能力对于从 transport-specific 消息中存储的所有信息中受益特别有用,而不依赖于 transport-specific API。

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

方法 arguments 的处理由DefaultMessageHandlerMethodFactory提供,可以进一步自定义以支持其他方法 arguments。转换和验证支持也可以在那里定制。

例如,如果我们想在处理它之前确保我们的Order有效,我们可以使用@Valid注释有效负载并配置必要的验证器,如下所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}

30.6.4 响应管理

MessageListenerAdapter中的现有支持已经允许您的方法具有非void return 类型。在这种情况下,调用的结果封装在javax.jms.Message中,该javax.jms.Message在原始消息的JMSReplyTo标头中指定的目标中或在 listener 上配置的默认目标中发送。现在可以使用消息传递抽象的@SendTo annotation 来设置该默认目标。

假设我们的processOrder方法现在应该_ret ,可以按如下方式编写它以自动发送响应:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

如果您有多个@JmsListener -annotated 方法,还可以将@SendTo annotation 放在 class level 上以共享默认回复目标。

如果你需要以 transport-independent 方式设置其他 headers,你可以_ret _ a __,例如:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}

如果需要在运行时计算响应目标,可以将响应封装在JmsResponse实例中,该实例还提供在运行时使用的目标。上一个 example 可以重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
    // order processing
    Message<OrderStatus> response = MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    return JmsResponse.forQueue(response, "status");
}

30.7 JMS 命名空间支持

Spring 提供了一个 XML 命名空间,用于简化 JMS configuration。要使用 JMS 命名空间元素,您需要 reference JMS schema:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- bean definitions here -->

</beans>

命名空间由三个 top-level 元素组成:<annotation-driven/><listener-container/><jca-listener-container/><annotation-driven允许使用annotation-driven listener endpoints<listener-container/><jca-listener-container/>定义了共享 listener 容器 configuration,可能包含<listener/> child 元素。这是两个 listeners 的基本 configuration 的示例。

<jms:listener-container>

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

上面的 example 等同于_创建两个不同的 listener 容器 bean 定义和两个不同的MessageListenerAdapter bean 定义,如第 30.4.4 节,“MessageListenerAdapter”中所示。除了上面显示的属性之外,listener元素可能包含几个可选的元素。以下 table 描述了所有可用属性:

表格 1_.JMS <listener>元素的属性

属性描述
ID托管 listener 容器的 bean name。如果未指定,将自动生成 bean name。
目的地(必填)此 listener 的目标 name,通过DestinationResolver策略解析。
ref(必填)处理程序 object 的 bean name。
方法要调用的处理程序方法的 name。如果ref指向MessageListener或 Spring SessionAwareMessageListener,则可以省略此属性。
response-destination要将响应消息发送到的默认响应目标的 name。这将在请求消息不带有“JMSReplyTo”字段的情况下应用。此目标的类型将由 listener-container 的“response-destination-type”属性确定。注意:这仅适用于具有 return value 的 listener 方法,每个结果 object 都将转换为响应消息。
订阅持久订阅的 name(如果有)。
选择此 listener 的可选消息选择器。
并发此 listener 启动的并发 sessions/consumers 数。可以是表示最大数字的简单数字(e.g. “5”),也可以是表示下限和上限的范围(e.g. “3-5”)。请注意,指定的最小值只是一个提示,可能在运行时被忽略。默认值是容器提供的 value

<listener-container/>元素也接受几个可选属性。这允许自定义各种策略(例如_示例,taskExecutordestinationResolver)以及基本 JMS 设置和资源 references。使用这些属性,可以定义 highly-customized listener 容器,同时仍然可以从命名空间的便利性中受益。

通过指定要通过factory-id属性公开的 bean 的 id,可以将此类设置自动公开为JmsListenerContainerFactory

<jms:listener-container connection-factory="myConnectionFactory"
        task-executor="myTaskExecutor"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

以下 table 描述了所有可用属性。有关各个 properties 的更多详细信息,请参阅AbstractMessageListenerContainer的 class-level javadoc 及其具体子类。 javadoc 还提供了 transaction 选择和消息重新传递方案的讨论。

表格 1_.JMS <listener-container>元素的属性

属性描述
container-type这个 listener 容器的类型。可用选项包括:defaultsimpledefault102simple102(默认 value 为'default')。
container-class自定义 listener 容器 implementation class 作为完全限定的 class name。根据“container-type”属性,默认为 Spring 的标准DefaultMessageListenerContainerSimpleMessageListenerContainer
factory-id将此元素定义的设置公开为具有指定标识的JmsListenerContainerFactory,以便它们可以与其他 endpoints 一起使用。
connection-factory__ JMS ConnectionFactory bean 的引用(默认 bean name 是'connectionFactory')。
task-executorJMS listener 调用者的 Spring TaskExecutor的引用。
destination-resolver策略的参考用于解析 JMS Destinations
message-converter用于将 JMS 消息转换为 listener 方法 arguments 的MessageConverter策略的 reference。默认是SimpleMessageConverter
error-handler策略用于处理在执行MessageListener期间可能发生的任何未捕获的 Exceptions。
destination-type此 listener 的 JMS 目标类型:queuetopicdurableTopicsharedTopicsharedDurableTopic。这可能使容器的pubSubDomainsubscriptionDurablesubscriptionSharedproperties 成为可能。默认值为queue(i.e.禁用这 3 个 properties)。
response-destination-type响应的 JMS 目标类型:“queue”,“ topic”。默认值是“destination-type”属性的 value。
client-id此 listener 容器的 JMS client 标识。使用持久订阅时需要指定。
高速缓存JMS 资源的缓存 level:noneconnectionsessionconsumerauto。默认情况下(auto),cache level 实际上将是“consumer”,除非指定了外部 transaction manager - 在这种情况下,有效默认值为none(假设 Java EE-style transaction management,其中给定的 ConnectionFactory 是 XA-aware 池)。
确认本机 JMS 确认模式:autoclientdups-oktransacted。 的值激活本地交易的Session。或者,指定下面描述的transaction-manager属性。默认为auto
transaction-manager对外部PlatformTransactionManager的引用(通常是 XA-based transaction 协调器,e.g. Spring 的JtaTransactionManager)。如果未指定,将使用本机确认(请参阅“确认”属性)。
并发每个 listener 启动的并发 sessions/consumers 数。可以是表示最大数字的简单数字(e.g. “5”),也可以是表示下限和上限的范围(e.g. “3-5”)。请注意,指定的最小值只是一个提示,可能在运行时被忽略。默认值为 1;如果是 topic listener 或者队列 ordering 很重要,请将并发限制为 1;考虑将其提升为一般队列。
预取要加载到单个 session 中的最大消息数。请注意,提高此数字可能会导致并发消费者的饥饿!
receive-timeout用于 receive calls 的超时(以毫秒为单位)。默认值为1000 ms(1 秒); -1表示根本没有超时。
back-off指定BackOff实例以用于计算恢复尝试之间的间隔。如果BackOffExecution implementation 返回BackOffExecution#STOP,listener 容器将不再进一步尝试恢复。设置此 property 时,将忽略recovery-interval value。默认值为FixedBackOff,间隔为 5000 毫秒,即 5 秒。
recovery-interval指定恢复尝试之间的间隔(以毫秒为单位)。以指定的间隔创建FixedBackOff的便捷方式。有关更多恢复选项,请考虑指定 BackOff 实例。默认值为 5000 毫秒,即 5 秒。
此容器应开始和停止的生命周期阶段。 value 越低,此容器开始越早,它将停止越晚。默认值为Integer.MAX_VALUE,表示容器将尽可能晚地启动并尽快停止。

使用“jms”schema 支持配置 JCA-based listener 容器非常相似。

<jms:jca-listener-container resource-adapter="myResourceAdapter"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

以下 table 中描述了 JCA 变体的可用 configuration 选项:

表格 1_.JMS <jca-listener-container/>元素的属性

属性描述
factory-id将此元素定义的设置公开为具有指定标识的JmsListenerContainerFactory,以便它们可以与其他 endpoints 一起使用。
resource-adapter_JCA ResourceAdapter bean 的引用(默认 bean name 是'resourceAdapter')。
activation-spec-factoryJmsActivationSpecFactory的参考。默认是自动检测 JMS 提供程序及其ActivationSpec class(请参阅DefaultJmsActivationSpecFactory)
destination-resolver策略的参考用于解析 JMS Destinations
message-converter用于将 JMS 消息转换为 listener 方法 arguments 的MessageConverter策略的 reference。默认是SimpleMessageConverter
destination-type此 listener 的 JMS 目标类型:queuetopicdurableTopicsharedTopicsharedDurableTopic。这可能使容器的pubSubDomainsubscriptionDurablesubscriptionSharedproperties 成为可能。默认值为queue(i.e.禁用这 3 个 properties)。
response-destination-type响应的 JMS 目标类型:“queue”,“ topic”。默认值是“destination-type”属性的 value。
client-id此 listener 容器的 JMS client 标识。使用持久订阅时需要指定。
确认本机 JMS 确认模式:autoclientdups-oktransacted。 的值激活本地交易的Session。或者,指定下面描述的transaction-manager属性。默认为auto
transaction-manager_Spring JtaTransactionManagerjavax.transaction.TransactionManager的引用,用于为每个传入消息启动 XA transaction。如果未指定,将使用本机确认(请参阅“确认”属性)。
并发每个 listener 启动的并发 sessions/consumers 数。可以是表示最大数字的简单数字(e.g. “5”),也可以是表示下限和上限的范围(e.g. “3-5”)。请注意,指定的最小值只是一个提示,在使用 JCA listener 容器时通常会在运行时被忽略。默认值为 1。
预取要加载到单个 session 中的最大消息数。请注意,提高此数字可能会导致并发消费者的饥饿!
Updated at: 7 months ago
29.3.1. EJB 3 注入拦截器Table of content31. JMX