β

Spring AMQP中文文档

liuxing's blog 64 阅读

1.前言

Spring AMQP项目是用于开发AMQP的解决方案。 我们提供一个“模板”作为发送和接收消息的抽象。我们还为普通POJO进行提供消息处理支持。这些库促进AMQP资源的管理,同时支持使用依赖注入和声明式配置。 在所有情况下,您将看到与Spring Framework中的JMS支持的相似之处。有关其他项目相关信息,请访问Spring AMQP 项目主页

2.介绍

该帮助文档的第一部分是Spring AMQP以及基本概念和一些代码段的概述,可以尽快帮助您快速使用。

2.1 快速入门

2.1.1 介绍

五分钟快速使用Spring AMQP.

先决条件:安装并运行 RabbitMQ 。然后在您的项目中加入如下MAVEN依赖:

ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

请注意,Java Rabbit客户端中也有一个 ConnectionFactory
我们在上面的代码中使用了Spring抽象的 ConnectionFactory
我们使用Rabbit的默认 exchange (因为发送中没有指定),并且所有队列默认绑定到默认 exchange (因此我们可以在发送中使用队列名称作为 routing key )。
这些行为在AMQP规范中定义。

使用XML配置

上述例子在XML中的配置形式如下

ApplicationContext context =
new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

默认情况下, <rabbit:admin /> 声明会自动查找类型为 Queue Exchange Binding 的bean,并将他们绑定,因此不需要在简单的Java程序中明确使用该bean。在XML模式中配置组件的属性有很多选项 - 您可以使用XML编辑器的自动完成功能来浏览它们并查看其文档。

使用java配置

相同的代码在java代码中的另一种配置

public class Message {

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}

public byte[] getBody() {
return this.body;
}

public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}

MessageProperties接口定义了几个常见的属性,如messageId,timestamp,contentType等等。
这些属性也可以通过调用setHeader(String key,Object value)方法来扩展用户定义的头属性。

exchange(交换机)

Exchange接口表示AMQP Exchange,这是消息生产者发送到的。
代理的虚拟主机中的每个Exchange将具有唯一的名称以及一些其他属性:

public class Queue  {

private final String name;

private volatile boolean durable;

private volatile boolean exclusive;

private volatile boolean autoDelete;

private volatile Map<String, Object> arguments;

/**
* 队列是持久的,非排他的和非自动删除的。
*
* @param name 队列名
*/
public Queue(String name) {
this(name, true, false, false);
}

// Getters and Setters omitted for brevity

}

请注意,构造函数采用队列名称。根据实现,管理模板可以提供用于生成唯一命名的队列的方法。这样的队列可以用作 reply-to 地址或其他临时情况。因此,自动生成的队列的exclusive和autoDelete属性都将设置为true。

有关使用命名空间支持(包括队列参数)声明队列的信息,请参见第3.1.10节“配置代理”中的队列部分。

Binding(绑定)

鉴于生产者发送到Exchange并且消费者从队列接收到消息,将队列连接到exchange的绑定对于通过消息传递连接这些生产者和消费者至关重要。
在Spring AMQP中,我们定义一个Binding类来表示这些连接。
我们来看看将队列绑定到交换机的基本选项。

您可以使用固定的routing key将队列绑定到DirectExchange。

new Binding(someQueue, someTopicExchange, "foo.*")

您可以使用无routing key将Queue绑定到FanoutExchange。

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

为了清楚起见,上面显示了BindingBuilder类,但是对于bind()方法使用静态导入时,此样式很好。

Binding类的一个实例本身就是持有关于连接的数据。
换句话说,它不是一个“活跃”组件。
但是,正如您将在3.1.10节“配置代理”中看到的,AmqpAdmin类可以使用Binding实例来实际触发代理上的绑定操作。
另外,正如你将在同一部分中看到的,Binding实例可以使用@Configuration类中的Spring的@ Bean风格进行定义。
还有一个方便的基类,它进一步简化了生成AMQP相关bean定义的方法,并识别队列,交换和绑定,以便在应用程序启动时将它们全部声明在AMQP代理上。

AmqpTemplate也在核心包中定义。作为实际AMQP消息传递中涉及的主要组件之一,将在其自己的部分中详细讨论(参见第3.1.4节“AmqpTemplate”)。

3.1.2连接和资源管理

介绍

我们上一节描述的AMQP模型是通用的,适用于所有实现,当我们进入资源管理时,特定的场景需要特殊实现。因此,在本节中,我们将专注于仅存在于我们的“spring-rabbit”模块中的代码,因为在这一点上,RabbitMQ是唯一支持的实现。

用于管理与RabbitMQ代理的连接的中心组件是 ConnectionFactory 接口。 ConnectionFactory 实现的责任是提供一个 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装器。我们提供的唯一具体实现是 CachingConnectionFactory ,默认情况下,它建立可以由应用程序共享的单个连接代理。连接是共享的,因为与AMQP通信的“工作单位”实际上是一个“通道”(在某些方面,这与JMS中的连接和会话之间的关系类似)。您可以想像,连接实例提供了一个 createChannel 方法。 CachingConnectionFactory 实现支持对这些通道的缓存,并且基于它们是否是事务来维护单独的通道高速缓存。创建 CachingConnectionFactory 实例时,可以通过构造函数提供主机名。还应提供用户名和密码属性。如果要配置通道缓存的大小(默认值为25),您也可以在此处调用 setChannelCacheSize() 方法。

从1.3版开始, CachingConnectionFactory 可以配置为缓存连接以及仅通道。
在这种情况下,每次调用 createConnection() 都会创建一个新的连接(或从缓存中检索一个空闲的连接)。
关闭连接将返回到缓存(如果尚未达到高速缓存大小)。
在这种连接上创建的通道也被缓存。
使用单独的连接在某些环境中可能是有用的,例如从HA群集中消耗负载均衡器连接到不同的群集成员。
cacheMode 设置为 CacheMode.CONNECTION

这不限制连接数,它指定允许多少空闲打开连接。

从版本1.5.5开始,提供了一个新的属性 connectionLimit 。当设置此项时,它限制允许的连接总数。设置后,如果达到限制,则使用 channelCheckoutTimeLimit 等待连接变为空闲状态。如果超过时间,则抛出 AmqpTimeoutException

重要提示
当缓存模式为CONNECTION时,不支持自动声明队列等(请参阅“自动声明交换,队列和绑定”一节)。
此外,在编写本文时,rabbitmq-client库默认为每个连接创建一个固定的线程池(5个线程)。当使用大量连接时,应考虑在CachingConnectionFactory上设置自定义执行程序。然后,所有连接将使用相同的执行程序,并且可以共享它的线程。执行者的线程池应该是无限制的,或者针对预期的利用率进行适当设置(通常每个连接至少有一个线程)。如果在每个连接上创建多个通道,则池大小将影响并发性,因此变量(或简单的缓存)线程池执行器将是最合适的。

重要的是要明白,缓存大小(默认情况下)不是限制,只是可以缓存的通道数。具有例如10的高速缓存大小,实际上可以使用任何数量的频道。如果正在使用10个以上的通道,并将它们全部返回到缓存,则10将进入高速缓存;其余部分将被物理关闭。

