33. Messaging

Spring 框架为与消息传递系统集成提供了广泛的支持,从简化使用JmsTemplate的 JMS API 的使用到完整的异步接收消息的基础结构。 Spring AMQP 为高级消息队列协议提供了类似的功能集。 Spring Boot 还为RabbitTemplate和 RabbitMQ 提供了自动配置选项。 Spring WebSocket 本身就包含对 STOMP 消息的支持,而 Spring Boot 通过启动程序和少量的自动配置对此提供了支持。 Spring Boot 还支持 Apache Kafka。

33.1 JMS

javax.jms.ConnectionFactory接口提供了创建javax.jms.Connection与 JMS 代理进行交互的标准方法。尽管 Spring 需要ConnectionFactory才能与 JMS 一起使用,但是您通常不需要自己直接使用它,而是可以依靠更高级别的消息抽象。 (有关详细信息,请参见 Spring Framework 参考文档的relevant section。)Spring Boot 还自动配置必要的基础结构来发送和接收消息。

33.1.1 ActiveMQ 支持

ActiveMQ在 Classpath 上可用时,Spring Boot 也可以配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(前提是未通过配置指定代理 URL)。

Note

如果使用spring-boot-starter-activemq,则将提供连接或嵌入 ActiveMQ 实例所需的依赖关系,以及与 JMS 集成的 Spring 基础结构。

ActiveMQ 配置由spring.activemq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret

默认情况下,CachingConnectionFactory使用可以由spring.jms.*中的外部配置属性控制的明智设置来包装本机ConnectionFactory

spring.jms.cache.session-cache-size=5

如果您想使用本机池,则可以通过向org.messaginghub:pooled-jms添加依赖项并相应地配置JmsPoolConnectionFactory来实现,如下例所示:

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50

Tip

有关更多受支持的选项,请参见ActiveMQProperties。您还可以注册任意数量的实现ActiveMQConnectionFactoryCustomizer的 bean,以进行更高级的自定义。

默认情况下,ActiveMQ 将创建一个目的地(如果目的地尚不存在),以便根据其提供的名称来解析目的地。

33.1.2 Artemis 支持

当 Spring Boot 检测到Artemis在 Classpath 中可用时,它可以自动配置ConnectionFactory。如果存在代理,则将自动启动和配置嵌入式代理(除非已明确设置 mode 属性)。支持的模式为embedded(以明确要求使用嵌入式代理,并且如果代理在 Classpath 上不可用,则会发生错误)和native(使用netty传输协议连接到代理)。配置后者后,Spring Boot 将使用默认设置配置一个ConnectionFactory,该ConnectionFactory连接到在本地计算机上运行的代理。

Note

如果使用spring-boot-starter-artemis,则将提供连接到现有 Artemis 实例所需的依赖关系,以及与 JMS 集成的 Spring 基础结构。将org.apache.activemq:artemis-jms-server添加到您的应用程序可让您使用嵌入式模式。

Artemis 配置由spring.artemis.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret

嵌入代理时,可以选择是否要启用持久性并列出应使其可用的目的地。可以将它们指定为以逗号分隔的列表,以使用默认选项创建它们,也可以为高级队列和主题配置分别定义类型org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration的 bean。

默认情况下,CachingConnectionFactory使用可以由spring.jms.*中的外部配置属性控制的明智设置来包装本机ConnectionFactory

spring.jms.cache.session-cache-size=5

如果您想使用本机池,则可以通过向org.messaginghub:pooled-jms添加依赖项并相应地配置JmsPoolConnectionFactory来实现,如下例所示:

spring.artemis.pool.enabled=true
spring.artemis.pool.max-connections=50

有关更多受支持的选项,请参见ArtemisProperties

不涉及 JNDI 查找,并且使用 Artemis 配置中的name属性或通过配置提供的名称来根据目的地名称解析目的地。

33.1.3 使用 JNDI ConnectionFactory

如果您正在应用程序服务器中运行应用程序,则 Spring Boot 会尝试使用 JNDI 查找 JMS ConnectionFactory。默认情况下,选中java:/JmsXAjava:/XAConnectionFactory位置。如果需要指定替代位置,则可以使用spring.jms.jndi-name属性,如以下示例所示:

spring.jms.jndi-name=java:/MyConnectionFactory

33.1.4 发送消息

Spring 的JmsTemplate是自动配置的,您可以将其直接自动连接到自己的 bean 中,如以下示例所示:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final JmsTemplate jmsTemplate;

	@Autowired
	public MyBean(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	// ...

}

Note

JmsMessagingTemplate可以类似的方式注入。如果定义了DestinationResolverMessageConverter bean,则将其自动关联到自动配置的JmsTemplate

33.1.5 接收消息

存在 JMS 基础结构时,可以使用@JmsListenerComments 任何 bean 以创建侦听器端点。如果未定义JmsListenerContainerFactory,则会自动配置一个默认值。如果定义了DestinationResolverMessageConverter bean,它将自动关联到默认工厂。

