30. JMS(Java 消息服务)

30.1 Introduction

Spring 提供了一个 JMS 集成框架,该框架简化了 JMS API 的使用,就像 Spring 对 JDBC API 的集成一样。

JMS 可以大致分为两个功能区域,即消息的产生和使用。 JmsTemplate类用于消息生成和同步消息接收。对于类似于 Java EE 消息驱动 bean 样式的异步接收,Spring 提供了许多消息侦听器容器,这些容器用于创建消息驱动 POJO(MDP)。 Spring 还提供了一种创建消息侦听器的声明方式。

软件包org.springframework.jms.core提供了使用 JMS 的核心功能。它包含 JMS 模板类,该类通过处理资源的创建和释放来简化 JMS 的使用,就像JdbcTemplate对于 JDBC 一样。 Spring 模板类共有的设计原则是提供辅助方法来执行常见操作,并且为了更复杂的使用,将处理任务的实质委托给用户实现的回调接口。 JMS 模板遵循相同的设计。这些类提供了各种便利的方法,用于发送消息,同步使用消息以及向用户公开 JMS 会话和消息生成器。

org.springframework.jms.support包提供JMSException翻译功能。转换将已检查的JMSException层次结构转换为未检查的异常的镜像层次结构。如果选中的javax.jms.JMSException有任何提供程序特定的子类,则将此异常包装在未选中的UncategorizedJmsException中。

org.springframework.jms.support.converter提供了MessageConverter抽象,以在 Java 对象和 JMS 消息之间进行转换。

org.springframework.jms.support.destination提供了用于 ManagementJMS 目的地的各种策略,例如为 JNDI 中存储的目的地提供服务定位器。

软件包org.springframework.jms.annotation提供了必要的基础结构,以支持使用@JmsListener的 Comments 驱动的侦听器端点。

org.springframework.jms.config提供了jms名称空间的解析器实现以及 java config 支持,以配置侦听器容器和创建侦听器端点。

最后,包org.springframework.jms.connection提供了适用于独立应用程序的ConnectionFactory的实现。它还包含用于 JMS 的 Spring PlatformTransactionManager的实现(巧妙地名为JmsTransactionManager)。这允许将 JMS 作为事务资源无缝集成到 Spring 的事务 Management 机制中。

30.2 使用 Spring JMS

30.2.1 JmsTemplate

JmsTemplate类是 JMS 核心软件包中的中心类。由于它在发送或同步接收消息时处理资源的创建和释放,因此它简化了 JMS 的使用。

使用JmsTemplate的代码仅需要实现回调接口,从而为它们提供明确定义的高级协定。 MessageCreator回调接口创建一个消息,该消息由JmsTemplate中的调用代码提供的Session给定。为了允许更复杂地使用 JMS API,回调SessionCallback为用户提供了 JMS 会话,回调ProducerCallback公开了SessionMessageProducer对。

JMS API 公开了两种类型的发送方法,一种采用交付模式,优先级和生存时间作为服务质量(QOS)参数,另一种不采用使用默认值的 QOS 参数。由于JmsTemplate中有许多发送方法,因此 QOS 参数的设置已公开为 bean 属性,以避免重复发送方法。同样,使用属性setReceiveTimeout设置同步接收呼叫的超时值。

某些 JMS 提供程序允许通过ConnectionFactory的配置来 Management 默认 QOS 值的设置。这样做的结果是,对MessageProducer的发送方法send(Destination destination, Message message)的调用将使用与 JMS 规范中指定的 QOS 默认值不同的 QOS 默认值。为了提供对 QOS 值的一致 Management,因此必须通过将布尔属性isExplicitQosEnabled设置为true来专门使JmsTemplate使用其自己的 QOS 值。

为方便起见,JmsTemplate还公开了一个基本的请求-答复操作,该操作允许发送消息并 await 作为该操作一部分创建的临时队列的答复。

Note

JmsTemplate类的实例一旦配置便是“线程安全的”。这很重要,因为这意味着您可以配置JmsTemplate的单个实例,然后安全地将此“共享”引用注入多个协作者。需要明确的是,JmsTemplate是有状态的,因为它保持对ConnectionFactory的引用,但是此状态为会话状态。

从 Spring Framework 4.1 开始,JmsMessagingTemplate构建在JmsTemplate的基础上,并提供了与消息传递抽象(即org.springframework.messaging.Message)的集成。这使您可以创建以通用方式发送的消息。

30.2.2 Connections

JmsTemplate要求引用ConnectionFactoryConnectionFactory是 JMS 规范的一部分,并且是使用 JMS 的入口点。Client 端应用程序使用它作为工厂来创建与 JMS 提供程序的连接,并封装各种配置参数,其中许多是特定于供应商的,例如 SSL 配置选项。

当在 EJB 中使用 JMS 时,供应商提供 JMS 接口的实现,以便它们可以参与声明式事务 Management 并执行连接和会话的池化。为了使用此实现,Java EE 容器通常要求您在 EJB 或 Servlet 部署 Descriptors 中将 JMS 连接工厂声明为resource-ref。为了确保在 EJB 内的JmsTemplate中使用这些功能,Client 端应用程序应确保引用了ConnectionFactory的托管实现。

缓存消息传递资源

标准 API 涉及创建许多中间对象。要发送消息,请执行以下“ API”遍历

ConnectionFactory->Connection->Session->MessageProducer->send