从版本1.6开始,默认通道缓存大小从1增加到25。在高容量,多线程环境中,小缓存意味着以高速率创建和关闭通道。增加默认缓存大小将避免这种开销。您应该通过RabbitMQ管理界面监视正在使用的频道,并考虑在创建和关闭许多通道时进一步增加高速缓存大小。缓存只会按需增长(以适应应用程序的并发需求),因此此更改不会影响现有的低容量应用程序。

从版本1.4.2开始, CachingConnectionFactory 具有一个属性 channelCheckoutTimeout 。当此属性大于零时, channelCacheSiz e将成为可在连接上创建的通道数量的限制。如果达到限制,调用线程将阻塞,直到通道可用或达到此超时,在这种情况下抛出一个 AmqpTimeoutException

框架内使用的通道(例如RabbitTemplate)将可靠地返回到缓存。如果您在框架之外创建渠道(例如通过直接访问连接并调用createChannel()),则必须可靠地将其返回(通过关闭),也许在finally块中,以避免使用通道。

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>

还有一个 SingleConnectionFactory 实现,仅在框架的单元测试代码中可用。它比 CachingConnectionFactory 简单,因为它不缓存通道,但是由于其缺乏性能和弹性,它不适用于简单测试之外的实际使用。如果您因为某些原因需要实现自己的 ConnectionFactory ,那么 AbstractConnectionFactory 基类可能会提供一个很好的起点。

可以使用rabbit命名空间快速方便地创建ConnectionFactory:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

使用命名空间,您只需添加channel-cache-size属性即可:

<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

可以使用命名空间设置主机和端口属性

<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672"/>

这里有一个自定义线程工厂的例子,它使用rabbitmq-前缀线程名。

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可以用来区分目标连接名称一些逻辑。默认情况下,使用 AbstractConnectionFactory beanName 和内部计数器来生成 connection_name <rabbit:connection-factory> 命名空间组件也提供了 connection-name-strategy 属性。

配置底层客户端连接工厂

CachingConnectionFactory 使用Rabbit客户端 ConnectionFactory 的实例;当设置 CachingConnectionFactory 上的等效属性时,会传递一些配置属性(例如, host port userName password requestedHeartBeat connectionTimeout )。要设置其他属性(例如 clientProperties ),请定义 rabbit factory的实例,并使用适当的 CachingConnectionFactory 构造函数提供对它的引用。当使用如上所述的命名空间时,在 connection-factory 属性中提供对配置工厂的引用。为方便起见,提供了一个工厂bean,以帮助在Spring应用程序环境中配置连接工厂,如下一节所述。

<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
class="org.springframework.xd.dirt.integration.rabbit.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="file:/secrets/rabbitSSL.properties"/>
</bean>

有关配置SSL的信息,请参阅RabbitMQ文档。省略keyStore和trustStore配置以通过SSL连接,而无需证书验证。密钥和信任存储配置可以提供如下:

sslPropertiesLocation 属性是指向包含以下键的属性文件的Spring Resource

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
@Autowired
private RabbitProperties props;

private final String[] adminUris = { "http://host1:15672", "http://host2:15672" };

private final String[] nodes = { "rabbit@host1", "rabbit@host2" };

@Bean
public ConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}

@Bean
public ConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
this.adminUris, this.nodes,
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}

请注意,前三个参数是addresses,adminUris和nodes。这些是位置性的,因为当容器尝试连接到队列时,它确定队列被掌握在哪个节点上并连接到同一阵列位置的地址。

Publisher Confirms and Returns

通过将CachingConnectionFactory的publisherConfirms和publisherReturns属性分别设置为“true”,支持确认和返回的消息。

设置这些选项时,工厂创建的通道将被包装在PublisherCallbackChannel中,该通道用于方便回调。当获得这样的频道时,客户端可以使用频道注册一个PublisherCallbackChannel.Listener。 PublisherCallbackChannel实现包含将 确认或返回 路由到适当的监听器的逻辑。以下部分将进一步说明这些功能。

有关更多背景信息,请参阅RabbitMQ小组题为“引入发布者确认”的博客文章。

记录通道关闭事件

版本1.5中引入了一种使用户能够控制日志记录级别的机制。

CachingConnectionFactory使用默认策略来记录通道关闭,如下所示:

要修改此行为,请在其closeExceptionLogger属性中的CachingConnectionFactory中注入自定义ConditionalExceptionLogger。

另请参阅“消费者失败事件”一节。

运行缓存属性

Table 3.1 CacheMode.CHANNEL的缓存属性

属性 说明
connectionName ConnectionNameStrategy生成的连接的名称。
channelCacheSize 当前配置的允许空闲的最大通道。
localPort 连接的本地端口(如果可用)。这可以用于与RabbitMQ管理界面上的连接/通道相关联。
idleChannelsTx 当前空闲(缓存)的事务通道的数量。
idleChannelsNotTx 当前空闲(高速缓存)的非事务性通道的数量。
idleChannelsTxHighWater 同时空闲(缓存)的事务通道的最大数量。
idleChannelsNotTxHighWater 非事务性通道的最大数量同时处于空闲状态(缓存)。

Table 3.2 CacheMode.CONNECTION的缓存属性

属性 说明
connectionName:localPort ConnectionNameStrategy生成的连接的名称。
openConnections 表示与经纪人连接的连接对象数。
channelCacheSize 当前配置的允许空闲的最大通道。
connectionCacheSize 当前配置的允许空闲的最大连接。
idleConnections 当前空闲的连接数。
idleConnectionsHighWater 同时空闲的最大连接数。
idleChannelsTx:localPort 当前为此连接空闲(高速缓存)的事务通道的数量。属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接/通道相关联。
idleChannelsNotTx:localPort 当前为此连接空闲(高速缓存)的非事务性通道的数量。属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接/通道相关联。
idleChannelsTxHighWater:localPort 同时空闲(缓存)的事务通道的最大数量。属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接/通道相关联。
idleChannelsNotTxHighWater:localPort 非事务性通道的最大数量同时处于空闲状态(缓存)。属性名称的localPort部分可用于与RabbitMQ Admin UI上的连接/通道相关联。

cacheMode 属性(还包括 CHANNEL CONNECTION )。

此处输入图片的描述

RabbitMQ自动连接/拓扑恢复

从Spring AMQP的第一个版本开始,框架在发生代理失败时提供了自己的连接和通道恢复。另外,如第3.1.10节“配置代理”所述,当重新建立连接时,RabbitAdmin将重新声明任何基础架构bean(队列等)。因此,它不依赖于由amqp-client库提供的自动恢复。 Spring AMQP现在使用4.0.x版本的amqp-client,默认情况下启用自动恢复。 Spring AMQP仍然可以使用自己的恢复机制,如果您希望,在客户端禁用它(通过将底层RabbitMQ connectionFactory上的automaticRecoveryEnabled属性设置为false)。但是,该框架与启用自动恢复完全兼容。这意味着您在代码中创建的任何消费者(可能通过RabbitTemplate.execute())都可以自动恢复。

3.1.3 添加自定义客户端连接属性

CachingConnectionFactory现在允许您访问底层连接工厂,例如设置自定义客户端属性:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>

