32. Messaging

Spring 框架为与消息传递系统集成提供了广泛的支持:从简化使用JmsTemplate的 JMS API 的使用到完整的基础结构以异步接收消息。 Spring AMQP 为“高级消息队列协议”提供了类似的功能集,Spring Boot 还为RabbitTemplate和 RabbitMQ 提供了自动配置选项。 Spring WebSocket 本身还支持 STOMP 消息传递,Spring Boot 通过启动程序和少量的自动配置对此提供了支持。 Spring Boot 还支持 Apache Kafka。

32.1 JMS

javax.jms.ConnectionFactory接口提供了创建javax.jms.Connection与 JMS 代理进行交互的标准方法。尽管 Spring 需要ConnectionFactory才能与 JMS 一起使用,但是您通常不需要自己直接使用它,而可以依赖于更高级别的消息传递抽象(有关详细信息,请参见 Spring Framework 参考文档的relevant section)。 Spring Boot 还自动配置了必要的基础结构,以发送和接收消息。

32.1.1 ActiveMQ 支持

当 Spring Boot 检测到 ActiveMQ 在 Classpath 上可用时,它也可以配置ConnectionFactory。如果存在代理,则将自动启动并配置嵌入式代理(只要未通过配置指定代理 URL)。

Note

如果您使用的是spring-boot-starter-activemq,则将提供连接或嵌入 ActiveMQ 实例的必要依赖关系,以及与 JMS 集成的 Spring 基础结构。

ActiveMQ 配置由spring.activemq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret

您还可以通过将依赖项添加到org.apache.activemq:activemq-pool并相应地配置PooledConnectionFactory来池化 JMS 资源:

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50

Tip

有关更多受支持的选项,请参见ActiveMQProperties。您还可以注册任意数量的实现ActiveMQConnectionFactoryCustomizer的 bean,以进行更高级的自定义。

默认情况下,ActiveMQ 会创建目的地(如果目的地尚不存在),因此将根据其提供的名称来解析目的地。

32.1.2 Artemis 支持

当 Spring Boot 检测到 Artemis 在 Classpath 中可用时,它可以自动配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(除非已显式设置 mode 属性)。支持的模式为:embedded(用于明确要求嵌入式代理,如果 Classpath 中不存在代理,则应该导致错误),以及native使用netty传输协议连接到代理。配置后者后,Spring Boot 将使用默认设置配置ConnectionFactory连接到在本地计算机上运行的代理。

Note

如果您使用的是spring-boot-starter-artemis,则将提供连接到现有 Artemis 实例所需的依赖项,以及与 JMS 集成的 Spring 基础结构。将org.apache.activemq:artemis-jms-server添加到您的应用程序可以使您使用嵌入式模式。

Artemis 配置由spring.artemis.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret

嵌入代理时,可以选择是否要启用持久性,以及应该提供的目标列表。可以将它们指定为逗号分隔的列表,以使用默认选项创建它们。或者,您可以分别为高级队列和主题配置定义类型为org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration的 bean。

有关更多受支持的选项,请参见ArtemisProperties

根本不涉及 JNDI 查找,并且使用 Artemis 配置中的'name'属性或配置中提供的名称来根据目的地名称解析目的地。

32.1.3 使用 JNDI ConnectionFactory

如果您在 Application Server 中运行应用程序,Spring Boot 将尝试使用 JNDI 查找 JMS ConnectionFactory。默认情况下,将检查位置java:/JmsXAjava:/XAConnectionFactory。如果需要指定其他位置,则可以使用spring.jms.jndi-name属性:

spring.jms.jndi-name=java:/MyConnectionFactory

32.1.4 发送消息

Spring 的JmsTemplate是自动配置的,您可以将其直接自动连接到自己的 bean 中:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    @Autowired
    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

}

Note

JmsMessagingTemplate可以类似的方式注入。如果定义了DestinationResolverMessageConverter bean,它们将自动关联到自动配置的JmsTemplate

32.1.5 接收消息

存在 JMS 基础结构时,可以使用@JmsListenerComments 任何 bean 以创建侦听器端点。如果未定义JmsListenerContainerFactory,则会自动配置一个默认值。如果定义了DestinationResolverMessageConverter bean,它们将自动关联到默认工厂。

默认情况下,默认工厂是事务型的。如果您在存在JtaTransactionManager的基础结构中运行,则默认情况下它将与监听器容器关联。如果没有,则将启用sessionTransacted标志。在后一种情况下,可以通过在侦听器方法(或其委托)上添加@Transactional来将本地数据存储事务与传入消息的处理相关联。这将确保本地事务完成后,传入消息得到确认。这还包括发送已在同一 JMS 会话上执行的响应消息。

以下组件在someQueue目标上创建侦听器端点:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

Tip

查看@EnableJms 的 Javadoc了解更多详细信息。

如果您需要创建更多的JmsListenerContainerFactory实例,或者想要覆盖默认实例,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer,您可以使用与自动配置的设置相同的设置来初始化DefaultJmsListenerContainerFactory

例如,以下代码公开了另一个使用特定MessageConverter的工厂:

@Configuration
static class JmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory());
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

然后,您可以在任何带有@JmsListenerComments 的方法中使用,如下所示:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue", containerFactory="myFactory")
    public void processMessage(String content) {
        // ...
    }

}

32.2 AMQP