在 ConnectionFactory 和 Send 操作之间,创建和销毁了三个中间对象。为了优化资源使用并提高性能,提供了ConnectionFactory的两种实现。

SingleConnectionFactory

Spring 提供了ConnectionFactory接口SingleConnectionFactory的实现,该实现将在所有createConnection()调用中返回相同的Connection并忽略对close()的调用。这对于测试和独立环境很有用,因此同一连接可用于可能跨越任意数量事务的多个JmsTemplate调用。 SingleConnectionFactory引用了通常来自 JNDI 的标准ConnectionFactory

CachingConnectionFactory

CachingConnectionFactory扩展了SingleConnectionFactory的功能,并添加了 Session,MessageProducers 和 MessageConsumers 的缓存。初始缓存大小设置为 1,使用属性sessionCacheSize增加缓存的会话数。请注意,由于根据会话的确认模式缓存会话,因此实际缓存的会话数将大于该数量,因此当sessionCacheSize设置为 1 时,最多可以有 4 个缓存的会话实例,每种确认模式一个。 MessageProducers 和 MessageConsumers 缓存在其自己的会话中,并且在缓存时还要考虑生产者和使用者的唯一属性。 MessageProducers 将根据其目的地进行缓存。基于由目标,selectors,noLocal 传递标志和持久订阅名称(如果创建持久使用者)组成的键来缓存 MessageConsumers。

30.2.3 目标 Management

目的地,例如 ConnectionFactories,是 JMSManagement 的对象,可以在 JNDI 中进行存储和检索。在配置 Spring 应用程序上下文时,可以使用 JNDI 工厂类JndiObjectFactoryBean/<jee:jndi-lookup>对对象对 JMS 目标的引用执行依赖项注入。但是,如果应用程序中有大量目标,或者 JMS 提供程序具有独特的高级目标 Management 功能,则此策略通常很麻烦。这种高级目标 Management 的示例将是动态目标的创建或对目标的分层名称空间的支持。 JmsTemplate将目标名称的解析委派给接口DestinationResolver的实现的 JMS 目标对象。 DynamicDestinationResolverJmsTemplate使用的默认实现,并且可以解析动态目标。还提供了JndiDestinationResolver,它充当 JNDI 中包含的目的地的服务定位器,并且可以选择退回到DynamicDestinationResolver中包含的行为。

通常在 JMS 应用程序中使用的目标仅在运行时才知道,因此在部署应用程序时无法通过 Management 方式创建。这通常是因为在交互的系统组件之间存在共享的应用程序逻辑,这些组件根据已知的命名约定在运行时创建目标。即使创建动态目标不属于 JMS 规范的一部分,但大多数供应商都提供了此功能。动态目标是使用用户定义的名称创建的,该名称将它们与临时目标区分开,并且通常未在 JNDI 中注册。提供商之间使用的用于创建动态目的地的 API 有所不同,因为与目的地关联的属性是特定于供应商的。但是,供应商有时会做出一个简单的实现选择,就是忽略 JMS 规范中的警告,并使用TopicSession方法createTopic(String topicName)QueueSession方法createQueue(String queueName)来创建具有默认目标属性的新目标。根据供应商的实现,DynamicDestinationResolver然后可能还会创建一个物理目标,而不是仅解决一个物理目标。

布尔属性pubSubDomain用于在知道正在使用哪个 JMS 域的情况下配置JmsTemplate。默认情况下,此属性的值为 false,指示将使用点对点域 Queues。 JmsTemplate使用的此属性通过DestinationResolver接口的实现确定动态目标解析的行为。

您还可以通过属性defaultDestinationJmsTemplate配置为默认目标。默认目标将与不引用特定目标的发送和接收操作一起使用。

30.2.4 邮件侦听器容器

在 EJB 世界中,JMS 消息最常见的用途之一是驱动消息驱动的 bean(MDB)。 Spring 提供了一种解决方案,以不将用户绑定到 EJB 容器的方式创建消息驱动的 POJO(MDP)。 (有关 Spring 对 MDP 支持的详细介绍,请参见第 30.4.2 节“异步接收:消息驱动的 POJO”。)从 Spring Framework 4.1 开始,可以使用@JmsListenerComments 端点方法,有关更多详细信息,请参见第 30.6 节“Comments 驱动的侦听器端点”

消息侦听器容器用于从 JMS 消息队列接收消息,并驱动注入到其中的MessageListener。侦听器容器负责消息接收的所有线程,并分派到侦听器中进行处理。消息侦听器容器是 MDP 与消息传递提供程序之间的中介,并负责注册接收消息,参与事务,资源获取和释放,异常转换等。这使您作为应用程序开发人员可以编写与接收消息(并可能响应消息)相关的(可能很复杂)业务逻辑,并将样板 JMS 基础结构问题委托给框架。

Spring 附带了两个标准的 JMS 消息侦听器容器,每个容器都有其专门的功能集。

SimpleMessageListenerContainer

此消息侦听器容器是两种标准样式中的简单容器。它在启动时创建固定数量的 JMS 会话和使用者,使用标准 JMS MessageConsumer.setMessageListener()方法注册侦听器,并将其留给 JMS 提供者执行侦听器回调。此变体不允许动态适应运行时需求或参与外部 Management 的事务。在兼容性方面,它非常接近独立 JMS 规范的精神-但通常与 Java EE 的 JMS 限制不兼容。

Note

