β

Spring JMS粗译

liuxing's blog 119 阅读

24.1介绍

Spring提供了JMS集成框架简化了JMS API的使用,像Spring JDBC API的使用。

JMS的功能大致上分为两块,叫做消息发送和消息监听。JmsTemplate 用于发送消息和同步消息监听。和Java EE的事件驱动Bean风格类似,对于异步接收消息,Spring提供了一些消息监听容器来创建消息驱动的POJO(MDP)。

org.springframework.jms.core 提供使用JMS的核心功能。 就象为JDBC提供的 JdbcTemplate一样,它提供了JMS模板类来处理资源的创建和释放以简化JMS的使用。Spring模板类的公共设计原则就是通过提供工具方法去执行公共的操作,并将实际的处理任务委派到用户实现的回调接口上,从而完成更复杂的操作。JMS模板也遵循这样的设计原则。这些类提供众多便利的方法来发送消息、同步接收消息、使用户可以接触到JMS session和消息产生者。

org.springframework.jms.support 提供 JMSException 的转换功能。它将受控的 JMSException 异常层次转换到一个对应的非受控异常层次。任何受控 javax.jms.JMSException 异常的子类都被包装在非受控 UncategorizedJmsException 异常里。

org.springframework.jms.support.converter 提供一个 MessageConverter 用来抽象Java对象和JMS消息之间的转换操作。

org.springframework.jms.support.destination 为管理JMS目的地提供多种策略,例如为存储在JNDI中的目的地提供一个服务定位器。

org.springframework.jms.annotation 提供了必要的基础设施 支持注解驱动的端点侦听器使用 @JmsListener

org.springframework.jms.config 提供的解析器实现 jms 命名空间以及java配置支持容器和配置监听器、创建端点侦听器。

org.springframework.jms.connection 提供一个适合在独立应用中使用的 ConnectionFactory 的实现。它还为JMS提供了一个Spring的 PlatformTransactionManager 的实现(现在叫做 JmsTransactionManager )。这样可以把JMS作为一个事务资源无缝地集成到Spring的事务管理机制中去。

24.2使用Spring JMS

24.2.1 JmsTemplate

使用 JmsTemplate 的代码只需要实现规范中定义的回调接口。 MessageCreator 回调接口通过 JmsTemplate 中调用代码提供的 Session 来创建一条消息。然而,为了允许更复杂的JMS API应用,回调接口 SessionCallback 为用户提供JMS session,并且回调接口 ProducerCallback Session MessageProducer 对显露给用户。

JMS API有两种发送方法,一种采用发送模式、优先级和存活时间作为服务质量(QOS)参数,另一种使用无需QOS参数的缺省值方法。由于在 JmsTemplate 中有许多种发送方法,QOS参数通过bean的属性方式进行设置,从而避免在多种发送方法中重复。同样,使用 setReceiveTimeout 属性值来设置同步接收调用的超时值。

某些JMS供应者允许通过ConnectionFactory的配置来设置缺省的QOS值。这样在调用 MessageProducer 的发送方法 send(Destination destination, Message message) 时会使用那些不同的QOS缺省值,而不是JMS规范中定义的值。所以,为了提供对QOS值的一致管理,JmsTemplate必须通过设置布尔值属性 isExplicitQosEnabled 为true,使它能够使用自己的QOS值。

JmsTemplate 类的实例 一经配置便是线程安全 的。 这很重要,因为这意味着你可以配置一个 JmsTemplate 的单例,然后把这个 共享的 引用安全的实例注入多个协作的对象中。 要清楚一点,JmsTemplate 是有状态的,因为它维护了 ConnectionFactory 的引用,但这个状态不属于会话状态。

Spring Framework 4.1提供了一个 JmsMessagingTemplate ,这个是对JmsTemplate的包装,主要用来发送最基本的消息内容,即 org.springframework.messaging.Message

24.2.2连接

JmsTemplate 需要一个对 ConnectionFactory 的引用。ConnectionFactory 是JMS规范的一部分,并且是使用JMS的入口。客户端应用通常用它作工厂配合JMS提供者去创建连接,并封装许多和供应商相关的配置参数,例如SSL的配置选项。