高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。 Spring AMQP 项目将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了许多便利,包括spring-boot-starter-amqp'Starter'。

32.2.1 RabbitMQ 支持

RabbitMQ 是基于 AMQP 协议的轻型,可靠,可伸缩和便携式消息代理。 Spring 使用RabbitMQ通过 AMQP 协议进行通信。

RabbitMQ 配置由spring.rabbitmq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret

有关更多受支持的选项,请参见RabbitProperties

Tip

查看了解 AMQP,RabbitMQ 使用的协议了解更多详细信息。

32.2.2 发送信息

Spring 的AmqpTemplateAmqpAdmin是自动配置的,您可以将它们直接自动连接到自己的 bean 中:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;
    private final AmqpTemplate amqpTemplate;

    @Autowired
    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}

Note

RabbitMessagingTemplate可以类似的方式注入。如果定义了MessageConverter bean,它将自动关联到自动配置的AmqpTemplate

如有必要,任何定义为 Bean 的org.springframework.amqp.core.Queue将自动用于在 RabbitMQ 实例上声明相应的队列。

您可以在AmqpTemplate上启用重试,以重试操作,例如在代理连接丢失的情况下。默认情况下,重试是禁用的。

32.2.3 接收消息

存在 Rabbit 基础结构时,可以使用@RabbitListenerComments 任何 bean 以创建侦听器端点。如果未定义RabbitListenerContainerFactory,则会自动配置一个默认值。如果定义了MessageConverterMessageRecoverer bean,它们将自动关联到默认工厂。

以下组件在someQueue队列上创建一个侦听器端点:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}

Tip

查看@EnableRabbit 的 Javadoc了解更多详细信息。

如果您需要创建更多的RabbitListenerContainerFactory实例,或者想要覆盖默认实例,Spring Boot 提供了一个SimpleRabbitListenerContainerFactoryConfigurer,您可以使用与自动配置的设置相同的设置来初始化SimpleRabbitListenerContainerFactory

例如,以下代码公开了另一个使用特定MessageConverter的工厂:

@Configuration
static class RabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

然后,您可以在任何带有@RabbitListenerComments 的方法中使用,如下所示:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory="myFactory")
    public void processMessage(String content) {
        // ...
    }

}

您可以启用重试来处理侦听器引发异常的情况。默认情况下使用RejectAndDontRequeueRecoverer,但您可以定义自己的MessageRecoverer。重试用尽后,如果将代理配置为该消息,则该消息将被拒绝并被丢弃或路由到死信交换机。默认情况下,重试是禁用的。

Tip

如果未启用重试并且侦听器引发异常,则默认情况下将无限期重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false,将尝试进行零次重新传递;或者,抛出AmqpRejectAndDontRequeueException表示该消息应被拒绝。这是启用重试并达到最大传递尝试次数时使用的机制。

32.3 Apache Kafka 支持

通过提供spring-kafka项目的自动配置来支持Apache Kafka

Kafka 配置由spring.kafka.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

有关更多受支持的选项,请参见KafkaProperties

32.3.1 发送消息

Spring 的KafkaTemplate是自动配置的,您可以将它们直接自动连接到自己的 bean 中:

@Component
public class MyBean {

	private final KafkaTemplate kafkaTemplate;

	@Autowired
	public MyBean(KafkaTemplate kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

}

32.3.2 接收消息

存在 Apache Kafka 基础结构时,可以使用@KafkaListenerComments 任何 bean 以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用spring.kafka.listener.*中定义的键自动配置一个默认值。

以下组件在someTopic主题上创建侦听器终结点:

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

32.3.3 Kafka 的其他属性

自动配置支持的属性显示在附录 A,通用应用程序属性中。请注意,这些属性(连字符或 camelCase)在大多数情况下直接 Map 到 Apache Kafka 点缀属性,有关详细信息,请参阅 Apache Kafka 文档。

这些属性的前几个属性同时适用于生产者和 Consumer,但是如果您希望为每个生产者或 Consumer 使用不同的值,则可以在生产者或 Consumer 级别指定这些属性。 Apache Kafka 指定具有重要意义的属性:HIGH,MEDIUM 和 LOW。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 以及任何不具有默认值的属性。

KafkaProperties类仅提供 Kafka 支持的属性的子集。如果希望使用不直接支持的其他属性来配置生产者或使用者,请使用以下命令:

spring.kafka.properties.foo.bar=baz

这会将公共foo.bar Kafka 属性设置为baz

这些属性将由 Consumer 和生产者工厂 Bean 共享。如果您希望使用不同的属性来自定义这些组件,例如为每个组件使用不同的度量读取器,则可以覆盖 bean 定义,如下所示:

@Configuration
public static class CustomKafkaBeans {

    /**
     * Customized ProducerFactory bean.
     * @param properties the kafka properties.
     * @return the bean.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
        Map<String, Object> producerProperties = properties.buildProducerProperties();
        producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
                MyProducerMetricsReporter.class);
        return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
    }

    /**
     * Customized ConsumerFactory bean.
     * @param properties the kafka properties.
     * @return the bean.
     */
    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
        Map<String, Object> consumerProperties = properties.buildConsumerProperties();
        consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
                MyConsumerMetricsReporter.class);
        return new DefaultKafkaConsumerFactory<Object, Object>(consumerProperties);
    }

}