尽管SimpleMessageListenerContainer不允许参与外部 Management 的事务,但它支持本机 JMS 事务:只需将'sessionTransacted'标志切换为'true',或在名称空间中将'acknowledge'属性设置为'transacted':引发异常来自您的 Listener 的消息将导致回滚,并重新发送消息。或者,考虑使用“ CLIENT_ACKNOWLEDGE”模式,该模式在出现异常的情况下也可以重新传送,但不使用事务处理的会话,因此在事务协议中不包括任何其他会话操作(例如发送响应消息)。

默认的“ AUTO_ACKNOWLEDGE”模式无法提供适当的可靠性保证. 当侦听器执行失败时,消息可能会丢失(因为提供者将在侦听器调用后自动确认每条消息,没有 exceptions 会传播到提供者)侦听器容器关闭(可以通过'acceptMessagesWhileStopping'标志进行配置)。如果需要可靠性,请确保使用事务处理会话。用于可靠的队列处理和持久的主题订阅。

DefaultMessageListenerContainer

此消息侦听器容器是大多数情况下使用的容器。与SimpleMessageListenerContainer相比,此容器变体允许动态适应运行时需求,并且能够参与外部 Management 的事务。配置为JtaTransactionManager时,每个接收到的消息都会注册到 XA 事务中;因此处理可以利用 XA 事务语义。这个侦听器容器在 JMS 提供程序的低要求,高级功能(例如参与外部 Management 的事务)以及与 Java EE 环境的兼容性之间取得了良好的平衡。

容器的缓存级别可以自定义。请注意,当未启用缓存时,将为每个消息接收创建一个新的连接和一个新的会话。将其与具有高负载的非持久订阅结合使用可能会导致消息丢失。在这种情况下,请确保使用适当的缓存级别。

当代理关闭时,此容器还具有可恢复的功能。默认情况下,简单的BackOff实现每 5 秒重试一次。可以为更细粒度的恢复选项指定自定义BackOff实现,请参见ExponentialBackOff为例。

Note

与其同级SimpleMessageListenerContainer一样,DefaultMessageListenerContainer支持本机 JMS 事务,并且还允许自定义确认模式。如果可能的话,强烈建议在外部 Management 的事务上进行此操作:也就是说,如果 JVM 死亡,您可以偶尔接收重复消息。业务逻辑中的自定义重复消息检测步骤可能涵盖了这种情况,例如以业务实体存在检查或协议表检查的形式。任何这样的安排都将比其他安排更加有效:用 XA 事务包装整个处理过程(通过将DefaultMessageListenerContainerJtaTransactionManager配置为),涵盖 JMS 消息的接收以及消息侦听器中业务逻辑的执行(包括数据库操作等)。

默认的'AUTO_ACKNOWLEDGE'模式不提供适当的可靠性保证. 侦听器执行失败时消息可能会丢失(因为提供者会在侦听器调用之前自动确认每条消息)或侦听器容器关闭(可能已配置)通过“ acceptMessagesWhileStopping”标志)。如果需要可靠性,请确保使用事务处理会话。用于可靠的队列处理和持久的主题订阅。

30.2.5TransactionManagement

Spring 提供了一个JmsTransactionManager,用于 Management 单个 JMS ConnectionFactory的事务。这允许 JMS 应用程序利用第十七章,TransactionManagement中描述的 Spring 的托管事务功能。 JmsTransactionManager执行本地资源事务,将来自指定ConnectionFactory的 JMS 连接/会话对绑定到线程。 JmsTemplate自动检测此类 Transaction 资源并相应地对其进行操作。

在 Java EE 环境中,ConnectionFactory将合并“连接”和“会话”,因此这些资源可有效地跨事务使用。在独立环境中,使用 Spring 的SingleConnectionFactory将导致共享 JMS Connection,每个事务都有其自己的独立Session。或者,考虑使用提供程序专用的池适配器,例如 ActiveMQ 的PooledConnectionFactory类。

JmsTemplate也可以与JtaTransactionManager和具有 XA 功能的 JMS ConnectionFactory一起使用,以执行分布式事务。注意,这需要使用 JTA 事务 Management 器以及经过 XA 配置的 ConnectionFactory! (检查您的 Java EE 服务器的/ JMS 提供程序的文档.)

使用 JMS API 从Connection创建Session时,在托管和非托管事务环境中重用代码可能会造成混淆。这是因为 JMS API 只有一种工厂方法可以创建Session,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境的事务基础结构的责任,因此,供应商对 JMS Connection 的包装将忽略这些值。在非托管环境中使用JmsTemplate时,可以通过使用属性sessionTransactedsessionAcknowledgeMode来指定这些值。当将PlatformTransactionManagerJmsTemplate一起使用时,将始终为模板提供事务 JMS Session

30.3 发送信息

JmsTemplate包含许多发送消息的便捷方法。有一些发送方法使用javax.jms.Destination对象指定目标,而那些发送方法使用在 JNDI 查找中使用的字符串指定目标。不带目的地参数的 send 方法使用默认目的地。

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Session;

import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.core.JmsTemplate;

public class JmsQueueSender {

    private JmsTemplate jmsTemplate;
    private Queue queue;

    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    public void setQueue(Queue queue) {
        this.queue = queue;
    }

    public void simpleSend() {
        this.jmsTemplate.send(this.queue, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("hello queue world");
            }
        });
    }
}