使用@Configuration:

retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}

在这种情况下,您不会将RetryTemplate注入RabbitTemplate。

Publisher Confirms and Returns

AmqpTemplate的RabbitTemplate实现Publisher Confirms and Returns。

对于返回的消息,模板的必需属性必须设置为true,否则强制表达式必须对特定消息进行求值。此功能需要一个CachedConnectionFactory,其publisherReturns属性设置为true(参见“发布者确认和返回”一节)。通过调用setReturnCallback(ReturnCallback callback)注册一个RabbitTemplate.ReturnCallback,返回给客户端。回调必须实现这个方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData是发送原始消息时由客户端提供的对象。 ack是真实的,对于一个nack是假的。对于nack,原因可能包含一个原因,如果在生成nack时可用。一个例子是向不存在的交换发送消息时。在这种情况下,经纪人关闭渠道;原因包括在内的原因。原因在版本1.4中被添加。

RabbitTemplate只支持一个ConfirmCallback。

RabbitTemplate发送操作完成后,通道关闭;这将阻止在连接工厂缓存已满时(当缓存中有空间,通道没有物理关闭并且返回/确认将正常进行时)的接收确认或返回。当缓存已满时,框架会将关闭延迟最多5秒,以便允许接收确认/返回的时间。当使用确认时,当接收到最后一次确认时,通道将被关闭。当仅使用退货时,通道将保持打开5秒钟。通常建议将连接工厂的channelCacheSize设置为足够大的值,以便将发布消息的通道返回到缓存,而不是关闭。您可以使用RabbitMQ管理插件监视频道使用情况;如果您看到频道正在快速打开/关闭,您应该考虑增加缓存大小以减少服务器上的开销。

消息集成

从版本1.4开始,构建在 RabbitTemplate 之上的 RabbitMessagingTemplate 提供了与Spring Framework消息抽象(即 org.springframework.messaging.Message )的集成。这允许您使用 spring-messaging Message<?> 抽象发送和接收消息。这种抽象是由Spring Integration和Spring的STOMP支持的其他Spring项目使用的。有两个消息转换器涉及;一个用于在Spring消息传递 Message<?> 和Spring AMQP的 Message 抽象之间进行转换,另一个用于在Spring AMQP的Message抽象与底层RabbitMQ客户端库所需的格式之间进行转换。默认情况下,消息有效载荷由提供的 RabbitTemplate 的消息转换器转换。或者,您可以使用其他有效载荷转换器注入自定义 MessagingMessageConverter

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个例子是一个文字表达;第二个从应用程序上下文中的连接工厂bean获取用户名属性。

3.1.5 发送消息

介绍

发送消息时,可以使用以下任一方法:

amqpTemplate.send("marketData.topic", "quotes.nasdaq.FOO",
new Message("12.34".getBytes(), someProperties));

如果您打算在大部分或全部时间使用该模板实例发送到同一个交换机,则可以在模板本身上设置“交换”属性。在这种情况下,可以使用上面列出的第二种方法。以下示例在功能上等同于上一个示例:

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

更好的思考交换和路由关键属性的方法是显式方法参数将始终覆盖模板的默认值。实际上,即使你没有在模板上显式设置这些属性,总是存在默认值。在这两种情况下,默认是一个空字符串,但这实际上是一个明智的默认值。就路由密钥而言,首先并不总是必需的(例如扇出交换机)。此外,队列可能与一个空字符串绑定到一个Exchange。这些都是依赖模板的路由密钥属性的默认空字符串值的合法场景。就Exchange名称而言,空字符串是常用的,因为AMQP规范将“默认Exchange”定义为没有名称。由于所有队列都使用其名称作为绑定值自动绑定到该默认Exchange(即直接Exchange),所以上述第二种方法可用于通过默认Exchange进行的任何队列的简单点对点消息传递。只需提供队列名称作为“routingKey” - 或者通过在运行时提供方法参数:

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
Message Builder API

从版本1.3开始,消息构建器API由MessageBuilder和MessagePropertiesBuilder提供;它们提供了一种方便的“流利”手段创建消息或消息属性:

MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();

可以设置MessageProperties上定义的每个属性。其他方法包括setHeader(String key,String value),removeHeader(String key),removeHeaders()和copyProperties(MessageProperties属性)。每个属性设置方法都有一个set IfAbsent()变体。在存在默认初始值的情况下,该方法命名为set IfAbsentOrDefault()。

提供了五种静态方法来创建初始消息构建器:

public static MessagePropertiesBuilder newInstance() 1

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) 2

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) 3

1.新的消息属性对象使用默认值初始化。
2.构建器初始化,并且build()将返回所提供的属性对象。
3.参数的属性被复制到一个新的MessageProperties对象。

使用AmqpTemplate的RabbitTemplate实现,每个send()方法都有一个重载的版本,它接受一个附加的CorrelationData对象。当发布者确认被启用时,该对象将在第3.1.4节“AmqpTemplate”中描述的回调中返回。这允许发送者将确认(ack或nack)与发送的消息相关联。

从1.6.7版开始,引入了CorrelationAwareMessagePostProcessor接口,允许在消息转换后修改相关数据:

CorrelationData postProcess(Message message, CorrelationData correlationData);
Publisher Returns

当模板的强制属性为true时,返回的消息由第3.1.4节“AmqpTemplate”中描述的回调提供。