当在EJB里使用JMS时,供应商会提供JMS接口的实现,这样们可以参与声明式事务管理并提供连接池和会话池。 为了使用这个JMS实现,Java EE容器通常要求你在EJB或servlet部署描述符中声明一个JMS连接工厂做为 resource-ref。 为确保可以在EJB内使用 JmsTemplate 的这些特性,客户应用应当确保它引用了被管理的ConnectionFactory实现。

缓存消息传递资源
标准的API执行流程如下:
ConnectionFactory->Connection->Session->MessageProducer->send

从创建到发送有三个中间对象的创建和销毁,为了提高性能,Spring提供了 ConnectionFactory

SingleConnectionFactory
Spring提供了一个 ConnectionFactory 接口的实现,SingleConnectionFactory,它将在所有的 createConnection 调用中返回一个相同的 Connection,并忽略所有对 close的调用。这在测试和独立环境中相当有用,因为多个 JmsTemplate 调用可以使用同一个连接以跨越多个事务。SingleConnectionFactory 通常引用一个来自JNDI的标准 ConnectionFactory。

CachingConnectionFactory
CachingConnectionFactory类扩展自SingleConnectionFactory,主要用于提供缓存JMS资源功能。具体包括MessageProducer、MessageConsumer和Session的缓存功能。
默认情况下,CachingConnectionFactory只缓存一个session,在它的JavaDoc中,它声明对于低并发情况下这是足够的。可以使用SessionCacheSize进行配置。

24.2.3目的地管理

和连接工厂一样,目的地是可以在JNDI中存储和获取的JMS管理的对象。配置一个Spring应用上下文时,可以使用JNDI工厂类 JndiObjectFactoryBean 把对你对象的引用依赖注入到JMS目的地中。然而,如果在应用中有大量的目的地,或者JMS供应商提供了特有的高级目的地管理特性,这个策略常常显得很麻烦。创建动态目的地或支持目的地的命名空间层次就是这种高级目的地管理的例子。JmsTemplate 将目的地名称到JMS目的地对象的解析委派给 DestinationResolver 接口的一个实现。JndiDestinationResolver 是 JmsTemplate 使用的默认实现,并且提供动态目的地解析。同时 JndiDestinationResolver 作为JNDI中的目的地服务定位器,还可选择回退去使用 DynamicDestinationResolver 中的行为。

经常见到一个JMS应用中使用的目的地只有在运行时才知道,因此,当部署一个应用时,它不能用可管理的方式创建。这是经常发生的,因为在互相作用的系统组件间有些共享应用逻辑会在运行的时候按照共同的命名规范创建消息目的地。虽然动态创建目的地不是JMS规范的一部分,但是大多数供应商已经提供了这个功能。用户为动态创建的目的地定义和临时目的地不同的名字,并且通常不被注册到JNDI中。不同供应商创建动态消息目的地所使用的API差异很大,因为和目的地相关的属性是供应商特有的。然而,有时由供应商会作出一个简单的实现选择-忽略JMS规范中的警告,使用 TopicSession 的方法 createTopic(String topicName) 或者 QueueSession 的方法 createQueue(String queueName) 来创建一个带默认值属性的新目的地。依赖于供应商的实现,DynamicDestinationResolver 也可能创建一个物理上的目的地,而不再仅仅是一个解析。

布尔属性 pubSubDomain 用来配置 JmsTemplate 使用什么样的JMS域。这个属性的默认值是false,使用点到点的域,也就是队列。在1.0.2的实现中,这个属性值用来决定 JmsTemplate 将消息发送到一个 Queue 还是一个 Topic。这个标志在1.1的实现中对发送操作没有影响。然而,在这两个JMS版本中,这个属性决定了通过接口 DestinationResolver 的实现来决定如何解析动态消息目的地。

你还可以通过属性 defaultDestination 配置一个带有默认目的地的 JmsTemplate。不指明目的地的发送和接受操作将使用该默认目的地。

24.2.4消息监听容器

在EJB世界里,JMS消息最常用的功能之一是用于实现消息驱动Bean(MDB)。Spring提供了一个方法来创建消息驱动的POJO(MDP),并且不会把用户绑定在某个EJB容器上。