本示例使用MessageCreator回调从提供的Session对象创建文本消息。通过将引用传递给ConnectionFactory来构造JmsTemplate。作为替代方案,提供了零参数构造函数和connectionFactory,它们可用于以 JavaBean 样式(使用 BeanFactory 或纯 Java 代码)构造实例。或者,考虑从 Spring 的JmsGatewaySupport便捷 Base Class 派生,该 Base Class 为 JMS 配置提供了预先构建的 bean 属性。

send(String destinationName, MessageCreator creator)方法使您可以使用目标的字符串名称发送消息。如果这些名称已在 JNDI 中注册,则应将模板的destinationResolver属性设置为JndiDestinationResolver的实例。

如果您创建了JmsTemplate并指定了默认目的地,则send(MessageCreator c)会向该目的地发送一条消息。

30.3.1 使用消息转换器

为了方便域模型对象的发送,JmsTemplate具有各种发送方法,这些方法将 Java 对象作为消息数据内容的参数。 JmsTemplate中的重载方法convertAndSend()receiveAndConvert()将转换过程委托给MessageConverter接口的实例。该接口定义了一个简单的协定,可以在 Java 对象和 JMS 消息之间进行转换。默认实现SimpleMessageConverter支持StringTextMessagebyte[]BytesMesssage以及java.util.MapMapMessage之间的转换。通过使用转换器,您和您的应用程序代码可以专注于通过 JMS 发送或接收的业务对象,而不必担心如何将其表示为 JMS 消息。

沙箱当前包含一个MapMessageConverter,该MapMessageConverter使用反射在 JavaBean 和MapMessage之间进行转换。您可能会自己实现的其他流行实现选择是使用现有 XML 编组程序包(例如 JAXB,Castor,XMLBeans 或 XStream)的 Converter 创建代表对象的TextMessage

为了适应消息属性,Headers 和正文的设置,这些设置通常不能封装在转换器类中,因此MessagePostProcessor接口可让您在消息转换之后但发送之前访问消息。下面的示例演示了将java.util.Map转换为消息后如何修改消息头和属性。

public void sendWithConversion() {
    Map map = new HashMap();
    map.put("Name", "Mark");
    map.put("Age", new Integer(47));
    jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws JMSException {
            message.setIntProperty("AccountID", 1234);
            message.setJMSCorrelationID("123-00001");
            return message;
        }
    });
}

这将产生以下形式的消息:

MapMessage={
	Header={
		... standard headers ...
		CorrelationID={123-00001}
	}
	Properties={
		AccountID={Integer:1234}
	}
	Fields={
		Name={String:Mark}
		Age={Integer:47}
	}
}

30.3.2 SessionCallback 和 ProducerCallback

尽管发送操作涵盖了许多常见的使用场景,但是在某些情况下,您希望对 JMS SessionMessageProducer执行多个操作。 SessionCallbackProducerCallback分别暴露 JMS SessionSession/MessageProducer对。 JmsTemplate上的execute()方法执行这些回调方法。

30.4 接收消息

30.4.1 同步接收

虽然 JMS 通常与异步处理相关联,但可以同步使用消息。重载的receive(..)方法提供了此功能。在同步接收期间,调用线程将阻塞,直到消息可用为止。这可能是危险的操作,因为调用线程可能会无限期地被阻塞。属性receiveTimeout指定接收者在放弃 await 消息之前应该 await 多长时间。

30.4.2 异步接收:消息驱动的 POJO

Note

Spring 还通过使用@JmsListenerComments 支持带 Comments 的侦听器端点,并提供了开放的基础结构以编程方式注册端点。到目前为止,这是设置异步接收器的最便捷方法,有关更多详细信息,请参见第 30.6.1 节“启用侦听器端点 Comments”

消息驱动 POJO(MDP)以类似于 EJB 世界中的消息驱动 Bean(MDB)的方式充当 JMS 消息的接收者。对 MDP 的一个限制(但也可以在下面参见MessageListenerAdapter类的讨论)是它必须实现javax.jms.MessageListener接口。另请注意,在 POJO 将在多个线程上接收消息的情况下,确保您的实现是线程安全的很重要。

以下是 MDP 的简单实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                System.out.println(((TextMessage) message).getText());
            }
            catch (JMSException ex) {
                throw new RuntimeException(ex);
            }
        }
        else {
            throw new IllegalArgumentException("Message must be of type TextMessage");
        }
    }
}

实现MessageListener后,就该创建一个消息侦听器容器了。

在下面找到一个示例,该示例说明如何定义和配置 Spring 附带的消息侦听器容器(在本例中为DefaultMessageListenerContainer)。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

请参阅各种消息侦听器容器的 Spring javadocs,以获取每个实现支持的功能的完整说明。

30.4.3 SessionAwareMessageListener 接口

SessionAwareMessageListener接口是特定于 Spring 的接口,它提供与 JMS MessageListener接口类似的协定,但还提供了消息处理方法,可以访问从中接收Message的 JMS Session

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

    void onMessage(Message message, Session session) throws JMSException;
}

如果希望 MDP 能够响应任何接收到的消息(使用onMessage(Message, Session)方法中提供的Session),则可以选择让 MDP 实现此接口(优先于标准 JMS MessageListener接口)。 Spring 附带的所有消息侦听器容器实现都支持实现MessageListenerSessionAwareMessageListener接口的 MDP。实现SessionAwareMessageListener的类带有警告,然后通过接口将它们绑定到 Spring。是否使用它的选择完全由您作为应用程序开发人员或架构师来决定。

