Spring结合JMS(一)——基于ActiveMQ达成转载
原创1.1 JMS 简介
JMS的全称是Java Message Service,即Java消息服务。它主要用于生产者和消费者之间的消息传递,生产者和消费者负责生成消息,消费者负责接收消息。如果我们将其应用于实际的业务需求,我们可以使用生产者在特定时间生成消息并发送它。相应的消费者将在收到相应的消息后完成相应的业务逻辑。消息传递有两种类型,一种是点对点,即生产者和消费者一一对应;/订阅模式,即生产者生成消息并发送消息后,可以由多个消费者接收。
1.2 Spring 整合JMS
对JMS在做了简单的介绍之后,接下来我们来谈谈它。Spring整合JMS具体流程。JMS这只是一个标准。当我们真正使用它时,我们需要有它的具体实现。在这里我们使用它Apache的activeMQ作为其实施。使用的依赖项利用率Maven根据以下内容进行管理:
Xml代码
- < dependencies >
- < dependency >
- < groupId > junit </ groupId >
- < artifactId > junit </ artifactId >
- < version > 4.10 </ version >
- < scope > test </ scope >
- </ dependency >
- < dependency >
- < groupId > org.springframework </ groupId >
- < artifactId > spring-context </ artifactId >
- < version > ${spring-version} </ version >
- </ dependency >
- < dependency >
- < groupId > org.springframework </ groupId >
- < artifactId > spring-jms </ artifactId >
- < version > ${spring-version} </ version >
- </ dependency >
- < dependency >
- < groupId > org.springframework </ groupId >
- < artifactId > spring-test </ artifactId >
- < version > ${spring-version} </ version >
- </ dependency >
- < dependency >
- < groupId > javax.annotation </ groupId >
- < artifactId > jsr250-api </ artifactId >
- < version > 1.0 </ version >
- </ dependency >
- < dependency >
- < groupId > org.apache.activemq </ groupId >
- < artifactId > activemq-core </ artifactId >
- < version > 5.7.0 </ version >
- </ dependency >
- </ dependencies >
1.2.1 activeMQ准备
由于使用了apache的activeMQ作为JMS实施,然后我们应该首先开始。apache在官方网站上下载activeMQ( http://activemq.apache.org/download.html ),执行解压缩并运行bin目录下方activemq.bat文件启动activeMQ。
1.2.2配置ConnectionFactory
ConnectionFactory用于生成JMS服务器的链接,Spring为我们提供多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS对服务器链接请求将始终返回相同的链接,并且将被忽略。Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它有SingleConnectionFactory在所有功能中,它还添加了一个缓存功能,可以缓存Session、MessageProducer和MessageConsumer。这里我们使用SingleConnectionFactory举个例子。
Xml代码
- < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" />
这定义了生成JMS服务器链接ConnectionFactory是吗?答案是对的和错的。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory真的生产JMS服务器链接ConnectionFactory还得是由JMS服务供应商提供并需要将其注入Spring提供的ConnectionFactory在里面我们在这里使用ActiveMQ实现的JMS,所以我们真的可以生产Connection应该是ActiveMQ提供的ConnectionFactory。因此,定义ConnectionFactory完整的代码应如下所示:
Xml代码
-
< bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >
-
< property name = "brokerURL" value = "tcp://localhost:61616" />
-
</ bean >
-
< bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >
-
< property name = "targetConnectionFactory" ref = "targetConnectionFactory" />
-
</ bean >
ActiveMQ为我们提供PooledConnectionFactory,通过注入ActiveMQConnectionFactory可用于放置Connection、Session和MessageProducer池,这可以大大减少我们的资源消耗。使用时PooledConnectionFactory当我们定义ConnectionFactory应定义如下:
Xml代码
-
< bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >
-
< property name = "brokerURL" value = "tcp://localhost:61616" />
-
</ bean >
-
< bean id = "pooledConnectionFactory" class = "org.apache.activemq.pool.PooledConnectionFactory" >
-
< property name = "connectionFactory" ref = "targetConnectionFactory" />
-
< property name = "maxConnections" value = "10" />
-
</ bean >
-
< bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >
-
< property name = "targetConnectionFactory" ref = "pooledConnectionFactory" />
-
</ bean >
1.2.3配置生产者
配置好ConnectionFactory然后我们需要分配生产者。制作人负责生成和发送消息JMS服务器,它通常对应于我们的一个业务逻辑服务实现类。但是我们的服务实现类如何发送消息?这通常是Spring对于我们的JmsTemplate类来实现,因此配置生成器实际上是消息发送配置的核心。JmsTemplate。对于消息的发送者来说,在发送消息时应该知道将消息发送到哪里。为此,我们定义JmsTemplate当你需要注入一个Spring提供的ConnectionFactory对象。
Xml代码
- < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >
- < property name = "connectionFactory" ref = "connectionFactory" />
- </ bean >
在实际使用中JmsTemplate发送消息时,我们需要知道消息的目的地,即。destination。在Jms有一个用于表示目的地。Destination接口,它没有任何方法定义,只是用于进行标识。当我们使用JmsTemplate消息发送时未指定。destination默认值将在以下情况下使用Destination。默认Destination可以定义jmsTemplate bean对象。defaultDestination或defaultDestinationName为了注射,defaultDestinationName对应的是一个普通字符串。在里面ActiveMQ两种类型Destination,一种是对等ActiveMQQueue,另一个是支持订阅/释放模式。ActiveMQTopic。在定义这两种类型时Destination当我们都能通过name要构造的属性,例如:
Xml代码
- < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
- < constructor-arg >
- < value > queue </ value >
- </ constructor-arg >
- </ bean >
- < bean id = "topicDestination" class = "org.apache.activemq.command.ActiveMQTopic" >
- < constructor-arg value = "topic" />
- </ bean >
假设我们定义ProducerService,里面有一个方向Destination发送纯文本消息的方法。sendMessage,那么我们的代码如下所示:
Java代码
-
package com.tiantian.springintejms.service.impl;
-
import javax.annotation.Resource;
-
import javax.jms.Destination;
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.Session;
-
import org.springframework.jms.core.JmsTemplate;
-
import org.springframework.jms.core.MessageCreator;
-
import org.springframework.stereotype.Component;
-
import com.tiantian.springintejms.service.ProducerService;
-
@Component
-
public class ProducerServiceImpl implements ProducerService {
-
private JmsTemplate jmsTemplate;
-
public void sendMessage(Destination destination, final String message) {
-
System.out.println( "---------------生产者发送消息-----------------" );
-
System.out.println( "---------------制片人发送了一条消息:" + message);
-
jmsTemplate.send(destination, new MessageCreator() {
-
public Message createMessage(Session session) throws JMSException {
-
return session.createTextMessage(message);
-
}
-
});
-
}
-
public JmsTemplate getJmsTemplate() {
-
returnjmsTemplate;
-
}
-
@Resource
-
public void setJmsTemplate(JmsTemplate jmsTemplate) {
-
this .jmsTemplate = jmsTemplate;
-
}
-
}
我们可以从中看到sendMessage在方法体中,我们传递jmsTemplate向相应的Destination属于此时,我们生成一条简单的文本消息并将其发送到指定的目的地。Destination生产者已配置。
1.2.4配置使用者
生产者到指定目的地Destination发送消息后,下一步是消费者将消息消费到指定的目的地。那么消费者如何知道生产者将消息发送到指定的目的地Destination它在哪里?这已经结束了Spring为我们封装的消息侦听容器MessageListenerContainer实施,负责接收信息并将其分发给实际MessageListener处理。每个消费者都需要为每个目的地指定相应的目的地。MessageListenerContainer。对于消息侦听容器,除了知道要侦听哪个目的地之外,它还需要知道在哪里侦听,也就是说,它还必须知道要侦听哪一个目的地。JMS服务器,通过配置MessageConnectionFactory当注入一个ConnectionFactory实现。所以我们 在配置 MessageListenerContainer 必须指定三个属性,一个是侦听位置。 ConnectionFactory ; 一种是表示倾听或其他 Destination ; 一种是在收到消息后进行处理。 MessageListener 。Spring共有两种类型MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。
SimpleMessageListenerContainer会话在开始时创建。session和消费者Consumer并将使用标准JMS MessageConsumer.setMessageListener()方法注册侦听器let。JMS提供者调用监听器的回调函数。它不能动态地适应运行时需求并参与外部事务管理。兼容性方面,它非常接近独立JMS规格,但通常不兼容Java EE的JMS限制。
大多数时候我们仍然使用DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer将动态适应运行时需求,并能够参与外部事务管理。它平衡了权利JMS低提供商要求,高级功能,如事务参与和兼容性Java EE环境。
定义消息的处理。 MessageListener
要定义消息的处理。MessageListener我们只需要实施JMS规范中的MessageListener界面很好。MessageListener接口中只有一个方法。onMessage方法,在收到消息时自动调用该方法。
Java代码
-
package com.tiantian.springintejms.listener;
-
import javax.jms.JMSException;
-
import javax.jms.Message;
-
import javax.jms.MessageListener;
-
import javax.jms.TextMessage;
-
public class ConsumerMessageListener implements MessageListener {
-
public void onMessage(Message message) {
-
//这里我们知道制作人正在发送一条纯文本消息,因此您可以在这里直接进行演员。
-
TextMessage textMsg = (TextMessage) message;
-
System.out.println( "收到一条纯文本消息。" );
-
try {
-
System.out.println( "消息是:" + textMsg.getText());
-
} catch (JMSException e) {
-
e.printStackTrace();
-
}
-
}
-
}
有了MessageListener之后,我们可以Spring在配置文件中配置了消息侦听容器。
Xml代码
-
< bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
-
< constructor-arg >
-
< value > queue </ value >
-
</ constructor-arg >
-
</ bean >
-
< bean id = "consumerMessageListener" class = "com.tiantian.springintejms.listener.ConsumerMessageListener" />
-
< bean id = "jmsContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
-
< property name = "connectionFactory" ref = "connectionFactory" />
-
< property name = "destination" ref = "queueDestination" />
-
< property name = "messageListener" ref = "consumerMessageListener" />
-
</ bean >
我们可以看到,我们定义了queue的ActiveMQQueue目的地,我们的侦听器正在侦听发送到此目的地的消息。
到目前为止,我们的发电机和消费者已经配置完毕,这意味着我们的集成已经完成。这次完成Spring配置文件应如下所示:
Xml代码
-
<? xml version = "1.0" encoding = "UTF-8" ?>
-
< beans xmlns = "http://www.springframework.org/schema/beans"
-
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:context = "http://www.springframework.org/schema/context"
-
xmlns:jms = "http://www.springframework.org/schema/jms"
-
xsi:schemaLocation ="http://www.springframework.org/schema/beans
-
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
-
http://www.springframework.org/schema/context/spring-context-3.0.xsd
-
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
-
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" >
-
< context:component-scan base-package = "com.tiantian" />
-
< bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >
-
< property name = "connectionFactory" ref = "connectionFactory" />
-
</ bean >
-
< bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >
-
< property name = "brokerURL" value = "tcp://localhost:61616" />
-
</ bean >
-
< bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >
-
< property name = "targetConnectionFactory" ref = "targetConnectionFactory" />
-
</ bean >
-
< bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
-
< constructor-arg >
-
< value > queue </ value >
-
</ constructor-arg >
-
</ bean >
-
< bean id = "consumerMessageListener" class = "com.tiantian.springintejms.listener.ConsumerMessageListener" />
-
< bean id = "jmsContainer"
-
class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >
-
< property name = "connectionFactory" ref = "connectionFactory" />
-
< property name = "destination" ref = "queueDestination" />
-
< property name = "messageListener" ref = "consumerMessageListener" />
-
</ bean >
-
</ beans >
然后让我们测试一下,看看我们的集成是否真的成功了。测试代码如下:
Java代码
-
package com.tiantian.springintejms.test;
-
import javax.jms.Destination;
-
import org.junit.Test;
-
import org.junit.runner.RunWith;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.beans.factory.annotation.Qualifier;
-
import org.springframework.test.context.ContextConfiguration;
-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
-
import com.tiantian.springintejms.service.ProducerService;
-
@RunWith (SpringJUnit4ClassRunner. class )
-
@ContextConfiguration ( "/applicationContext.xml" )
-
public class ProducerConsumerTest {
-
@Autowired
-
private ProducerService producerService;
-
@Autowired
-
@Qualifier ( "queueDestination" )
-
private Destination destination;
-
@Test
-
public void testSend() {
-
for ( int i= 0 ; i< 2 ; i++) {
-
producerService.sendMessage(destination, "你好,制片人!这是一条信息:" + (i+ 1 ));
-
}
-
}
-
}
在上面的测试代码中,我们使用生产者发送了两条消息。通常,消费者应该能够收到这两条消息。运行测试代码后,控制台输出如下:
看,控制台做出了正确的输出,这表明我们的集成确实成功了。
原始地址: https://elim.iteye.com/blog/1893038
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除