通常用消息监听器容器从JMS消息队列接收消息并驱动被注射进来的MDP。消息监听器容器负责消息接收的多线程处理并分发到各MDP中。一个消息侦听容器是MDP和消息提供者之间的一个中介,用来处理消息接收的注册,事务管理的参与,资源获取和释放,异常转换等等。这使得应用开发人员可以专注于开发和接收消息(可能的响应)相关的(复杂)业务逻辑,把和JMS基础框架有关的样板化的部分委托给框架处理。

SimpleMessageListenerContainer
这个消息侦听容器是最简单的。它在启动时创建固定数量的JMS session并在容器的整个生命周期中使用它们。这个类不能动态的适应运行时的要求或参与消息接收的事务处理。然而它对JMS提供者的要求也最低。它只需要简单的JMS API。

DefaultMessageListenerContainer
这个消息侦听器使用的最多。和 SimpleMessageListenerContainer 相反,这个子类可以动态适应运行时侯的要求,也可以参与事务管理。每个收到的消息都注册到一个XA事务中(如果使用 JtaTransactionManager 配置过),这样就可以利用XA事务语义的优势了。这个类在对JMS提供者的低要求和提供包括事务参于等的强大功能上取得了很好的平衡。

24.2.5事务管理

Spring提供了 JmsTransactionManager 为单个JMS ConnectionFactory 管理事务。这将允许JMS应用利用Spring的事务管理功能。JmsTransactionManager 绑定 ConnectionFactory 的一个Connection/Session对到线程上,来提供本地资源事务。JmsTemplate 自动检测到这些事务性资源从而对它们进行操作。

在Java EE环境中,SingleConnectionFactory将把Connection和Session放到缓冲池中,因此这些资源在事务中得到了有效的复用。在独立环境中使用Spring的 SingleConnectionFactory 会存在共享的JMS Connection,但每个事务有自己独立的 Session。另外可以考虑使用供应商特定的池适配器,,如ActiveMQ的 PooledConnectionFactory 类。

JmsTemplate 也可以和 JtaTransactionManager 以及具有XA能力的JMS ConnectionFactory一起使用来提供分布式事务。记住这需要使用JTA事务管理器或合适的可配置的XA ConnectionFactory!(参考你所使用的J2EE服务器/JMS供应商的文档。)

当使用JMS API从一个 Connection 中创建 Session 时,在受管理的和非受管理的事务环境下重用代码会可能会让人迷惑。这是因为JMS API只有一个工厂方法来创建 Session ,并且它需要用于事务和模式确认的值。在受管理的环境下,由事务结构环境负责设置这些值,这样在供应商包装的JMS连接中可以忽略这些值。当在一个非管理性的环境中使用 JmsTemplate 时,你可以通过使用属性 SessionTransacted 和 SessionAcknowledgeMode 来指定这些值。当 JmsTemplate 配合 PlatformTransactionManager 使用时,模板将一直被赋予一个事务性JMS的 Session。

24.3发送消息

JmsTemplate 包含许多方便的方法来发送消息。有些发送方法可以使用 javax.jms.Destination 对象指定目的地,也可以使用字符串在JNDI中查找目的地。没有目的地参数的发送方法使用默认的目的地。

public void sendWithConversion() {
Map map = new HashMap();
map.put("Name", "Mark");
map.put("Age", new Integer(47));
jmsTemplate.convertAndSend("testQueue", map, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws JMSException {
message.setIntProperty("AccountID", 1234);
message.setJMSCorrelationID("123-00001");
return message;
}
});
}

将产生以下消息格式

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}

}

一旦你实现了 MessageListener 后就可以创建一个消息侦听容器。
请看下面例子是如何定义和配置一个随Sping发行的消息侦听容器的(这个例子用 DefaultMessageListenerContainer)

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

void onMessage(Message message, Session session) throws JMSException;

}

如果你希望你的MDP可以响应所有接收到的消息(使用 onMessage(Message, Session) 方法提供的 Session)那么你可以选择让你的MDP实现这个接口(优先于标准的JMS MessageListener 接口)。所有随Spring发行的支持MDP的消息侦听容器都支持 MessageListener 或 SessionAwareMessageListener 接口的实现。要注意的是实现了 SessionAwareMessageListener 接口的类通过接口和Spring有了耦合。是否选择使用它完全取决于开发者或架构师。

请注意 SessionAwareMessageListener 接口的 ‘onMessage(..)’ 方法会抛出 JMSException异常。和标准JMS MessageListener 接口相反,当使用 SessionAwareMessageListener 接口时,客户端代码负责处理任何抛出的异常。