请注意,SessionAwareMessageListener接口的'onMessage(..)'方法抛出JMSException。与标准 JMS MessageListener接口相反,使用SessionAwareMessageListener接口时,Client 端代码负责处理引发的所有异常。

30.4.4 MessageListenerAdapter

MessageListenerAdapter类是 Spring 异步消息支持中的最后一个组件:简而言之,它允许您将几乎* any *类公开为 MDP(当然存在一些约束)。

考虑以下接口定义。请注意,尽管该接口既没有扩展MessageListener也没有扩展SessionAwareMessageListener接口,但仍可以通过使用MessageListenerAdapter类将其用作 MDP。还请注意,如何根据各种消息处理方法可以接收和处理的各种Message类型的* contents 来强类型化。

public interface MessageDelegate {

    void handleMessage(String message);

    void handleMessage(Map message);

    void handleMessage(byte[] message);

    void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
    // implementation elided for clarity...
}

特别要注意,上面的MessageDelegate接口的实现(上面的DefaultMessageDelegate类)是如何完全不具有 JMS 依赖性的。这确实是一个 POJO,我们将通过以下配置将其制作为 MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultMessageDelegate"/>
    </constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
</bean>

下面是另一个只能处理 JMS TextMessage消息接收的 MDP 的示例。请注意,实际上如何将消息处理方法称为'receive'(MessageListenerAdapter中的消息处理方法的名称默认为'handleMessage'),但是它是可配置的(如下所示)。还请注意'receive(..)'方法是如何强类型 Importing 的,以便仅接收和响应 JMS TextMessage消息。

public interface TextMessageDelegate {

    void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
    // implementation elided for clarity...
}

话务员MessageListenerAdapter的配置如下所示:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="jmsexample.DefaultTextMessageDelegate"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="receive"/>
    <!-- we don't want automatic message context extraction -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>

请注意,如果上面的'messageListener'收到的类型不是TextMessage的 JMS Message,则将抛出IllegalStateException(随后将其吞咽)。 MessageListenerAdapter类的另一个功能是,如果处理程序方法返回非无效值,则自动发送回响应Message的功能。考虑接口和类:

public interface ResponsiveTextMessageDelegate {

    // notice the return type...
    String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
    // implementation elided for clarity...
}

如果上面的DefaultResponsiveTextMessageDelegateMessageListenerAdapter结合使用,那么从'receive(..)'方法的执行返回的任何非 null 值都将(在默认配置中)转换为TextMessage。然后,将结果TextMessage发送到原始Message的 JMS Reply-To 属性中定义的Destination(如果存在)或在MessageListenerAdapter上设置的默认Destination(如果已配置);如果未找到Destination,则将抛出InvalidDestinationException(并且请注意,此异常将不会被吞咽,并且会在调用堆栈中传播)。

30.4.5 处理 Transaction 中的消息

在事务中调用消息侦听器仅需要重新配置侦听器容器。

只需通过侦听器容器定义上的sessionTransacted标志即可激活本地资源事务。然后,每个消息侦听器调用将在活动的 JMS 事务中运行,并且在侦听器执行失败的情况下回退消息接收。 (通过SessionAwareMessageListener)发送响应消息将是同一本地事务的一部分,但是任何其他资源操作(例如数据库访问)都将独立运行。这通常需要在侦听器实现中进行重复消息检测,以解决数据库处理已提交但消息处理未能提交的情况。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="sessionTransacted" value="true"/>
</bean>

要参与外部 Management 的事务,您将需要配置事务 Management 器并使用支持外部 Management 的事务的侦听器容器:通常为DefaultMessageListenerContainer

