2. Introduction
参考文档的第一部分概述了 Spring AMQP 及其基本概念和一些代码片段,这些内容将使您尽快启动并运行。
2.1 不耐烦的快速浏览
2.1.1 Introduction
这是从 Spring AMQP 开始的 5 分钟导览。
先决条件:安装并运行 RabbitMQ 代理(http://www.rabbitmq.com/download.html)。然后获取 spring-rabbit JAR 及其所有依赖项-最简单的方法是在构建工具中声明一个依赖项,例如对于 Maven:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.2.RELEASE</version>
</dependency>
对于 gradle:
compile 'org.springframework.amqp:spring-rabbit:2.1.2.RELEASE'
Compatibility
Spring Framework 的最低版本依赖性为 5.0.x。
最低amqp-client
javaClient 端库版本为 5.0.0.
非常非常快
使用简单的命令式 Java 发送和接收消息:
ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
请注意,本机 Java RabbitClient 端中也有一个ConnectionFactory
。我们在上面的代码中使用 Spring 抽象。我们依赖于代理中的默认交换(因为在 send 中未指定),以及所有队列通过其名称到默认交换的默认绑定(因此,我们可以将队列名称用作 send 中的路由键) 。这些行为在 AMQP 规范中定义。
使用 XML 配置
与上面相同的示例,但是将资源配置外部化为 XML:
ApplicationContext context =
new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="myqueue"/>
</beans>
默认情况下,<rabbit:admin/>
声明会自动查找Queue
,Exchange
和Binding
类型的 bean 并代表用户将其声明给代理,因此无需在简单的 Java 驱动程序中显式使用该 bean。有很多选项可以配置 XML 模式中组件的属性-您可以使用 XML 编辑器的自动完成功能来探索它们并查看其文档。
使用 Java 配置
同样的示例使用 Java 中的外部配置:
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
........
@Configuration
public class RabbitConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
}
2.2 新增功能
2.2.1 2.1 自 2.0 版以来的变化
AMQPClient 端库
Spring AMQP 现在使用 RabbitMQ 团队提供的amqp-client
库的新 5.2.x 版本。此 Client 端默认情况下配置了自动恢复;参见“ RabbitMQ 自动连接/拓扑恢复”部分。
Note
从版本 4.0 开始,Client 端默认情况下启用自动恢复;默认情况下,Client 端将自动恢复。虽然与该功能兼容,但 Spring AMQP 拥有自己的恢复机制,通常不需要 Client 端恢复功能。建议禁用amqp-client
自动恢复,以避免在代理可用但连接尚未恢复时获得AutoRecoverConnectionNotCurrentlyOpenException
s。从* version 1.7.1 *开始,Spring AMQP 禁用它,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给CachingConnectionFactory
。 RabbitConnectionFactoryBean
创建的 RabbitMQ ConnectionFactory
实例默认情况下也会禁用该选项。
Package Changes
某些类已移至不同的包;大多数是内部类,不会影响用户应用程序。 ChannelAwareMessageListener
和RabbitListenerErrorHandler
是两个 exception;这些接口现在位于org.springframework.amqp.rabbit.listener.api
中。
发布者确认更改
当有未完成的确认时,为发布者确认启用的通道不会返回到缓存。有关更多信息,请参见称为“发布者确认并返回”的部分。
侦听器容器工厂的改进
现在可以使用侦听器容器工厂来创建任何侦听器容器,而不仅仅是用于@RabbitListener
s 或@RabbitListenerEndpointRegistry
的容器。有关更多信息,请参见“使用容器工厂”一节。
ChannelAwareMessageListener
现在从MessageListener
继承。
broker 事件监听器
引入了BrokerEventListener
以将选定的代理事件发布为ApplicationEvent
。有关更多信息,请参见第 3.1.12 节“代理事件监听器”。
RabbitAdmin Changes
RabbitAdmin
将发现类型为Declarables
的 bean(这是Declarable
-Queue
,Exchange
和Binding
对象的容器),并在代理上声明包含的对象。不鼓励用户使用旧的声明<Collection<Queue>>
etc 的机制,而应该使用Declarables
beans。默认情况下,旧机制处于禁用状态。有关更多信息,请参见称为“声明交换,队列,绑定的集合”的部分。
现在声明AnonymousQueue
,默认情况下x-queue-master-locator
设置为client-local
,以确保在与应用程序连接的节点上创建队列。有关更多信息,请参见第 3.1.11 节,“配置代理”。
RabbitTemplate Changes
现在可以使用noLocalReplyConsumer
选项配置RabbitTemplate
,以控制sendAndReceive()
操作中答复使用者的noLocal
标志。有关更多信息,请参见第 3.1.10 节“请求/回复消息”。
发布商的CorrelationData
确认现在有一个ListenableFuture
,可用于获取确认而不是使用回调。启用返回和确认后,将使用返回的消息填充相关数据(如果提供)。有关更多信息,请参见称为“发布者确认并返回”的部分。
现在提供了方法replyTimedOut
来通知子类答复已超时,从而允许进行任何状态清除。有关更多信息,请参见称为“答复超时”的部分。
Message Conversion
引入了新的Jackson2XmlMessageConverter
以支持将消息从 XML 格式转换为 XML 格式。有关更多信息,请参见名为“ Jackson2XmlMessageConverter”的部分。
ManagementREST API
现在不推荐使用RabbitManagementTemplate
,而直接使用com.rabbitmq.http.client.Client
(或com.rabbitmq.http.client.ReactorNettyClient
)。有关更多信息,请参见第 3.1.14 节“ RabbitMQ REST API”。
@RabbitListener Changes
现在可以为侦听器容器工厂配置RetryTemplate
,并且在发送回复时可以使用RecoveryCallback
。有关更多信息,请参见名为“启用侦听器端点 Comments”的部分。
异步@RabbitListener 返回
@RabbitListener
个方法现在可以返回ListenableFuture<?>
或Mono<?>
。有关更多信息,请参见???。
连接工厂 Bean 更改
RabbitConnectionFactoryBean
现在默认调用enableHostnameVerification()
;若要还原为以前的行为,请将enableHostnameVerification
属性设置为false
。
连接工厂更改
CachingConnectionFactory
现在将无条件地禁用基础 RabbitMQ ConnectionFactory
中的自动恢复,即使在构造函数中提供了预配置的实例也是如此。尽管已采取措施使 Spring AMQP 与自动恢复兼容,但在某些极端情况下仍然存在问题。 Spring AMQP 从 1.0.0 版本开始就有自己的恢复机制,不需要使用 Client 端提供的恢复。虽然在构建CachingConnectionFactory
之后仍然可以启用该功能(使用cachingConnectionFactory.getRabbitConnectionFactory()
.setAutomaticRecoveryEnabled()
),但是 强烈建议您不要启用 。如果在直接使用 Client 端工厂时(而不是通过 Spring AMQP 组件)需要自动恢复连接,请使用单独的 RabbitMQ ConnectionFactory
。
侦听器容器更改
如果存在x-death
Headers,则默认ConditionalRejectingErrorHandler
现在将完全丢弃导致致命错误的消息。有关更多信息,请参见第 3.1.15 节“异常处理”。
Immediate requeue
引入了一个新的ImmediateRequeueAmqpException
,以通知侦听器容器必须重新排队该消息。为了利用此功能,添加了新的ImmediateRequeueMessageRecoverer
实现。
有关更多信息,请参见称为“消息侦听器和异步情况”的部分。