Spring结合JMS(一)——基于ActiveMQ达成转载

原创
小哥 3年前 (2022-11-11) 阅读数 4 #大杂烩

1.1     JMS 简介

JMS的全称是Java Message Service,即Java消息服务。它主要用于生产者和消费者之间的消息传递,生产者和消费者负责生成消息,消费者负责接收消息。如果我们将其应用于实际的业务需求,我们可以使用生产者在特定时间生成消息并发送它。相应的消费者将在收到相应的消息后完成相应的业务逻辑。消息传递有两种类型,一种是点对点,即生产者和消费者一一对应;/订阅模式,即生产者生成消息并发送消息后,可以由多个消费者接收。

1.2     Spring 整合JMS

对JMS在做了简单的介绍之后,接下来我们来谈谈它。Spring整合JMS具体流程。JMS这只是一个标准。当我们真正使用它时,我们需要有它的具体实现。在这里我们使用它Apache的activeMQ作为其实施。使用的依赖项利用率Maven根据以下内容进行管理:

Xml代码

  1. < dependencies >
  2. < dependency >
  3. < groupId > junit </ groupId >
  4. < artifactId > junit </ artifactId >
  5. < version > 4.10 </ version >
  6. < scope > test </ scope >
  7. </ dependency >
  8. < dependency >
  9. < groupId > org.springframework </ groupId >
  10. < artifactId > spring-context </ artifactId >
  11. < version > ${spring-version} </ version >
  12. </ dependency >
  13. < dependency >
  14. < groupId > org.springframework </ groupId >
  15. < artifactId > spring-jms </ artifactId >
  16. < version > ${spring-version} </ version >
  17. </ dependency >
  18. < dependency >
  19. < groupId > org.springframework </ groupId >
  20. < artifactId > spring-test </ artifactId >
  21. < version > ${spring-version} </ version >
  22. </ dependency >
  23. < dependency >
  24. < groupId > javax.annotation </ groupId >
  25. < artifactId > jsr250-api </ artifactId >
  26. < version > 1.0 </ version >
  27. </ dependency >
  28. < dependency >
  29. < groupId > org.apache.activemq </ groupId >
  30. < artifactId > activemq-core </ artifactId >
  31. < version > 5.7.0 </ version >
  32. </ dependency >
  33. </ dependencies >

1.2.1  activeMQ准备