要为 XA 事务参与配置消息侦听器容器,您将需要配置JtaTransactionManager(默认情况下,它委派给 Java EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory 必须具有 XA 功能,并已向您的 JTA 事务协调器正确注册! (检查 Java EE 服务器的 JNDI 资源配置.)这样既可以接收消息,也可以接收消息。数据库访问成为同一事务的一部分(具有统一的提交语义,但以 XA 事务日志开销为代价)。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,您只需将其添加到我们之前的容器配置中即可。容器将负责其余的工作。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="destination"/>
    <property name="messageListener" ref="messageListener"/>
    <property name="transactionManager" ref="transactionManager"/>
</bean>

30.5 对 JCA 消息端点的支持

从 2.5 版开始,Spring 还提供了对基于 JCA 的MessageListener容器的支持。 JmsMessageEndpointManager将尝试根据提供者的ResourceAdapter类名自动确定ActivationSpec类名。因此,通常可以只提供 Spring 的通用JmsActivationSpecConfig,如以下示例所示。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpecConfig">
        <bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
            <property name="destinationName" value="myQueue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

或者,您可以使用给定的ActivationSpec对象设置JmsMessageEndpointManagerActivationSpec对象也可以来自 JNDI 查找(使用<jee:jndi-lookup>)。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
    <property name="resourceAdapter" ref="resourceAdapter"/>
    <property name="activationSpec">
        <bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
            <property name="destination" value="myQueue"/>
            <property name="destinationType" value="javax.jms.Queue"/>
        </bean>
    </property>
    <property name="messageListener" ref="myMessageListener"/>
</bean>

使用 Spring 的ResourceAdapterFactoryBean,可以在本地配置目标ResourceAdapter,如以下示例所示。

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
    <property name="resourceAdapter">
        <bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
            <property name="serverUrl" value="tcp://localhost:61616"/>
        </bean>
    </property>
    <property name="workManager">
        <bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
    </property>
</bean>

指定的WorkManager也可以指向特定于环境的线程池-通常通过SimpleTaskWorkManager's“ asyncTaskExecutor”属性。如果您碰巧使用多个适配器,请考虑为所有ResourceAdapter实例定义一个共享线程池。

在某些环境(例如 WebLogic 9 或更高版本)中,整个ResourceAdapter对象可以从 JNDI 获得(使用<jee:jndi-lookup>)。然后,基于 Spring 的消息侦听器可以使用服务器的内置WorkManager与服务器托管的ResourceAdapter进行交互。

有关更多详细信息,请查阅JmsMessageEndpointManagerJmsActivationSpecConfigResourceAdapterFactoryBean的 javadoc。

Spring 还提供了一个通用的 JCA 消息端点 Management 器,该 Management 器与 JMS 无关:org.springframework.jca.endpoint.GenericMessageEndpointManager。该组件允许使用任何消息侦听器类型(例如 CCI MessageListener)和任何提供程序特定的 ActivationSpec 对象。请查阅 JCA 提供程序的文档,以了解连接器的实际功能,并查阅GenericMessageEndpointManager的 javadoc 以获取特定于 Spring 的配置详细信息。

Note

基于 JCA 的消息端点 Management 非常类似于 EJB 2.1 消息驱动 Bean。它使用相同的基础资源提供者 Contract。与 EJB 2.1 MDB 一样,您的 JCA 提供程序支持的任何消息侦听器接口也可以在 Spring 上下文中使用。尽管如此,Spring 仍然为 JMS 提供了显式的“便利”支持,这仅仅是因为 JMS 是与 JCA 端点 Management 协定一起使用的最常见的端点 API。

30.6 注解驱动的侦听器端点

异步接收消息的最简单方法是使用带 Comments 的侦听器端点基础结构。简而言之,它允许您将托管 bean 的方法公开为 JMS 侦听器端点。

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(String data) { ... }
}

上面示例的想法是,只要在javax.jms.Destination“ myDestination”上有消息可用,就相应地调用processOrder方法(在这种情况下,JMS 消息的内容类似于MessageListenerAdapter提供的内容)。

带 Comments 的端点基础结构使用JmsListenerContainerFactory在幕后为每种带 Comments 的方法创建一个消息侦听器容器。此类容器未针对应用程序上下文进行注册,但可以使用JmsListenerEndpointRegistry bean 轻松定位以进行 Management。

Tip

@JmsListener是 Java 8 上的可重复Comments,因此可以通过向其添加其他@JmsListener声明来将多个 JMS 目标关联到同一方法。在 Java 6 和 7 上,可以使用@JmsListenersComments。

30.6.1 启用侦听器端点 Comments

要启用对@JmsListenerComments 的支持,请将@EnableJms添加到您的@Configuration类中。

@Configuration
@EnableJms
public class AppConfig {

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setSessionTransacted(true);
        factory.setConcurrency("3-10");
        return factory;
    }
}

默认情况下,基础结构将查找名为jmsListenerContainerFactory的 bean 作为工厂用来创建消息侦听器容器的源。在这种情况下,忽略 JMS 基础结构设置,可以使用 3 个线程的核心轮询大小和 10 个线程的最大池大小来调用processOrder方法。

可以自定义侦听器容器工厂以使用每个 Comments,或者可以通过实现JmsListenerConfigurer接口配置显式默认值。仅当至少一个端点在没有特定容器工厂的情况下注册时才需要使用默认值。有关完整的详细信息和示例,请参见 javadoc。

如果您喜欢XML configuration,请使用<jms:annotation-driven>元素。

<jms:annotation-driven/>

<bean id="jmsListenerContainerFactory"
        class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationResolver" ref="destinationResolver"/>
    <property name="sessionTransacted" value="true"/>
    <property name="concurrency" value="3-10"/>
</bean>

30.6.2 程序化端点注册

JmsListenerEndpoint提供 JMS 端点的模型,并负责为该模型配置容器。除了JmsListenerComments 检测到的端点外,基础结构还允许您以编程方式配置端点。

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("myJmsEndpoint");
        endpoint.setDestination("anotherQueue");
        endpoint.setMessageListener(message -> {
            // processing
        });
        registrar.registerEndpoint(endpoint);
    }
}

在上面的示例中,我们使用了SimpleJmsListenerEndpoint来提供实际的MessageListener进行调用,但是您也可以构建自己的描述自定义调用机制的端点变量。

应该注意的是,您也可以完全跳过@JmsListener的使用,而只通过JmsListenerConfigurer以编程方式注册端点。

30.6.3 带 Comments 的终结点方法签名

到目前为止,我们已经在端点中注入了一个简单的String,但实际上它可以具有非常灵活的方法签名。让我们重写它,以使用自定义 Headers 注入Order

@Component
public class MyService {

    @JmsListener(destination = "myDestination")
    public void processOrder(Order order, @Header("order_type") String orderType) {
        ...
    }
}