从版本1.4开始,RabbitTemplate支持根据每个请求消息评估的Spel mandatoryExpression属性作为根评估对象,解析为布尔值。
Bean的引用,比如“@myBean.isMandatory(#root)”可以在表达式中使用。

发送者返回也可以在RabbitTemplate内部用于发送和接收操作。有关详细信息,请参阅“回复超时”一节。

Batching

从版本1.4.2开始,已经介绍了BatchingRabbitTemplate。这是RabbitTemplate的一个子类,具有重写的发送方法,根据BatchingStrategy对消息进行批处理;只有批量完成时才将消息发送给RabbitMQ。

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

就像在发送消息的情况下,AmqpTemplate有一些方便的接收POJO而不是Message实例的方法,并且实现将提供一种自定义用于创建返回的对象的MessageConverter的方法:

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate实现负责接收和回复阶段。在大多数情况下,您应该只提供一个ReceiveAndReplyCallback的实现来为接收到的消息执行一些业务逻辑,如果需要,可以构建回复对象或消息。注意,ReceiveAndReplyCallback可能返回null。在这种情况下,没有发送回复,receiveAndReply类似于receive方法。这允许将相同的队列用于消息的混合,其中一些可能不需要回复。

仅当提供的回调不是ReceiveAndReplyMessageCallback的实例(提供原始消息交换合同)时,才应用自动消息(请求和回复)转换。

ReplyToAddressCallback对于需要自定义逻辑在运行时根据接收到的消息和ReceiveAndReplyCallback的回复来确定replyTo地址的情况很有用。默认情况下,请求消息中的replyTo信息用于路由回复。

以下是基于POJO的接收和回复的示例…

public interface MessageListener {
void onMessage(Message message);
}

如果您的回调逻辑由于任何原因取决于AMQP Channel实例,您可以改为使用ChannelAwareMessageListener。它看起来相似,但有一个额外的参数:

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以将适配器子类化并提供getListenerMethodName()的实现,以根据消息动态选择不同的方法。这个方法有两个参数,即原始的消息和extractMessage,后者是任何转换的结果。默认情况下,配置SimpleMessageConverter;有关可用的其他转换器的更多信息和信息,请参阅“SimpleMessageConverter”一节。

从版本1.4.2开始,原始消息具有consumerQueue和consumerTag属性,可用于确定从哪个队列接收消息。

从版本1.5开始,您可以将消费者队列/标记的映射配置为方法名称,以动态选择要调用的方法。如果map中没有条目,我们将回到默认监听器方法。

Container
现在您已经看到了Message-listening回调的各种选项,我们可以将注意力转向容器。基本上,容器处理“主动”的责任,使得监听器回调可以保持被动。容器是“生命周期”组件的示例。它提供了启动和停止的方法。配置容器时,您基本上弥合了AMQP队列和MessageListener实例之间的差距。您必须提供对ConnectionFactory以及该监听器应从其消费消息的队列名称或队列实例的引用。这是使用默认实现的最基本的例子SimpleMessageListenerContainer:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

或者,您可能更喜欢使用与上述实际代码片段非常相似的@Configuration样式:

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

为方便起见,命名空间提供了listener元素的priority属性:

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />

在这种情况下,队列和交换由containerAdmin声明,它具有auto-startup =“false”,因此在上下文初始化期间不会声明元素。同样,由于同样的原因,容器也没有启动。当容器稍后启动时,它使用它来引用containerAdmin来声明元素。

Batched Messages(批量消息)

批量消息由监听器容器(使用springBatchFormat消息头)自动分段。拒绝批次中的任何消息将导致整个批次被拒绝。有关批处理的更多信息,请参阅“批处理”一节。

Consumer Failure Events(消费失败事件)

从1.5版开始,SimpleMessageListenerContainer每当监听器(消费者)遇到某种故障时,都会发布应用程序事件。事件ListenerContainerConsumerFailedEvent具有以下属性:

这些事件可以通过实现 ApplicationListener<ListenerContainerConsumerFailedEvent> 来消耗。

当并发消费者大于1时,系统范围的事件(如连接失败)将由所有消费者发布。

如果一个消费者失败,因为一个如果它的队列被专门使用,默认情况下,以及发布事件,将发出一个WARN日志。要更改此日志记录行为,请在SimpleMessageListenerContainer的exclusiveConsumerExceptionLogger属性中提供自定义ConditionalExceptionLogger。另见“记录通道关闭事件”一节。

致命错误始终记录在ERROR级别;这不可修改。

Consumer Tags(消费者标签)

从版本1.4.5开始,您现在可以提供生成消费者标签的策略。默认情况下,消费者标签将由代理生成。

@Component
public class MyService {

@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}

}

上述示例的想法是,每当org.springframework.amqp.core.Queue“myQueue”上都有可用的消息时,将相应地调用processOrder方法(在这种情况下,与消息的有效内容相关)。

注解架构使用RabbitListenerContainerFactory为每个注解方法在幕后创建一个消息监听器容器。

在上面的例子中,myQueue必须已经存在并被绑定到一些交换。从版本1.5.0开始,只要应用程序上下文中存在RabbitAdmin,队列可以自动声明和绑定。

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "foo", value = "bar"),
@Argument(name = "baz")
})
)
public String handleWithHeadersExchange(String foo) {
...
}

请注意,x-message-ttl参数为队列设置为10秒。由于参数类型不是String,我们必须指定其类型;在这种情况下整数。与所有这样的声明一样,如果队列已经存在,那么这些参数必须与队列上的一致。对于headers exchange,我们设置绑定参数以匹配头foo设置为bar的消息,并且头baz必须与任何值一起显示。 x匹配参数意味着必须满足这两个条件。

参数名称,值,和类型可以是财产的占位符($ {…})或该表达式(# {…})。名称必须解析为字符串;类型表达式必须解析为类或类的完全限定名。价值必须解决的东西,可以被defaultconversionservice的类型(如在上面的例子中x-message-ttl)。

如果名称解析为null或空字符串,则忽略该参数。

Meta-Annotations(元注解)
有时您可能希望为多个监听器使用相同的配置。为了减少样板设置,您可以使用元注解来创建自己的监听器注解:

@Configuration
@EnableRabbit
public class AppConfig {

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
return factory;
}
}

默认情况下,基础组件将查找名为rabbitListenerContainerFactory的bean作为工厂用于创建消息监听器容器的源。在这种情况下,忽略RabbitMQ基础架构设置,可以使用3个线程的核心轮询大小和10个线程的最大池大小来调用processOrder方法。

可以自定义监听器容器工厂以使用每个注解,或者可以通过实现RabbitListenerConfigurer接口来配置显式默认值。仅当至少有一个端点没有特定的容器工厂注册时,才需要默认值。有关详细信息和示例,请参阅javadoc。

如果您喜欢XML配置,请使用 <rabbit:annotation-driven> 元素。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
...
factory.setMessageConverter(new Jackson2JsonMessageConverter());
...
return factory;
}

这配置了一个Jackson2转换器,期望头信息存在,以引导转换。

您还可以考虑一个ContentTypeDelegatingMessageConverter,它可以处理不同内容类型的转换。

在大多数情况下,除非要使用自定义ConversionService,否则不需要自定义方法参数转换器。

在1.6之前的版本中,转换JSON的类型信息必须在消息头中提供,或需要自定义ClassMapper。从版本1.6开始,如果没有类型信息头,则可以从目标方法参数推断类型。

此类型推断仅适用于方法级别的@RabbitListener。

See the section called “Jackson2JsonMessageConverter” for more information.

如果你想自定义方法参数转换器,你可以这样做,如下所示:

@Configuration
@EnableRabbit
public class AppConfig implements RabbitListenerConfigurer {

@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setQueueNames("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}

在上面的例子中,我们使用了SimpleRabbitListenerEndpoint,它提供了实际的MessageListener来调用,但是你也可以构建自己的端点变体来描述自定义调用机制。

应该注意的是,您也可以跳过使用@RabbitListener,只通过RabbitListenerConfigurer以编程方式注册您的端点。

Annotated Endpoint Method Signature
到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。我们重写它以使用自定义标题注入Order:

@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}

方法参数的处理由DefaultMessageHandlerMethodFactory提供,可以进一步自定义以支持其他方法参数。转换和验证支持也可以在这里定制。

例如,如果我们想在处理之前确保我们的订单有效,我们可以使用@Valid对有效负载进行注释,并配置必要的验证器,如下所示:

@Component
public class MyService {

@RabbitListener(queues = { "queue1", "queue2" } )
public void processOrder(String data, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
...
}

}

从版本1.5开始,您可以使用属性占位符和SpEL来对队列名称进行外部化:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}

如果您需要以传输独立的方式设置其他标头,则可以返回一条消息,如下所示:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}

表达式必须求值为一个String,它可以是一个简单的队列名称(发送到默认exchange),也可以是如上所述的exchange / routingKey形式。

#{…} 表达式在初始化时执行一次