默认情况下,默认工厂是事务性的。如果您在存在JtaTransactionManager的基础结构中运行,则默认情况下会将其与侦听器容器关联。如果不是,则启用sessionTransacted标志。在后一种情况下,可以通过在侦听器方法(或其委托)上添加@Transactional来将本地数据存储事务与传入消息的处理相关联。这样可以确保本地事务完成后,传入消息得到确认。这还包括发送已在同一 JMS 会话上执行的响应消息。

以下组件在someQueue目标上创建侦听器端点:

@Component
public class MyBean {

	@JmsListener(destination = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}

Tip

有关更多详细信息,请参见@EnableJms 的 Javadoc

如果您需要创建更多的JmsListenerContainerFactory实例,或者想覆盖默认实例,Spring Boot 提供了一个DefaultJmsListenerContainerFactoryConfigurer,您可以使用与自动配置的设置相同的设置来初始化DefaultJmsListenerContainerFactory

例如,以下示例公开了另一个使用特定MessageConverter的工厂:

@Configuration
static class JmsConfiguration {

	@Bean
	public DefaultJmsListenerContainerFactory myFactory(
			DefaultJmsListenerContainerFactoryConfigurer configurer) {
		DefaultJmsListenerContainerFactory factory =
				new DefaultJmsListenerContainerFactory();
		configurer.configure(factory, connectionFactory());
		factory.setMessageConverter(myMessageConverter());
		return factory;
	}

}

然后,您可以在任何带有@JmsListenerComments 的方法中使用工厂,如下所示:

@Component
public class MyBean {

	@JmsListener(destination = "someQueue", containerFactory="myFactory")
	public void processMessage(String content) {
		// ...
	}

}

33.2 AMQP

高级消息队列协议(AMQP)是面向消息中间件的与平台无关的有线级别协议。 Spring AMQP 项目将 Spring 的核心概念应用于基于 AMQP 的消息传递解决方案的开发。 Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了许多便利,包括spring-boot-starter-amqp“ Starter”。

33.2.1 RabbitMQ 支持

RabbitMQ是基于 AMQP 协议的轻型,可靠,可伸缩和便携式消息代理。 Spring 使用RabbitMQ通过 AMQP 协议进行通信。

RabbitMQ 配置由spring.rabbitmq.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret

如果上下文中存在ConnectionNameStrategy bean,它将自动用于命名由自动配置的ConnectionFactory创建的连接。有关更多受支持的选项,请参见RabbitProperties

Tip

有关更多详细信息,请参见了解 AMQP,RabbitMQ 使用的协议

33.2.2 发送信息

Spring 的AmqpTemplateAmqpAdmin是自动配置的,您可以将它们直接自动连接到自己的 bean 中,如以下示例所示:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;
	private final AmqpTemplate amqpTemplate;

	@Autowired
	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

}

Note

RabbitMessagingTemplate可以类似的方式注入。如果定义了MessageConverter bean,它将自动关联到自动配置的AmqpTemplate

如有必要,任何定义为 bean 的org.springframework.amqp.core.Queue都会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,可以在AmqpTemplate上启用重试(例如,在代理连接丢失的情况下):

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s

默认情况下,重试是禁用的。您还可以pass 语句RabbitRetryTemplateCustomizer bean 来以编程方式自定义RetryTemplate

33.2.3 接收消息

存在 Rabbit 基础结构时,可以使用@RabbitListenerComments 任何 bean 以创建侦听器端点。如果未定义RabbitListenerContainerFactory,则会自动配置默认的SimpleRabbitListenerContainerFactory,并且可以使用spring.rabbitmq.listener.type属性切换到直接容器。如果定义了MessageConverterMessageRecoverer bean,它将自动与默认工厂关联。

以下示例组件在someQueue队列上创建一个侦听器端点:

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}

Tip

有关更多详细信息,请参见@EnableRabbit 的 Javadoc

如果您需要创建更多的RabbitListenerContainerFactory实例,或者想覆盖默认实例,Spring Boot 提供了SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,您可以使用它们设置与自动配置使用的工厂相同的设置来初始化SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory

Tip

选择哪种容器都没有关系。这两个 bean 通过自动配置公开。

例如,以下配置类公开了另一个使用特定MessageConverter的工厂:

@Configuration
static class RabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory =
				new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(myMessageConverter());
		return factory;
	}

}

然后,您可以使用任何带有@RabbitListenerComments 的方法来使用工厂,如下所示:

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory="myFactory")
	public void processMessage(String content) {
		// ...
	}

}

您可以启用重试来处理侦听器引发异常的情况。默认情况下,使用RejectAndDontRequeueRecoverer,但是您可以定义自己的MessageRecoverer。重试用尽后,如果将代理配置为这样做,则消息将被拒绝并被丢弃或路由到死信交换。默认情况下,重试是禁用的。您也可以pass 语句RabbitRetryTemplateCustomizer bean 来以编程方式自定义RetryTemplate