这些是您可以在 JMS 侦听器端点中注入的主要元素:

  • 原始的javax.jms.Message或其任何子类(当然要与传入的消息类型匹配)。

  • javax.jms.Session用于可选地访问本地 JMS API,例如发送自定义回复。

  • org.springframework.messaging.Message代表传入的 JMS 消息。请注意,此消息同时包含自定义 Headers 和标准 Headers(由JmsHeaders定义)。

  • @Header-带 Comments 的方法参数,用于提取特定的 Headers 值,包括标准的 JMSHeaders。

  • @Headers-带 Comments 的参数,也必须可分配给java.util.Map以访问所有 Headers。

  • 不是受支持类型之一(即MessageSession)的未 Comments 元素被视为有效负载。您可以通过用@PayloadComments 参数来使其明确。您还可以通过添加额外的@Valid来启用验证。

注入 Spring 的Message抽象的能力特别有用,它可以受益于存储在特定于传输的消息中的所有信息,而无需依赖于特定于传输的 API。

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

DefaultMessageHandlerMethodFactory提供了方法参数的处理,可以进一步对其进行自定义以支持其他方法参数。转换和验证支持也可以在那里定制。

例如,如果我们想在处理Order之前确保其有效,则可以使用@ValidComments 有效负载并按如下所示配置必要的验证器:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
    }

    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setValidator(myValidator());
        return factory;
    }
}

30.6.4 响应 Management

MessageListenerAdapter中的现有支持已经允许您的方法具有非void返回类型。在这种情况下,调用的结果将封装在javax.jms.Message中,该javax.jms.Message可以在原始消息的JMSReplyToHeaders 中指定的目标中发送,也可以在侦听器上配置的默认目标中发送。现在可以使用消息传递抽象的@SendToComments 设置默认目的地。

假设我们的processOrder方法现在应该返回OrderStatus,则可以按照以下方式编写它以自动发送响应:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
    // order processing
    return status;
}

Tip

如果您有几种带有@JmsListenerComments 的方法,则还可以将@SendToComments 放置在类级别以共享默认的答复目标。

如果需要以与传输无关的方式设置其他 Headers,则可以返回Message,例如:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
    // order processing
    return MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
}

如果需要在运行时计算响应目标,则可以将响应封装在JmsResponse实例中,该实例还提供要在运行时使用的目标。前面的示例可以重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
    // order processing
    Message<OrderStatus> response = MessageBuilder
            .withPayload(status)
            .setHeader("code", 1234)
            .build();
    return JmsResponse.forQueue(response, "status");
}

30.7 JMS 名称空间支持

Spring 提供了一个 XML 名称空间来简化 JMS 配置。要使用 JMS 命名空间元素,您将需要引用 JMS 模式:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:jms="http://www.springframework.org/schema/jms"
        xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

    <!-- bean definitions here -->

</beans>

命名空间由三个顶级元素组成:<annotation-driven/><listener-container/><jca-listener-container/><annotation-driven启用Comments 驱动的侦听器端点的使用。 <listener-container/><jca-listener-container/>定义共享的侦听器容器配置,并且可以包含<listener/>子元素。这是两个侦听器的基本配置示例。

<jms:listener-container>

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

上面的示例等效于创建两个不同的侦听器容器 bean 定义和两个不同的MessageListenerAdapter bean 定义,如第 30.4.4 节“ MessageListenerAdapter”所示。除了上面显示的属性外,listener元素还可以包含几个可选属性。下表描述了所有可用的属性:

表 30.1. JMS 元素的属性

AttributeDescription
id托管侦听器容器的 Bean 名称。如果未指定,将自动生成一个 Bean 名称。
目的地*(必填)*该侦听器的目标名称,通过DestinationResolver策略解析。
参考*(必填)*处理程序对象的 bean 名称。
method要调用的处理程序方法的名称。如果ref指向MessageListener或 Spring SessionAwareMessageListener,则可以省略此属性。
response-destination向其发送响应消息的默认响应目标的名称。如果请求消息中不包含“ JMSReplyTo”字段,则将应用此方法。此目标的类型将由侦听器容器的“响应目标类型”属性确定。注意:这仅适用于具有返回值的侦听器方法,为此,每个结果对象都将转换为响应消息。
subscription持久订阅的名称(如果有)。
selector此侦听器的可选消息 selectors。
concurrency要启动此侦听器的并发会话/使用者的数量。可以是表示最大数量的简单数字(例如“ 5”),也可以是表示上下限的范围(例如“ 3-5”)。请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认值为容器提供的值

<listener-container/>元素还接受几个可选属性。这允许自定义各种策略(例如taskExecutordestinationResolver)以及基本的 JMS 设置和资源引用。使用这些属性,可以定义高度自定义的侦听器容器,同时仍然受益于命名空间的便利性。

通过指定要通过factory-id属性公开的 bean 的 ID,可以将此类设置自动公开为JmsListenerContainerFactory

<jms:listener-container connection-factory="myConnectionFactory"
        task-executor="myTaskExecutor"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

    <jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

下表描述了所有可用属性。有关各个属性的更多详细信息,请查阅AbstractMessageListenerContainer及其具体子类的类级 javadocs。 javadocs 还提供了有关事务选择和消息重新交付场景的讨论。

表 30.2. JMS 元素的属性