对于动态回复路由,消息发送方应包含一个reply_to消息属性或使用下面描述的备用运行时Spel表达式。
从版本1.6开始,@SendTo可以是在运行时针对请求和回复进行评估的Spel表达式:

@RabbitListener(id="multi", queues = "someQueue")
public class MultiListenerBean {

@RabbitHandler
@SendTo("my.reply.queue")
public String bar(Bar bar) {
...
}

@RabbitHandler
public String baz(Baz baz) {
...
}

@RabbitHandler
public String qux(@Header("amqp_receivedRoutingKey") String rk, @Payload Qux qux) {
...
}

}

在这种情况下,如果转换的有效载荷是Bar,Baz或Qux,则会调用单独的@RabbitHandler方法。重要的是要了解系统必须能够基于有效载荷类型识别唯一的方法。检查类型是否具有不具有注释的单个参数的可分配性,或者使用@Payload注释进行注释。请注意,相同的方法签名适用于上述方法级@RabbitListener中讨论的方法。

请注意,必须在每个方法(如果需要)上指定@SendTo;它在类级别不支持。

@Repeatable @RabbitListener
从版本1.6开始,@RabbitListener注释被标记为@Repeatable。这意味着注释可以多次出现在相同的注释元素(方法或类)上。在这种情况下,为每个注释创建一个单独的监听器容器,每个注释都调用相同的监听器@Bean。可重复注释可与Java 8或更高版本一起使用;当使用Java 7或更早版本时,通过使用@RabbitListeners“容器”注释可以获得与@RabbitListener注释数组相同的效果。

Proxy @RabbitListener and Generics
如果您的服务旨在被代理(例如,在@Transactional的情况下),当接口具有通用参数时,有一些注意事项。具有通用界面和特定实现,例如:

static class TxServiceImpl implements TxService<Foo> {

@Override
@Transactional
@RabbitListener(...)
public String handle(@Payload Foo foo, @Header("amqp_receivedRoutingKey") String rk) {
...
}

}

Container Management
为注释创建的容器未在应用程序上下文中注册。您可以通过调用 RabbitListenerEndpointRegistry bean上的 getListenerContainers() 来获取所有容器的集合。然后,您可以遍历此集合,例如,停止/启动所有容器或调用注册表本身的Lifecycle方法,这将调用每个容器上的操作。

您还可以使用其id来获取对单个容器的引用,使用 getListenerContainer(String id) ;例如上面的代码段创建的容器的 registry.getListenerContainer(“multi”)

从1.5.2版开始,您可以使用 getListenerContainerIds() 获取注册容器的id。

从1.5版开始,您现在可以将组分配给RabbitListener端点上的容器。这提供了一种获取对容器子集的引用的机制;添加一个组属性会使一个类型为 Collection<MessageListenerContainer> 的bean注册到具有组名称的上下文中。

Threading and Asynchronous Consumers(线程和异步消费者)

异步消费者涉及到多线程。

在SimpleMessageListener中配置的TaskExecutor的线程用于在RabbitMQ Client发送新消息时调用MessageListener。如果未配置,则使用SimpleAsyncTaskExecutor。如果使用了线程池,请确保池大小足以处理配置的并发。

当使用默认的SimpleAsyncTaskExecutor时,对于调用监听器的线程,监听器容器beanName用作threadNamePrefix。这对日志分析很有用;通常建议在日志追踪器配置中始终包含线程名称。当TaskExecutor通过SimpleMessageListenerContainer上的taskExecutor属性特别提供时,它将按原样使用,无需修改。建议您使用类似的技术来命名由自定义TaskExecutor bean定义创建的线程,以帮助日志消息中的线程标识。

在CachingConnectionFactory中配置的执行程序在创建连接时被传递到RabbitMQ客户端,其线程用于将新消息传递给监听器容器。在撰写本文时,如果未配置,客户端将使用池大小为5的内部线程池执行程序。

RabbitMQ客户端使用ThreadFactory创建用于低级I / O(套接字)操作的线程。要修改此工厂,您需要配置底层RabbitMQ ConnectionFactory,如“Configuring the Underlying Client Connection Factory(配置底层客户端连接工厂)”一节中所述。

Detecting Idle Asynchronous Consumers(检测空闲异步消费者)

虽然效率高, 但是异步消费者的一个问题是检测他们什么时候空闲 - 如果没有消息到达一段时间,用户可能需要采取一些行动。

从版本1.6开始,现在可以将监听器容器配置为发布ListenerContainerIdleEvent,当有一段时间没有消息传递。当容器空闲时,每个idleEventInterval毫秒将发布一个事件。

要配置此功能,请在容器上设置idleEventInterval:
xml

@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}

@RabbitListener