Tip

默认情况下,如果禁用了重试,并且侦听器引发了异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将defaultRequeueRejected属性设置为false,以便尝试进行零次重新传递或抛出AmqpRejectAndDontRequeueException来指示应拒绝该消息。后者是启用重试并达到最大传递尝试次数时使用的机制。

33.3 Apache Kafka 支持

通过提供spring-kafka项目的自动配置来支持Apache Kafka

Kafka 配置由spring.kafka.*中的外部配置属性控制。例如,您可以在application.properties中声明以下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

Tip

要在启动时创建主题,请添加NewTopic类型的 Bean。如果该主题已经存在,则将忽略 Bean。

有关更多受支持的选项,请参见KafkaProperties

33.3.1 发送消息

Spring 的KafkaTemplate是自动配置的,您可以直接在自己的 bean 中自动对其进行布线,如以下示例所示:

@Component
public class MyBean {

	private final KafkaTemplate kafkaTemplate;

	@Autowired
	public MyBean(KafkaTemplate kafkaTemplate) {
		this.kafkaTemplate = kafkaTemplate;
	}

	// ...

}

Note

如果定义了属性spring.kafka.producer.transaction-id-prefix,则会自动配置KafkaTransactionManager。同样,如果定义了RecordMessageConverter bean,它将自动与自动配置的KafkaTemplate关联。

33.3.2 接收消息

存在 Apache Kafka 基础结构时,可以使用@KafkaListenerComments 任何 bean 以创建侦听器端点。如果未定义KafkaListenerContainerFactory,则会使用spring.kafka.listener.*中定义的键自动配置一个默认值。

以下组件在someTopic主题上创建侦听器终结点:

@Component
public class MyBean {

	@KafkaListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}

如果定义了KafkaTransactionManager bean,它将自动与容器工厂关联。同样,如果定义了RecordMessageConverterErrorHandlerAfterRollbackProcessor bean,它将自动与默认工厂关联。

Tip

自定义ChainedKafkaTransactionManager必须标记为@Primary,因为它通常引用自动配置的KafkaTransactionManager bean。

33.3.3 Kafka 流

用于 Apache Kafka 的 Spring 提供了一个工厂 bean 来创建StreamsBuilder对象并 Management 其流的生命周期。只要kafka-streams在 Classpath 上并且通过@EnableKafkaStreamsComments 启用 Kafka Streams,Spring Boot 就会自动配置所需的KafkaStreamsConfiguration bean。

启用 Kafka Streams 意味着必须设置应用程序 ID 和引导服务器。可以使用spring.kafka.streams.application-id配置前者,如果未设置,则默认为spring.application.name。后者可以全局设置,也可以仅针对流进行覆盖。

使用专用属性可以使用几个附加属性。可以使用spring.kafka.streams.properties名称空间设置其他任意 Kafka 属性。另请参见第 33.3.4 节“其他 Kafka 属性”

要使用工厂 bean,只需将StreamsBuilder连接到您的@Bean中,如以下示例所示:

@Configuration
@EnableKafkaStreams
static class KafkaStreamsExampleConfiguration {

	@Bean
	public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
		KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
		stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
				Produced.with(Serdes.Integer(), new JsonSerde<>()));
		return stream;
	}

}

默认情况下,由它创建的StreamBuilder对象 Management 的流将自动启动。您可以使用spring.kafka.streams.auto-startup属性来自定义此行为。

33.3.4 Kafka 的其他属性

自动配置支持的属性显示在附录 A,通用应用程序属性中。请注意,在大多数情况下,这些属性(连字符或 camelCase)直接 Map 到 Apache Kafka 点缀属性。有关详细信息,请参阅 Apache Kafka 文档。

这些属性的前几个属性适用于所有组件(生产者,使用者,Management 员和流),但如果您希望使用不同的值,则可以在组件级别上指定。 Apache Kafka 会指定重要性为 HIGH,MEDIUM 或 LOW 的属性。 Spring Boot 自动配置支持所有 HIGH 重要性属性,一些选定的 MEDIUM 和 LOW 属性以及任何没有默认值的属性。

Kafka 支持的属性的子集仅可通过KafkaProperties类直接使用。如果希望使用不直接支持的其他属性来配置生产者或使用者,请使用以下属性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

这将公共prop.one Kafka 属性设置为first(适用于生产者,Consumer 和 Management 员),将prop.two admin 属性设置为second,将prop.threeConsumer 属性设置为third,将prop.four生产者属性设置为fourth,并将prop.five streams 属性设置为fifth

您还可以如下配置 Spring Kafka JsonDeserializer

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

同样,您可以禁用在 Headers 中发送类型信息的JsonSerializer默认行为:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false

Tip

以这种方式设置的属性将覆盖 Spring Boot 显式支持的任何配置项。