AttributeDescription
container-type此侦听器容器的类型。可用选项为:defaultsimpledefault102simple102(默认值为'default')。
container-class自定义侦听器容器实现类,作为完全限定的类名。根据“ container-type”属性,默认值为 Spring 的标准DefaultMessageListenerContainerSimpleMessageListenerContainer
factory-id公开此元素定义为具有指定 ID 的JmsListenerContainerFactory的设置,以便它们可以与其他端点重复使用。
connection-factory对 JMS ConnectionFactory bean 的引用(默认 bean 名称为'connectionFactory')。
task-executor对 JMS 侦听器调用程序的 Spring TaskExecutor的引用。
destination-resolver对解决 JMS DestinationsDestinationResolver策略的引用。
message-converter对将 JMS 消息转换为侦听器方法参数的MessageConverter策略的引用。默认值为SimpleMessageConverter
error-handlerErrorHandler策略的引用,该策略用于处理在执行MessageListener期间可能发生的任何未捕获的异常。
destination-type此侦听器的 JMS 目标类型:queuetopicdurableTopicsharedTopicsharedDurableTopic。这可能启用了容器的pubSubDomainsubscriptionDurablesubscriptionShared属性。默认值为queue(即禁用这 3 个属性)。
response-destination-type响应的 JMS 目标类型:“队列”,“主题”。默认值为“目标类型”(destination-type)属性的值。
client-id此侦听器容器的 JMSClient 端 ID。使用持久订阅时需要指定。
cacheJMS 资源的缓存级别:noneconnectionsessionconsumerauto。默认情况下(auto),除非指定了外部事务 Management 器,否则缓存级别将有效地为“Consumer”-在这种情况下,有效的默认值为none(假定 Java EE 风格的事务 Management,其中给定的 ConnectionFactory 是 XA-知道池)。
acknowledge本机 JMS 确认模式:autoclientdups-oktransacted。值transacted激活本地处理的Session。或者,指定下面描述的transaction-manager属性。默认值为auto
transaction-manager对外部PlatformTransactionManager的引用(通常是基于 XA 的事务协调器,例如 Spring 的JtaTransactionManager)。如果未指定,将使用本机确认(请参阅“确认”属性)。
concurrency每个侦听器启动的并发会话/Consumer 数。可以是表示最大数量的简单数字(例如“ 5”),也可以是表示上下限的范围(例如“ 3-5”)。请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认为 1;如果主题侦听器或队列 Sequences 很重要,则将并发限制为 1;考虑将其提高到一般队列。
prefetch加载到单个会话中的最大消息数。请注意,提高此数字可能会导致并发 Consumer 挨饿!
receive-timeout用于接收呼叫的超时(以毫秒为单位)。默认值为1000 ms(1 秒); -1表示完全没有超时。
back-off指定用于计算两次恢复尝试间隔的BackOff实例。如果BackOffExecution实现返回BackOffExecution#STOP,则侦听器容器将不会进一步尝试恢复。设置此属性时,将忽略recovery-interval值。默认值为FixedBackOff,间隔为 5000 毫秒,即 5 秒。
recovery-interval指定两次恢复尝试之间的时间间隔(以毫秒为单位)。使用指定间隔创建FixedBackOff的便捷方法。有关更多恢复选项,请考虑改为指定 BackOff 实例。缺省值为 5000 毫秒,即 5 秒。
phase此容器应在其中启动和停止的生命周期阶段。值越低,此容器将越早启动,而其停止就越晚。默认值为Integer.MAX_VALUE,表示容器将尽可能晚地启动并尽快停止。

使用“ jms”模式支持配置基于 JCA 的侦听器容器非常相似。

<jms:jca-listener-container resource-adapter="myResourceAdapter"
        destination-resolver="myDestinationResolver"
        transaction-manager="myTransactionManager"
        concurrency="10">

    <jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

下表描述了 JCA 变体的可用配置选项:

表 30.3. JMS 元素的属性

AttributeDescription
factory-id公开此元素定义为具有指定 ID 的JmsListenerContainerFactory的设置,以便它们可以与其他端点重复使用。
resource-adapter对 JCA ResourceAdapter bean 的引用(默认 bean 名称为'resourceAdapter')。
activation-spec-factoryJmsActivationSpecFactory的引用。默认设置是自动检测 JMS 提供程序及其ActivationSpec类(请参见DefaultJmsActivationSpecFactory)
destination-resolver对解决 JMS DestinationsDestinationResolver策略的引用。
message-converter对将 JMS 消息转换为侦听器方法参数的MessageConverter策略的引用。默认值为SimpleMessageConverter
destination-type此侦听器的 JMS 目标类型:queuetopicdurableTopicsharedTopicsharedDurableTopic。这可能启用了容器的pubSubDomainsubscriptionDurablesubscriptionShared属性。默认值为queue(即禁用这 3 个属性)。
response-destination-type响应的 JMS 目标类型:“队列”,“主题”。默认值为“目标类型”(destination-type)属性的值。
client-id此侦听器容器的 JMSClient 端 ID。使用持久订阅时需要指定。
acknowledge本机 JMS 确认模式:autoclientdups-oktransacted。值transacted激活本地处理的Session。或者,指定下面描述的transaction-manager属性。默认值为auto
transaction-manager对 Spring JtaTransactionManagerjavax.transaction.TransactionManager的引用,用于为每个传入消息启动 XA 事务。如果未指定,则将使用本机确认(请参见“确认”属性)。
concurrency每个侦听器启动的并发会话/Consumer 数。可以是表示最大数量的简单数字(例如“ 5”),也可以是表示上下限的范围(例如“ 3-5”)。请注意,指定的最小值仅是一个提示,通常在运行时使用 JCA 侦听器容器时将被忽略。默认值为 1.
prefetch加载到单个会话中的最大消息数。请注意,提高此数字可能会导致并发 Consumer 挨饿!