public class Listener {

@RabbitListener(id="foo", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}

@EventListener(condition = "event.listenerId == 'foo'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}

}

注意
事件听众将看到所有容器的事件;因此,在上面的示例中,我们缩小了基于监听器ID接收的事件。
警告
如果要使用空闲事件来停止lister容器,则不应在调用监听器的线程上调用container.stop(),这将导致延迟和不必要的日志消息。相反,您应该将事件切换到另一个线程,然后可以停止容器。

3.1.7 Message Converters(消息转换器)

介绍

AmqpTemplate还定义了几种用于发送和接收将委托给MessageConverter的消息的方法。 MessageConverter本身很简单。它为每个方向提供单一方法:一种用于转换为消息,另一种用于从消息转换。请注意,转换为消息时,除了对象之外,还可以提供属性。 “object”参数通常对应于消息体。

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
throws AmqpException;

void convertAndSend(String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有两种方法:一种接受队列名称,一种依赖于模板的“队列”属性已被设置。

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<property name="connectionFactory" ref="rabbitConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
<!-- if necessary, override the DefaultClassMapper -->
<property name="classMapper" ref="customClassMapper"/>
</bean>
</property>
</bean>

如上所示,Jackson2JsonMessageConverter默认使用DefaultClassMapper。类型信息被添加到MessageProperties(并从中检索)。如果入站邮件在MessageProperties中不包含类型信息,但您知道预期的类型,则可以使用defaultType属性配置静态类型

@RabbitListener
public void foo(Foo foo) {...}

@RabbitListener
public void foo(@Payload Foo foo, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void foo(Foo foo, o.s.amqp.core.Message message) {...}

@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void foo(Foo foo, String bar) {...}

@RabbitListener
public void foo(Foo foo, o.s.messaging.Message<?> message) {...}

在前四个情况下,转换器将尝试转换为Foo类型。第五个例子是无效的,因为我们无法确定哪个参数应该接收消息有效载荷。在第六个例子中,由于通用类型是通配符,Jackson的默认值将会被应用。

但是,您可以创建一个自定义转换器,并使用targetMethod消息属性来决定将JSON转换为哪种类型。

注意

只有在方法级别声明@RabbitListener注释时,才能实现此类型推断。使用类级别的@RabbitListener,转换后的类型用于选择要调用哪个@RabbitHandler方法。因此,基础架构提供了可以由自定义转换器用于确定类型的targetObject消息属性。

MarshallingMessageConverter

另一个选择是MarshallingMessageConverter。它委托Spring OXM库的Marshaller和Unmarshaller策略接口的实现。在配置方面,最常见的是提供构造函数参数,因为Marshaller的大多数实现也将实现Unmarshaller。

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
<property name="delegates">
<map>
<entry key="application/json" value-ref="jsonMessageConverter" />
<entry key="application/xml" value-ref="xmlMessageConverter" />
</map>
</property>
</bean>
Java Deserialization

注意

从不受信任的源反序列化java对象时,可能存在一个漏洞。如果您使用 content-type application/x-java-serialized-object 接受来自不受信任来源的邮件,则应考虑配置哪些包/类被允许反序列化。这适用于 SimpleMessageConverter SerializerMessageConverter ,当它被配置为使用 DefaultDeserializer - 隐式或通过配置。默认情况下,白名单为空,表示所有类将被反序列化。您可以设置模式列表,如 foo.* foo.bar.Baz *.MySafeClass 。将按顺序检查模式,直到找到匹配项。如果没有匹配,则抛出 SecurityException 。使用这些转换器上的 whiteListPatterns 属性设置模式。

Message Properties Converters

MessagePropertiesConverter策略接口用于在Rabbit Client BasicProperties和Spring AMQP MessageProperties之间进行转换。默认实现(DefaultMessagePropertiesConverter)通常足以满足大多数用途,但如果需要,您可以实现自己的。当大小不大于1024字节时,默认属性转换器将将LongString类型的BasicProperties元素转换为String。较大的LongString不会转换(见下文)。可以使用构造函数参数覆盖此限制。

从版本1.6开始,长度超过长字符串限制的标题(默认1024)现在默认由DefaultMessagePropertiesConverter保留为LongString。您可以通过 getBytes[] toString() getStream() 方法访问内容。

以前,DefaultMessagePropertiesConverter将这样的头部转换为DataInputStream(实际上它只是引用了LongString的DataInputStream)。在输出时,该标头未被转换(除了一个字符串,例如,通过调用流上的toString(),java.io.DataInputStream@1d057a39)。

大量传入的LongString headers现在在输出上也被正确地“转换”(默认情况下)。

提供了一个新的构造函数,允许您将转换器配置为如前所述:

<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>

虽然容器和模板共享连接工厂,但它们不共享通道,因此请求和回复不会在同一事务中执行(如果是事务性的)。

在版本1.5.0之前, reply-address 属性不可用,回复始终使用默认exchage和reply-queue名称作为routing key。这仍然是默认值,但您现在可以指定新的reply-address属性。reply-address可以包含一个形式为 <exchange> / <routingKey> 的地址,并且回复将被路由到指定的交换机并路由到与routing key绑定的队列。reply-address优先于reply-queue。必须将 <reply-listener> 配置为单独的 <listener-container> 组件,当只有reply-address正在使用时,无论如何,reply-address和reply-queue(或 <listener-container> 上的队列属性)必须引用在逻辑上相同的队列。

在这个配置中,我们使用SimpleListenerContainer收到回复;当使用 <rabbit:template /> 定义模板时,如上所示,解析器将模板中的 container和wires定义为监听器。

当模板不使用固定的replyQueue(或正在使用Direct reply-to - 请参阅“RabbitMQ Direct reply-to”一节)时,不需要监听器容器。直接回复是使用RabbitMQ 3.4.0或更高版本的首选机制。

如果您将RabbitTemplate定义为 <bean /> ,或者使用 @Configuration 类将其定义为 @Bean ,或者以编程方式创建模板,则需要自己定义并连接回复监听器容器。如果您无法执行此操作,模板将永远不会收到回复,并将最终超时并返回null作为对 sendAndReceive 方法的调用的回复。

从1.5版开始,RabbitTemplate将检测它是否被配置为MessageListener以接收回复。如果没有,尝试发送和接收具有回复地址的消息将失败,并显示IllegalStateException(因为不会收到回复)。

此外,如果使用一个简单的replyAddress(队列名称),那么回复监听器容器将验证它正在监听具有相同名称的队列。如果回复地址是exchange和routing key,并且调试日志消息将被写入,则无法执行此检查。

当您自己接收回复监听器和模板时,确保模板的replyQueue和容器的队列(或queueNames)属性引用相同的队列是非常重要的。模板将回复队列插入到出站消息replyTo属性中。

以下是如何手动连接bean的示例。

@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyQueue(replyQueue());
rabbitTemplate.setReplyTimeout(60000);
return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}

@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}

测试用例 显示了一个连接了固定应答队列的RabbitTemplate的完整示例,以及处理请求并返回答复的“远程”监听器容器。

当回复超时(replyTimeout)时,sendAndReceive()方法返回null。

在版本1.3.6之前,只是记录了超时消息的迟到回复。现在,如果收到迟到的回复,它将被拒绝(模板抛出一个 AmqpRejectAndDontRequeueException )。如果回复队列被配置为将拒绝的消息发送到dead letter exchange,则可以检索回复以供稍后分析。只需使用等于回复queue名称的routing key将队列绑定到配置的dead letter exchange中。

有关配置dead lettering的更多信息,请参阅 RabbitMQ Dead Letter Documentation 。您还可以查看一个示例的 FixedReplyQueueDeadLetterTests 测试用例。

AsyncRabbitTemplate

版本1.6引入了 AsyncRabbitTemplate 。这与 AmqpTemplate 中的 sendAndReceive (和 convertSendAndReceive )类似,但是它们返回一个 ListenableFuture

您可以稍后通过调用 get() 来同步检索结果,也可以注册一个将结果异步调用的回调。

<bean id="client"
class="org.springframework.amqp.remoting.client.AmqpProxyFactoryBean">
<property name="amqpTemplate" ref="template" />
<property name="serviceInterface" value="foo.ServiceInterface" />
</bean>

<rabbit:connection-factory id="connectionFactory" />

<rabbit:template id="template" connection-factory="connectionFactory" reply-timeout="2000"
routing-key="remoting.binding" exchange="remoting.exchange" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="remoting.queue" />

<rabbit:direct-exchange name="remoting.exchange">
<rabbit:bindings>
<rabbit:binding queue="remoting.queue" key="remoting.binding" />
</rabbit:bindings>
</rabbit:direct-exchange>

public interface AmqpAdmin {

// Exchange Operations

void declareExchange(Exchange exchange);

void deleteExchange(String exchangeName);

// Queue Operations

Queue declareQueue();

String declareQueue(Queue queue);

void deleteQueue(String queueName);

void deleteQueue(String queueName, boolean unused, boolean empty);

void purgeQueue(String queueName, boolean noWait);

// Binding Operations

void declareBinding(Binding binding);

void removeBinding(Binding binding);

Properties getQueueProperties(String queueName);

}

getQueueProperties()方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回的属性的键可用作RabbitTemplate(QUEUE_NAME,QUEUE_MESSAGE_COUNT,QUEUE_CONSUMER_COUNT)中的常量。 RabbitMQ REST API在QueueInfo对象中提供了更多的信息。

no-arg declareQueue()方法定义了一个自动生成名称的代理上的队列。此自动生成队列的附加属性为exclusive = true,autoDelete = true和durable = false。

declareQueue(Queue queue)方法接受Queue对象并返回声明队列的名称。如果提供的Queue的name属性是空字符串,则代理使用生成的名称声明队列,并将该名称返回给调用者。队列对象本身没有改变。此功能只能通过直接调用RabbitAdmin以编程方式使用。管理员自动声明不支持在应用程序上下文中声明性地定义队列。

这与AnonymousQueue形成对照,其中框架生成一个唯一的(UUID)名称,并将持久性设置为false和exclusive,autoDelete为true。具有空或缺少的name属性的 <rabbit:queue /> 将始终创建AnonymousQueue。

请参阅“AnonymousQueue”部分了解为什么AnonymousQueue优先于代理生成的队列名称,以及如何控制名称的格式。声明队列必须具有固定名称,因为它们可能在上下文中的其他位置被引用,例如在监听器中:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

当CachingConnectionFactory缓存模式是CHANNEL(默认值)时,RabbitAdmin实现会在同一个ApplicationContext中声明的Queue,Exchanges和Bindings自动延迟声明。这些组件将被声明为s0on,因为连接已打开到代理。有一些命名空间功能使得这非常方便,例如:

<rabbit:queue name="stocks.trade.queue"/>

您可以同时提供id和name属性。这允许您通过独立于队列名称的id来引用队列(例如在绑定中)。它还允许标准的Spring功能,如属性占位符和队列名称的SpEL表达式;当使用名称作为bean标识符时,这些功能不可用。

可以使用其他参数配置队列,例如x-message-ttl或x-ha-policy。使用命名空间支持,使用 <rabbit:queue-arguments> 元素以参数名称/参数值对映射的形式提供它们。

<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>

当提供混合类型的参数时,为每个条目元素提供类型:

<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>

RabbitMQ代理将不允许声明具有不匹配参数的队列。例如,如果 queue 已经存在,没有 time to live 参数,并且尝试使用 key =“x-message-ttl”value =“100” 进行声明,则会抛出异常。

默认情况下,当发生任何异常时,RabbitAdmin将立即停止处理所有声明;这可能会导致下游问题,例如监听器容器无法初始化,因为未声明另一个队列(在错误之后定义)。

可以通过在RabbitAdmin上将ignore-declaration-exceptions属性设置为true来修改此行为。此选项指示RabbitAdmin记录异常,并继续声明其他元素。当使用java配置RabbitAdmin时,此属性为ignoreDeclarationExceptions。这是一个适用于所有元素,队列,交换和绑定的全局设置,具有仅适用于这些元素的类似属性。

在版本1.6之前,此属性仅在通道上发生IOException(例如当前和所需属性不匹配时)才会生效。现在,此属性对任何异常生效,包括TimeoutException等。

另外,任何声明异常将导致发布DeclarationExceptionEvent,它是可以由上下文中任何ApplicationListener使用的ApplicationEvent。该事件包含对管理员的引用,被声明的元素和Throwable。

从版本1.3开始,HeadersExchange可以配置为匹配多个标头;您还可以指定任何或所有标题必须匹配:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}