24.4.4 MessageListenerAdapter

MessageListenerAdapter 类是Spring的异步支持消息类中的不变类(final class):简而言之,它允许你几乎将 任意 一个类做为MDP显露出来(当然有某些限制)。

考虑如下接口定义。注意虽然这个接口既不是从 MessageListener 也不是从 SessionAwareMessageListener 继承来得,但通过 MessageListenerAdapter 类依然可以当作一个MDP来使用。同时也请注意各种消息处理方法是如何根据他们可以接收并处理消息的内容来进行强类型匹配的。

public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}

特别请注意,上面的 MessageDelegate 接口(上文中 DefaultMessageDelegate 类)的实现完全 不 依赖于JMS。它是一个真正的POJO,我们可以通过如下配置把它设置成MDP。

public interface TextMessageDelegate {

void receive(TextMessage message);

}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>

请注意,如果上面的 ‘messageListener’ 收到一个不是 TextMessage 类型的JMS Message,将会产生一个 IllegalStateException 异常(随之产生的其他异常只被捕获而不处理)。
MessageListenerAdapter 还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应 消息。
请看下面的接口及其实现:

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}

如果上面的 DefaultResponsiveTextMessageDelegate 和 MessageListenerAdapter 联合使用,那么任意从执行 ‘receive(..)’ 方法返回的非空值都将(缺省情况下)转换成一个 TextMessage。这个返回的 TextMessage 将被发送到原来的 Message 中JMS Reply-To属性定义的 目的地(如果存在),或者是 MessageListenerAdapter 设置(如果配置了)的缺省 目的地;如果没有定义 目的地,那么将产生一个 InvalidDestinationException 异常(此异常将不会只被捕获而不处理,它 将沿着调用堆栈上传)。

24.4.5事务中的消息处理

在消息监听器的调用中使用事务只需要重新配置监听器容器
通过监听器容器定义中的 sessionTransacted 标记可以轻松的激活本地资源事务。每次消息监听器的调用都在激活的JMS事务中执行,执行失败时,消息接收将发生回滚。这个本地事务还将包含响应信息的发送(通过 SessionAwareMessageListener),但其它资源的操作(例如访问数据库)是独立的。经常会发生类似于数据库处理已提交但消息处理提交失败的情况,因此需要在监听器的实现中进行重复消息的检测。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后你只需要把它添加到早先配置好的容器中。这个容器将处理剩下的事情。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
<property name="resourceAdapter" ref="resourceAdapter"/>
<property name="activationSpecConfig">
<bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
<property name="destinationName" value="myQueue"/>
</bean>
</property>
<property name="messageListener" ref="myMessageListener"/>
</bean>

另外,你可以设置一个 JmsMessageEndpointManager 指定 ActivationSpec 对象。 ActivationSpec 对象可以通过JNDI查找来完成 (使用 <jee:jndi-lookup> )。

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
<property name="resourceAdapter">
<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="tcp://localhost:61616"/>
</bean>
</property>
<property name="workManager">
<bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
</property>
</bean>

请参考 JmsMessageEndpointManager、JmsActivationSpecConfig 和 ResourceAdapterFactoryBean 部分的JavaDoc,以获得更详细的信息。

Spring也提供了并不与JMS绑定的通用JCA消息端点管理器: org.springframework.jca.endpoint.GenericMessageEndpointManager。 它允许使用任何类型的消息监听器(例如CCI MessageListener)和任何提供者特定的ActivationSpec对象。从所涉及的JCA提供者的文档可以找到这个连接器的实际能力,从 GenericMessageEndpointManager 的JavaDoc中可以找到Spring特有的配置细节。

基于JCA的消息端点管理器与EJB 2.1的Message-Driven Beans很相似,它使用了相同的资源提供者约定。像EJB 2.1 MDB一样,任何被JCA提供者支持的消息监听器接口都可以在Spring Context中使用。尽管如此,Spring仍为JMS提供了显式的“方便的”支持,很显然是因为JMS是JCA端点管理约定中最通用的端点API。

24.6注解驱动的监听器

异步接受消息最简单的方法是使用 @JmsListener