由于使用了apache的activeMQ作为JMS实施,然后我们应该首先开始。apache在官方网站上下载activeMQ( http://activemq.apache.org/download.html ),执行解压缩并运行bin目录下方activemq.bat文件启动activeMQ。

1.2.2配置ConnectionFactory

ConnectionFactory用于生成JMS服务器的链接,Spring为我们提供多个ConnectionFactory,有SingleConnectionFactory和CachingConnectionFactory。SingleConnectionFactory对于建立JMS对服务器链接请求将始终返回相同的链接,并且将被忽略。Connection的close方法调用。CachingConnectionFactory继承了SingleConnectionFactory,所以它有SingleConnectionFactory在所有功能中,它还添加了一个缓存功能,可以缓存Session、MessageProducer和MessageConsumer。这里我们使用SingleConnectionFactory举个例子。

Xml代码

  1. < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" />

这定义了生成JMS服务器链接ConnectionFactory是吗?答案是对的和错的。Spring提供的ConnectionFactory只是Spring用于管理ConnectionFactory真的生产JMS服务器链接ConnectionFactory还得是由JMS服务供应商提供并需要将其注入Spring提供的ConnectionFactory在里面我们在这里使用ActiveMQ实现的JMS,所以我们真的可以生产Connection应该是ActiveMQ提供的ConnectionFactory。因此,定义ConnectionFactory完整的代码应如下所示:

Xml代码

  1. < bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >

  2. < property name = "brokerURL" value = "tcp://localhost:61616" />

  3. </ bean >

  4. < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >

  5. < property name = "targetConnectionFactory" ref = "targetConnectionFactory" />

  6. </ bean >

ActiveMQ为我们提供PooledConnectionFactory,通过注入ActiveMQConnectionFactory可用于放置Connection、Session和MessageProducer池,这可以大大减少我们的资源消耗。使用时PooledConnectionFactory当我们定义ConnectionFactory应定义如下:

Xml代码

  1. < bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >

  2. < property name = "brokerURL" value = "tcp://localhost:61616" />

  3. </ bean >

  4. < bean id = "pooledConnectionFactory" class = "org.apache.activemq.pool.PooledConnectionFactory" >

  5. < property name = "connectionFactory" ref = "targetConnectionFactory" />

  6. < property name = "maxConnections" value = "10" />

  7. </ bean >

  8. < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >

  9. < property name = "targetConnectionFactory" ref = "pooledConnectionFactory" />

  10. </ bean >

1.2.3配置生产者

配置好ConnectionFactory然后我们需要分配生产者。制作人负责生成和发送消息JMS服务器,它通常对应于我们的一个业务逻辑服务实现类。但是我们的服务实现类如何发送消息?这通常是Spring对于我们的JmsTemplate类来实现,因此配置生成器实际上是消息发送配置的核心。JmsTemplate。对于消息的发送者来说,在发送消息时应该知道将消息发送到哪里。为此,我们定义JmsTemplate当你需要注入一个Spring提供的ConnectionFactory对象。

Xml代码

  1. < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >
  2. < property name = "connectionFactory" ref = "connectionFactory" />
  3. </ bean >

在实际使用中JmsTemplate发送消息时,我们需要知道消息的目的地,即。destination。在Jms有一个用于表示目的地。Destination接口,它没有任何方法定义,只是用于进行标识。当我们使用JmsTemplate消息发送时未指定。destination默认值将在以下情况下使用Destination。默认Destination可以定义jmsTemplate bean对象。defaultDestination或defaultDestinationName为了注射,defaultDestinationName对应的是一个普通字符串。在里面ActiveMQ两种类型Destination,一种是对等ActiveMQQueue,另一个是支持订阅/释放模式。ActiveMQTopic。在定义这两种类型时Destination当我们都能通过name要构造的属性,例如:

Xml代码

  1. < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >
  2. < constructor-arg >
  3. < value > queue </ value >
  4. </ constructor-arg >
  5. </ bean >
  6. < bean id = "topicDestination" class = "org.apache.activemq.command.ActiveMQTopic" >
  7. < constructor-arg value = "topic" />
  8. </ bean >

假设我们定义ProducerService,里面有一个方向Destination发送纯文本消息的方法。sendMessage,那么我们的代码如下所示:

Java代码

  1. package com.tiantian.springintejms.service.impl;

  2. import javax.annotation.Resource;

  3. import javax.jms.Destination;

  4. import javax.jms.JMSException;

  5. import javax.jms.Message;

  6. import javax.jms.Session;

  7. import org.springframework.jms.core.JmsTemplate;

  8. import org.springframework.jms.core.MessageCreator;

  9. import org.springframework.stereotype.Component;

  10. import com.tiantian.springintejms.service.ProducerService;

  11. @Component

  12. public class ProducerServiceImpl implements ProducerService {

  13. private JmsTemplate jmsTemplate;

  14. public void sendMessage(Destination destination, final String message) {

  15. System.out.println( "---------------生产者发送消息-----------------" );

  16. System.out.println( "---------------制片人发送了一条消息:" + message);

  17. jmsTemplate.send(destination, new MessageCreator() {

  18. public Message createMessage(Session session) throws JMSException {

  19. return session.createTextMessage(message);

  20. }

  21. });

  22. }

  23. public JmsTemplate getJmsTemplate() {

  24. returnjmsTemplate;

  25. }

  26. @Resource

  27. public void setJmsTemplate(JmsTemplate jmsTemplate) {

  28. this .jmsTemplate = jmsTemplate;

  29. }

  30. }

我们可以从中看到sendMessage在方法体中,我们传递jmsTemplate向相应的Destination属于此时,我们生成一条简单的文本消息并将其发送到指定的目的地。Destination生产者已配置。

1.2.4配置使用者

生产者到指定目的地Destination发送消息后,下一步是消费者将消息消费到指定的目的地。那么消费者如何知道生产者将消息发送到指定的目的地Destination它在哪里?这已经结束了Spring为我们封装的消息侦听容器MessageListenerContainer实施,负责接收信息并将其分发给实际MessageListener处理。每个消费者都需要为每个目的地指定相应的目的地。MessageListenerContainer。对于消息侦听容器,除了知道要侦听哪个目的地之外,它还需要知道在哪里侦听,也就是说,它还必须知道要侦听哪一个目的地。JMS服务器,通过配置MessageConnectionFactory当注入一个ConnectionFactory实现。所以我们 在配置 MessageListenerContainer 必须指定三个属性,一个是侦听位置。 ConnectionFactory ; 一种是表示倾听或其他 Destination ; 一种是在收到消息后进行处理。 MessageListener 。Spring共有两种类型MessageListenerContainer,SimpleMessageListenerContainer和DefaultMessageListenerContainer。

SimpleMessageListenerContainer会话在开始时创建。session和消费者Consumer并将使用标准JMS MessageConsumer.setMessageListener()方法注册侦听器let。JMS提供者调用监听器的回调函数。它不能动态地适应运行时需求并参与外部事务管理。兼容性方面,它非常接近独立JMS规格,但通常不兼容Java EE的JMS限制。

大多数时候我们仍然使用DefaultMessageListenerContainer,跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer将动态适应运行时需求,并能够参与外部事务管理。它平衡了权利JMS低提供商要求,高级功能,如事务参与和兼容性Java EE环境。

定义消息的处理。 MessageListener

要定义消息的处理。MessageListener我们只需要实施JMS规范中的MessageListener界面很好。MessageListener接口中只有一个方法。onMessage方法,在收到消息时自动调用该方法。

Java代码

  1. package com.tiantian.springintejms.listener;

  2. import javax.jms.JMSException;

  3. import javax.jms.Message;

  4. import javax.jms.MessageListener;

  5. import javax.jms.TextMessage;

  6. public class ConsumerMessageListener implements MessageListener {

  7. public void onMessage(Message message) {

  8. //这里我们知道制作人正在发送一条纯文本消息,因此您可以在这里直接进行演员。

  9. TextMessage textMsg = (TextMessage) message;

  10. System.out.println( "收到一条纯文本消息。" );

  11. try {

  12. System.out.println( "消息是:" + textMsg.getText());

  13. } catch (JMSException e) {

  14. e.printStackTrace();

  15. }

  16. }

  17. }

有了MessageListener之后,我们可以Spring在配置文件中配置了消息侦听容器。

Xml代码

  1. < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >

  2. < constructor-arg >

  3. < value > queue </ value >

  4. </ constructor-arg >

  5. </ bean >

  6. < bean id = "consumerMessageListener" class = "com.tiantian.springintejms.listener.ConsumerMessageListener" />

  7. < bean id = "jmsContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  8. < property name = "connectionFactory" ref = "connectionFactory" />

  9. < property name = "destination" ref = "queueDestination" />

  10. < property name = "messageListener" ref = "consumerMessageListener" />

  11. </ bean >

我们可以看到,我们定义了queue的ActiveMQQueue目的地,我们的侦听器正在侦听发送到此目的地的消息。

到目前为止,我们的发电机和消费者已经配置完毕,这意味着我们的集成已经完成。这次完成Spring配置文件应如下所示:

Xml代码

  1. <? xml version = "1.0" encoding = "UTF-8" ?>

  2. < beans xmlns = "http://www.springframework.org/schema/beans"

  3. xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:context = "http://www.springframework.org/schema/context"

  4. xmlns:jms = "http://www.springframework.org/schema/jms"

  5. xsi:schemaLocation ="http://www.springframework.org/schema/beans

  6. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

  7. http://www.springframework.org/schema/context

  8. http://www.springframework.org/schema/context/spring-context-3.0.xsd

  9. http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

  10. http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" >

  11. < context:component-scan base-package = "com.tiantian" />

  12. < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >

  13. < property name = "connectionFactory" ref = "connectionFactory" />

  14. </ bean >

  15. < bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >

  16. < property name = "brokerURL" value = "tcp://localhost:61616" />

  17. </ bean >

  18. < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >

  19. < property name = "targetConnectionFactory" ref = "targetConnectionFactory" />

  20. </ bean >

  21. < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >

  22. < constructor-arg >

  23. < value > queue </ value >

  24. </ constructor-arg >

  25. </ bean >

  26. < bean id = "consumerMessageListener" class = "com.tiantian.springintejms.listener.ConsumerMessageListener" />

  27. < bean id = "jmsContainer"

  28. class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  29. < property name = "connectionFactory" ref = "connectionFactory" />

  30. < property name = "destination" ref = "queueDestination" />

  31. < property name = "messageListener" ref = "consumerMessageListener" />

  32. </ bean >

  33. </ beans >

然后让我们测试一下,看看我们的集成是否真的成功了。测试代码如下:

Java代码

  1. package com.tiantian.springintejms.test;

  2. import javax.jms.Destination;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.beans.factory.annotation.Qualifier;

  7. import org.springframework.test.context.ContextConfiguration;

  8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

  9. import com.tiantian.springintejms.service.ProducerService;

  10. @RunWith (SpringJUnit4ClassRunner. class )

  11. @ContextConfiguration ( "/applicationContext.xml" )

  12. public class ProducerConsumerTest {

  13. @Autowired

  14. private ProducerService producerService;

  15. @Autowired

  16. @Qualifier ( "queueDestination" )

  17. private Destination destination;

  18. @Test

  19. public void testSend() {

  20. for ( int i= 0 ; i< 2 ; i++) {

  21. producerService.sendMessage(destination, "你好,制片人!这是一条信息:" + (i+ 1 ));

  22. }

  23. }

  24. }

在上面的测试代码中,我们使用生产者发送了两条消息。通常,消费者应该能够收到这两条消息。运行测试代码后,控制台输出如下:

看,控制台做出了正确的输出,这表明我们的集成确实成功了。

原始地址: https://elim.iteye.com/blog/1893038

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除