@Bean
public MessageConverter jsonMessageConverter() {
return new JsonMessageConverter();
}

@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}

// additional code omitted for brevity

}

在库存应用程序中,服务器使用以下@Configuration类进行配置:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;

@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}

/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}

// additional code omitted for brevity

}

客户端通过AmqpAdmin上的declareQueue()方法声明另一个队列,并通过在外部属性文件中的routing pattern 将该队列绑定到market data exchange。

Builder API for Queues and Exchanges(构建Queues和Exchanges的API)

版本1.6引入了方便的API,用于在使用Java配置时配置queue和Exchange对象:

@Configuration
public static class Config {

@Bean
public ConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}

@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}

@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}

@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}

@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

@Bean
public List<Exchange> es() {
return Arrays.<Exchange>asList(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true)
);
}

@Bean
public List<Queue> qs() {
return Arrays.asList(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true)
);
}

@Bean
public List<Binding> bs() {
return Arrays.asList(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null)
);
}

@Bean
public List<Declarable> ds() {
return Arrays.<Declarable>asList(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null)
);
}

}
Conditional Declaration(条件声明)

默认情况下,所有队列,交换和绑定都由应用程序上下文中的所有RabbitAdmin实例(具有auto-startup =“true”)声明。

从1.2版本开始,可以有条件地声明这些元素。当应用程序连接到多个代理程序并且需要指定哪个代理程序应该声明特定元素时,这是特别有用的。

代表这些元素的类实现Declarable,它有两个方法:shouldDeclare()和getDeclaringAdmins()。 RabbitAdmin使用这些方法来确定特定实例是否应该实际处理其连接上的声明。

这些属性作为命名空间中的属性,如以下示例所示。

@Bean
public RabbitAdmin admin() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf1());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}

@Bean
public RabbitAdmin admin2() {
RabbitAdmin rabbitAdmin = new RabbitAdmin(cf2());
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}

@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin());
return queue;
}

@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin());
return exchange;
}

@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin());
return binding;
}

AnonymousQueue

通常,当需要唯一命名的,排他性的自动删除队列时,建议使用AnonymousQueue而不是代理定义的队列名称(使用“”作为队列名称将导致broker生成队列名称) 。

原因如下:

  1. 在建立到代理的连接时,队列实际上是被声明的;这是在bean被创建并连接在一起之后的很长时间;使用队列的bean需要知道它的名字。事实上,当应用程序启动时,代理甚至可能无法运行。
  2. 如果由于某种原因与代理的连接丢失,则管理员将重新声明具有相同名称的AnonymousQueue。如果我们使用代理声明的队列,队列名称将会更改。

从1.5.3版开始,您可以控制AnonymousQueue使用的队列名称的格式。

默认情况下,队列名称是UUID的String表示形式;例如:07afcfe9-fe77-4983-8645-0061ec61a47a。

您现在可以在构造函数参数中提供AnonymousQueue.NamingStrategy实现:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="springNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="springNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>

第一个创建具有UUID的String表示形式的名称。第二个创建名称如spring.gen-MRBv9sqISkuCiPfOYfpo4g。第三个创建名称,如custom.gen-MRBv9sqISkuCiPfOYfpo4g

当然,您可以提供自己的命名策略bean。

3.1.11 Delayed Message Exchange(延迟消息Exchange)

1.6版引入了对延迟消息exchange插件的支持

该插件目前被标记为实验性,但已有一年以上(在撰写本文时)。如果插件的更改需要,我们将尽快添加对这些更改的支持。因此,Spring AMQP中的这种支持也应该被认为是实验性的。该功能使用RabbitMQ 3.6.0和版本0.0.1的插件进行了测试。

要使用RabbitAdmin将延迟声明为exchange,只需将exchange bean上的delayed属性设置为true即可。 RabbitAdmin将使用交换类型(Direct,Fanout等)设置x-delayed-type参数,并使用x-delayed-message类型声明交换。

使用XML配置交换bean时,delayed(默认为false)也可用。

MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());

或者