@Configuration
@EnableJms
public class AppConfig {

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("3-10");
return factory;
}
}

使用DefaultJmsListenerContainerFactory作为消息监听容器,setConcurrency用于配置消费线程,这里是最小3个,最大10个。

XML的方式:
使用 <jms:annotation-driven>

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}

在上面的例子中,我们使用SimpleJmsListenerEndpoint提供要调用的实际消息监听者,但你也可以建立自己的端点变量描述自定义调用机制。

值得注意的是,你也可以完全跳过使用@JmsListener,只通过JmsListenerConfigurer以编程方式注册您的端点。

24.7JMS命名空间支持

Spring JMS引入了XML命名空间以简化JMS的配置。使用JMS命名空间元素时,需要引用如下的JMS Schema:

<jms:listener-container>

<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

上面的例子等同于在 24.4.4MessageListenerAdapter 的示例中,定义两个不同的监听器容器和两个不同的 MessageListenerAdapter 。除了上面的属性外,listener 元素还具有几个可选的属性。下面的表格列出了所有的属性:

表24.1 JMS listener 元素的属性

属性 描述
id 监听器容器的Bean名称。如果没有指定,将自动生成一个Bean名称。
destination (必须) 监听器目的地的名称,由 DestinationResolver 的策略决定。
ref (必须) 处理对象的Bean名称
method 处理器中被调用的方法名。如果 ref 指向 MessageListener 或者 Spring SessionAwareMessageListener,则这个属性可以被忽略。
response-destination 默认的响应目的地是发送响应消息抵达的目的地。 这用于请求消息没有包含”JMSReplyTo”域的情况。响应目的地类型被监听器容器的”destination-type”属性决定。记住:这仅仅适用于有返回值的监听器方法,因为每个结果对象都会被转化成响应消息。
subscription 持久订阅的名称,如果需要的话。
selector 监听器的一个可选的消息选择器。
concurrency 并发的数量,格式:5(最大),3-5(最小和最大)

<listener-container/> 元素也有几个可选的属性。 这些属性允许像 基本的 JMS设置和资源引用一样来定义不同的策略 (例如 taskExecutor 和 destinationResolver)。 使用这些属性,可以定义很广泛的定制监听器容器,同时仍享有命名空间的便利。

这样的设置可以自动公开一个 JmsListenerContainerFactory

<jms:jca-listener-container resource-adapter="myResourceAdapter"
destination-resolver="myDestinationResolver"
transaction-manager="myTransactionManager"
concurrency="10">

<jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

表24.3 JMS < jca-listener-container / >元素的属性

属性 描述
factory-id 公开此元素被定义为一个JmsListenerContainerFactory与指定id,以便它们可以与其他的端点被重新使用的设置。
resource-adapter JCA ResourceAdapter Bean 的一个引用(默认的Bean名称是’resourceAdapter’)
activation-spec-factory JmsActivationSpecFactory 的一个引用。 默认自动检测JMS提供者和它的 ActivationSpec 类 (参考 DefaultJmsActivationSpecFactory)
destination-resolver DestinationResolver 策略的引用,用以解析JMS Destinations。
message-converter MessageConverter 策略的引用,用以转换JMS Messages 成监听器方法参数。 默认值是 SimpleMessageConverter
destination-type 监听器的JMS目的地类型。可用的选项包含 queue、topic 或者 durableTopic 默认是 ‘queue’)。
client-id 这个监听器容器在JMS客户端的id。
acknowledge 本地JMS应答模式。可用的选项包含:auto、client、dups-ok 或者 transacted (默认值是 ‘auto’)。 ‘transacted’ 的值可激活本地事务性 Session。 也可以通过指定下面介绍的 transaction-manager 属性
transaction-manager Spring JtaTransactionManager 或者 javax.transaction.TransactionManager 的引用,用以为传进的消息应用XA事务。 如果没有指定,将使用本地应答模型(参见“acknowledge”属性)。
concurrency 每个监听器可激活的Session最大并发数。
prefetch 加载进每个Session的最大消息数。记住增加这个值会造成并发空闲。

24.1介绍

Spring提供了JMS集成框架简化了JMS API的使用,像Spring JDBC API的使用。

作者:liuxing's blog
青春须早为,岂能长少年!
原文地址:Spring JMS粗译, 感谢原作者分享。

发表评论