On this page
33. Messaging
Spring 框架为与消息传递系统集成提供了广泛的支持,从简化使用JmsTemplate的 JMS API 的使用到完整的异步接收消息的基础结构。 Spring AMQP 为高级消息队列协议提供了类似的功能集。 Spring Boot 还为RabbitTemplate和 RabbitMQ 提供了自动配置选项。 Spring WebSocket 本身就包含对 STOMP 消息的支持,而 Spring Boot 通过启动程序和少量的自动配置对此提供了支持。 Spring Boot 还支持 Apache Kafka。
33.1 JMS
javax.jms.ConnectionFactory接口提供了创建javax.jms.Connection与 JMS 代理进行交互的标准方法。尽管 Spring 需要ConnectionFactory才能与 JMS 一起使用,但是您通常不需要自己直接使用它,而是可以依靠更高级别的消息抽象。 (有关详细信息,请参见 Spring Framework 参考文档的relevant section。)Spring Boot 还自动配置必要的基础结构来发送和接收消息。
33.1.1 ActiveMQ 支持
当ActiveMQ在 Classpath 上可用时,Spring Boot 也可以配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(前提是未通过配置指定代理 URL)。
Note
如果使用spring-boot-starter-activemq,则将提供连接或嵌入 ActiveMQ 实例所需的依赖关系,以及与 JMS 集成的 Spring 基础结构。
ActiveMQ 配置由spring.activemq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret
默认情况下,CachingConnectionFactory使用可以由spring.jms.*中的外部配置属性控制的明智设置来包装本机ConnectionFactory:
spring.jms.cache.session-cache-size=5
如果您想使用本机池,则可以通过向org.messaginghub:pooled-jms添加依赖项并相应地配置JmsPoolConnectionFactory来实现,如下例所示:
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
Tip
有关更多受支持的选项,请参见ActiveMQProperties。您还可以注册任意数量的实现ActiveMQConnectionFactoryCustomizer的 bean,以进行更高级的自定义。
默认情况下,ActiveMQ 将创建一个目的地(如果目的地尚不存在),以便根据其提供的名称来解析目的地。
33.1.2 Artemis 支持
当 Spring Boot 检测到Artemis在 Classpath 中可用时,它可以自动配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(除非已明确设置 mode 属性)。支持的模式为embedded(以明确要求使用嵌入式代理,并且如果代理在 Classpath 上不可用,则会发生错误)和native(使用netty传输协议连接到代理)。配置后者后,Spring Boot 将使用默认设置配置一个ConnectionFactory,该ConnectionFactory连接到在本地计算机上运行的代理。
Note
如果使用spring-boot-starter-artemis,则将提供连接到现有 Artemis 实例所需的依赖关系,以及与 JMS 集成的 Spring 基础结构。将org.apache.activemq:artemis-jms-server添加到您的应用程序可让您使用嵌入式模式。
Artemis 配置由spring.artemis.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret
嵌入代理时,可以选择是否要启用持久性并列出应使其可用的目的地。可以将它们指定为以逗号分隔的列表,以使用默认选项创建它们,也可以为高级队列和主题配置分别定义类型org.apache.activemq.artemis.jms.server.config.JMSQueueConfiguration或org.apache.activemq.artemis.jms.server.config.TopicConfiguration的 bean。
默认情况下,CachingConnectionFactory使用可以由spring.jms.*中的外部配置属性控制的明智设置来包装本机ConnectionFactory:
spring.jms.cache.session-cache-size=5
如果您想使用本机池,则可以通过向org.messaginghub:pooled-jms添加依赖项并相应地配置JmsPoolConnectionFactory来实现,如下例所示:
spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50
有关更多受支持的选项,请参见ArtemisProperties。
不涉及 JNDI 查找,并且使用 Artemis 配置中的name属性或通过配置提供的名称来根据目的地名称解析目的地。
33.1.3 使用 JNDI ConnectionFactory
如果您正在应用程序服务器中运行应用程序,则 Spring Boot 会尝试使用 JNDI 查找 JMS ConnectionFactory。默认情况下,选中java:/JmsXA和java:/XAConnectionFactory位置。如果需要指定替代位置,则可以使用spring.jms.jndi-name属性,如以下示例所示:
spring.jms.jndi-name=java:/MyConnectionFactory
33.1.4 发送消息
Spring 的JmsTemplate是自动配置的,您可以将其直接自动连接到自己的 bean 中,如以下示例所示:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final JmsTemplate jmsTemplate;
@Autowired
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// ...
}
Note
JmsMessagingTemplate可以类似的方式注入。如果定义了DestinationResolver或MessageConverter bean,则将其自动关联到自动配置的JmsTemplate。
33.1.5 接收消息
存在 JMS 基础结构时,可以使用@JmsListenerComments 任何 bean 以创建侦听器端点。如果未定义JmsListenerContainerFactory,则会自动配置一个默认值。如果定义了DestinationResolver或MessageConverter bean,它将自动关联到默认工厂。
默认情况下,默认工厂是事务性的。如果您在存在JtaTransactionManager的基础结构中运行,则默认情况下会将其与侦听器容器关联。如果不是,则启用sessionTransacted标志。在后一种情况下,可以通过在侦听器方法(或其委托)上添加@Transactional来将本地数据存储事务与传入消息的处理相关联。这样可以确保本地事务完成后,传入消息得到确认。这还包括发送已在同一 JMS 会话上执行的响应消息。
以下组件在someQueue目标上创建侦听器端点:
@Component
public class MyBean {
@JmsListener(destination = "someQueue")
public void processMessage(String content) {
// ...
}
}
Tip
有关更多详细信息,请参见@EnableJms 的 Javadoc。
如果您需要创建更多的JmsListenerContainerFactory实例,或者想覆盖默认实例,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer,您可以使用与自动配置的设置相同的设置来初始化DefaultJmsListenerContainerFactory。
例如,以下示例公开了另一个使用特定MessageConverter的工厂:
@Configuration
static class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory myFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory());
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后,您可以在任何带有@JmsListenerComments 的方法中使用工厂,如下所示:
@Component
public class MyBean {
@JmsListener(destination = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
33.2 AMQP
高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。 Spring AMQP 项目将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了许多便利,包括spring-boot-starter-amqp“ Starter”。
33.2.1 RabbitMQ 支持
RabbitMQ是基于 AMQP 协议的轻型,可靠,可伸缩和便携式消息代理。 Spring 使用RabbitMQ通过 AMQP 协议进行通信。
RabbitMQ 配置由spring.rabbitmq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
如果上下文中存在ConnectionNameStrategy bean,它将自动用于命名由自动配置的ConnectionFactory创建的连接。有关更多受支持的选项,请参见RabbitProperties。
Tip
有关更多详细信息,请参见了解 AMQP,RabbitMQ 使用的协议。
33.2.2 发送信息
Spring 的AmqpTemplate和AmqpAdmin是自动配置的,您可以将它们直接自动连接到自己的 bean 中,如以下示例所示:
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
@Autowired
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
}
Note
RabbitMessagingTemplate可以类似的方式注入。如果定义了MessageConverter bean,它将自动关联到自动配置的AmqpTemplate。
如有必要,任何定义为 bean 的org.springframework.amqp.core.Queue都会自动用于在 RabbitMQ 实例上声明相应的队列。
要重试操作,可以在AmqpTemplate上启用重试(例如,在代理连接丢失的情况下):
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
默认情况下,重试是禁用的。您还可以pass 语句RabbitRetryTemplateCustomizer bean 来以编程方式自定义RetryTemplate。
33.2.3 接收消息
存在 Rabbit 基础结构时,可以使用@RabbitListenerComments 任何 bean 以创建侦听器端点。如果未定义RabbitListenerContainerFactory,则会自动配置默认的SimpleRabbitListenerContainerFactory,并且可以使用spring.rabbitmq.listener.type属性切换到直接容器。如果定义了MessageConverter或MessageRecoverer bean,它将自动与默认工厂关联。
以下示例组件在someQueue队列上创建一个侦听器端点:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
Tip
有关更多详细信息,请参见@EnableRabbit 的 Javadoc。
如果您需要创建更多的RabbitListenerContainerFactory实例,或者想覆盖默认实例,Spring Boot 提供了SimpleRabbitListenerContainerFactoryConfigurer和DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们设置与自动配置使用的工厂相同的设置来初始化SimpleRabbitListenerContainerFactory和DirectRabbitListenerContainerFactory。
Tip
选择哪种容器都没有关系。这两个 bean 通过自动配置公开。
例如,以下配置类公开了另一个使用特定MessageConverter的工厂:
@Configuration
static class RabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(myMessageConverter());
return factory;
}
}
然后,您可以使用任何带有@RabbitListenerComments 的方法来使用工厂,如下所示:
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory="myFactory")
public void processMessage(String content) {
// ...
}
}
您可以启用重试来处理侦听器引发异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer,但是您可以定义自己的MessageRecoverer。重试用尽后,如果将代理配置为这样做,则消息将被拒绝并被丢弃或路由到死信交换。默认情况下,重试是禁用的。您也可以pass 语句RabbitRetryTemplateCustomizer bean 来以编程方式自定义RetryTemplate。
Tip
默认情况下,如果禁用了重试,并且侦听器引发了异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false,以便尝试进行零次重新传递或抛出AmqpRejectAndDontRequeueException来指示应拒绝该消息。后者是启用重试并达到最大传递尝试次数时使用的机制。
33.3 Apache Kafka 支持
通过提供spring-kafka项目的自动配置来支持Apache Kafka。
Kafka 配置由spring.kafka.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
Tip
要在启动时创建主题,请添加NewTopic类型的 Bean。如果该主题已经存在,则将忽略 Bean。
有关更多受支持的选项,请参见KafkaProperties。
33.3.1 发送消息
Spring 的KafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动对其进行布线,如以下示例所示:
@Component
public class MyBean {
private final KafkaTemplate kafkaTemplate;
@Autowired
public MyBean(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// ...
}
Note
如果定义了属性spring.kafka.producer.transaction-id-prefix,则会自动配置KafkaTransactionManager。同样,如果定义了RecordMessageConverter bean,它将自动与自动配置的KafkaTemplate关联。
33.3.2 接收消息
存在 Apache Kafka 基础结构时,可以使用@KafkaListenerComments 任何 bean 以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用spring.kafka.listener.*中定义的键自动配置一个默认值。
以下组件在someTopic主题上创建侦听器终结点:
@Component
public class MyBean {
@KafkaListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
如果定义了KafkaTransactionManager bean,它将自动与容器工厂关联。同样,如果定义了RecordMessageConverter,ErrorHandler或AfterRollbackProcessor bean,它将自动与默认工厂关联。
Tip
自定义ChainedKafkaTransactionManager必须标记为@Primary,因为它通常引用自动配置的KafkaTransactionManager bean。
33.3.3 Kafka 流
用于 Apache Kafka 的 Spring 提供了一个工厂 bean 来创建StreamsBuilder对象并 Management 其流的生命周期。只要kafka-streams在 Classpath 上并且通过@EnableKafkaStreamsComments 启用 Kafka Streams,Spring Boot 就会自动配置所需的KafkaStreamsConfiguration bean。
启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。可以使用spring.kafka.streams.application-id配置前者,如果未设置,则默认为spring.application.name。后者可以全局设置,也可以仅针对流进行覆盖。
使用专用属性可以使用几个附加属性。可以使用spring.kafka.streams.properties名称空间设置其他任意 Kafka 属性。另请参见第 33.3.4 节“其他 Kafka 属性”。
要使用工厂 bean,只需将StreamsBuilder连接到您的@Bean中,如以下示例所示:
@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
}
默认情况下,由它创建的StreamBuilder对象 Management 的流将自动启动。您可以使用spring.kafka.streams.auto-startup属性来自定义此行为。
33.3.4 Kafka 的其他属性
自动配置支持的属性显示在附录 A,通用应用程序属性中。请注意,在大多数情况下,这些属性(连字符或 camelCase)直接 Map 到 Apache Kafka 点缀属性。有关详细信息,请参阅 Apache Kafka 文档。
这些属性的前几个属性适用于所有组件(生产者,使用者,Management 员和流),但如果您希望使用不同的值,则可以在组件级别上指定。 Apache Kafka 会指定重要性为 HIGH,MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值的属性。
Kafka 支持的属性的子集仅可通过KafkaProperties类直接使用。如果希望使用不直接支持的其他属性来配置生产者或使用者,请使用以下属性:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
这将公共prop.one Kafka 属性设置为first(适用于生产者,Consumer 和 Management 员),将prop.two admin 属性设置为second,将prop.threeConsumer 属性设置为third,将prop.four生产者属性设置为fourth,并将prop.five streams 属性设置为fifth。
您还可以如下配置 Spring Kafka JsonDeserializer:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
同样,您可以禁用在 Headers 中发送类型信息的JsonSerializer默认行为:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
Tip
以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。