public interface AmqpManagementOperations {

void addExchange(Exchange exchange);

void addExchange(String vhost, Exchange exchange);

void purgeQueue(Queue queue);

void purgeQueue(String vhost, Queue queue);

void deleteQueue(Queue queue);

void deleteQueue(String vhost, Queue queue);

Queue getQueue(String name);

Queue getQueue(String vhost, String name);

List<Queue> getQueues();

List<Queue> getQueues(String vhost);

void addQueue(Queue queue);

void addQueue(String vhost, Queue queue);

void deleteExchange(Exchange exchange);

void deleteExchange(String vhost, Exchange exchange);

Exchange getExchange(String name);

Exchange getExchange(String vhost, String name);

List<Exchange> getExchanges();

List<Exchange> getExchanges(String vhost);

List<Binding> getBindings();

List<Binding> getBindings(String vhost);

List<Binding> getBindingsForExchange(String vhost, String exchange);

}

有关详细信息,请参阅javadocs。

3.1.13 Exception Handling(异常处理器)

使用RabbitMQ Java客户端的许多操作可以抛出已检查的异常。例如,可能会抛出IOExceptions的情况很多。 RabbitTemplate,SimpleMessageListenerContainer和其他Spring AMQP组件将捕获这些异常并将其转换为运行时层次结构中的一个异常。这些在org.springframework.amqp包中定义,AmqpException是层次结构的基础。

当一个监听器抛出一个异常时,它被包装在一个ListenerExecutionFailedException中,通常这个消息被代理拒绝和重新排序。将defaultRequeueRejected设置为false将导致消息被丢弃(或路由到dead letter exchange)。如在“消息监听器和异步事件”一节中所讨论的,监听器可以抛出一个AmqpRejectAndDontRequeueException来有条件地控制这种行为。

但是,有一类错误,监听器无法控制行为。当遇到无法转换的消息(例如无效的content_encoding标头)时,会在消息达到用户代码之前抛出一些异常。将defaultRequeueRejected设置为true(默认),这些消息将被重新传递。在版本1.3.2之前,用户需要编写一个自定义的ErrorHandler,如3.1.13节“异常处理”所述,以避免这种情况。

从版本1.3.2开始,默认的ErrorHandler现在是一个ConditionalRejectingErrorHandler,它将拒绝(而不是重新排序)消息,并发生不可恢复的错误:

  • o.s.amqp…MessageConversionException
  • o.s.messaging…MessageConversionException
  • o.s.messaging…MethodArgumentNotValidException
  • o.s.messaging…MethodArgumentTypeMismatchException
  • java.lang.NoSuchMethodException
  • java.lang.ClassCastException

使用MessageConverter转换传入的消息有效负载时,可以抛出第一个。如果在映射到@RabbitListener方法时需要额外的转换,则转换服务可能会抛出第二个。如果在监听器中使用验证(例如@Valid),并且验证失败,则可能会抛出第三个。如果入站邮件转换为目标方法不正确的类型,则可能会抛出第四个邮件。例如,该参数被声明 为Message<Foo> ,但接收到 Message<Bar>

版本1.6.3中添加了第五和第六。

可以使用FatalExceptionStrategy配置此错误处理程序的实例,以便用户可以提供自己的条件消息拒绝规则,例如。来自Spring Retry(称为“消息监听器和异步情况”的部分)的BinaryExceptionClassifier的委托实现。另外,ListenerExecutionFailedException现在有一个failMessage属性可以在决定中使用。如果FatalExceptionStrategy.isFatal()方法返回true,则错误处理程序将抛出一个AmqpRejectAndDontRequeueException异常。当异常确定为致命时,默认的FatalExceptionStrategy会记录一条警告消息。

自1.6.3版本以来,将用户异常添加到致命列表中的方便方法是将ConditionalRejectingErrorHandler.DefaultExceptionStrategy子类化,并覆盖方法isUserCauseFatal(Throwable cause)为致命异常返回true。

3.1.14 Transactions(事务)

介绍

Spring Rabbit框架支持在同步和异步使用情况下进行自动事务管理,具有多种不同的语义,可以声明式选择,正如Spring事务的现有用户所熟悉的那样。这使得许多如果不是最常见的消息传递模式非常容易实现。

有两种方式将所需的事务语义信号发送到框架。在RabbitTemplate和SimpleMessageListenerContainer中都有一个标志channelTransacted,如果为true,则告知框架使用事务通道,并根据结果结束提交或回滚以结束所有操作(发送或接收),并发出异常指示回滚。另一个信号是提供一个外部事务与Spring的PlatformTransactionManager实现之一作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有一个事务正在进行,并且channelTransacted标志为真,那么消息传递事务的提交或回滚将被推迟到当前事务结束。如果channelTransacted标志为false,则没有事务语义适用于消息传递操作(它是自动检测的)。

channelTransacted标志是一个配置时间设置:当AMQP组件被创建时,它通常在应用程序启动时被声明和处理一次。外部事务原则上更动态,因为系统在运行时响应当前的Thread状态,但实际上当事务按声明方式分层到应用程序时通常也是一个配置设置。

对于使用RabbitTemplate的同步用例,外部事务由调用者提供,无论是声明性还是根据味道(通常的Spring事务模型)强制执行。声明性方法(通常是首选,因为它是非侵入性的)的示例,其中模板已配置为channelTransacted = true:

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

}

在上面的例子中,事务管理器被添加为从另一个bean定义(未显示)注入的依赖关系,并且channelTransacted标志也被设置为true。效果是,如果监听器失败并发生异常,则事务将被回滚,并且该消息也将返回给代理。重要的是,如果事务无法提交(例如数据库约束错误或连接性问题),则AMQP事务也将被回滚,并且该消息将被返回给代理。这有时被称为最佳努力1阶段提交,并且是可靠消息传递的非常强大的模式。如果在上面的示例中将channelTransacted标志设置为false,这是默认值,则仍将为监听器提供外部事务,但所有消息传递操作都将自动检测,因此其效果是即使提交消息传递操作在业务运行的回滚。

Conditional Rollback

在版本1.6.6之前,当回收规则添加到容器的transactionAttribute中时,使用外部事务管理器(例如JDBC)不起作用;异常总是回滚事务。

此外,当在容器的建议链中使用 事务建议 时,条件回滚并不是非常有用,因为所有监听器异常都被包装在ListenerExecutionFailedException中。

第一个问题已得到纠正,规则现在得到适当应用。此外,现在提供了ListenerFailedRuleBasedTransactionAttribute;它是RuleBasedTransactionAttribute的一个子类,唯一的区别是它知道ListenerExecutionFailedException,并且使用规则的这种异常的原因。此事务属性可以直接在容器中使用,也可以通过事务建议使用。

使用此规则的示例如下:

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}

如果您喜欢使用XML配置,请在XML应用程序上下文文件中声明以下bean:

<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>

3.1.15 Message Listener Container Configuration(Message Listener容器配置)

有很多的选项配置SimpleMessageListenerContainer相关事务和服务,其中一些可以相互作用。

下表显示了使用命名空间配置 <rabbit:listener-container /> 时的容器属性名称及其等效属性名称(括号中)。

PS:其他内容还在继续翻译当中

1.前言

Spring AMQP项目是用于开发AMQP的解决方案。 我们提供一个“模板”作为发送和接收消息的抽象。我们还为普通POJO

作者:liuxing's blog
青春须早为,岂能长少年!
原文地址:Spring AMQP中文文档, 感谢原作者分享。

发表评论