Spring结合JMS(二)——三种消息窃听器版权声明

原创
小哥 3年前 (2022-11-11) 阅读数 3 #大杂烩

1.3     消息侦听器MessageListener

在Spring整合JMS的应用中我们在定义消息侦听器的时候一共可以定义三种类型的消息侦听器,分别是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。以下是这些类型之间的差异。

1.3.1  MessageListener

MessageListener是最原始的消息侦听器,它是JMS规范中定义的接口。它定义了用于处理接收到的消息的。onMessage方法,它只接收一个Message参数。我们前面在讲配置消费者的时候用的消息侦听器就是MessageListener,代码如下:

Java代码

  1. import javax.jms.JMSException;

  2. import javax.jms.Message;

  3. import javax.jms.MessageListener;

  4. import javax.jms.TextMessage;

  5. public class ConsumerMessageListener implements MessageListener {

  6. public void onMessage(Message message) {

  7. //在这里我们知道制作人正在发送一条纯文本消息,所以在这里你可以直接投,或者直接投。onMessage方法参数已更改Message的子类TextMessage

  8. TextMessage textMsg = (TextMessage) message;

  9. System.out.println( "收到一条纯文本消息。" );

  10. try {

  11. System.out.println( "消息是:" + textMsg.getText());

  12. } catch (JMSException e) {

  13. e.printStackTrace();

  14. }

  15. }

  16. }

1.3.2  SessionAwareMessageListener

SessionAwareMessageListener是Spring为我们提供,这不是标准的JMS MessageListener。MessageListener如果我们使用MessageListener在处理收到的消息时,我们需要发送一条消息来通知对方我们已经收到了消息,因此此时我们需要在代码中获取另一条消息。Connection或Session。SessionAwareMessageListener该设计是为了方便我们在收到消息后发送回复消息,这也为我们提供了对收到消息的处理。onMessage方法,但此方法可以同时接收两个参数,一个是表示当前接收的消息。Message另一个是它可以用来发送消息。Session对象让我们先看一段代码:

Java代码

  1. package com.tiantian.springintejms.listener;

  2. import javax.jms.Destination;

  3. import javax.jms.JMSException;

  4. import javax.jms.Message;

  5. import javax.jms.MessageProducer;

  6. import javax.jms.Session;

  7. import javax.jms.TextMessage;

  8. import org.springframework.jms.listener.SessionAwareMessageListener;

  9. public class ConsumerSessionAwareMessageListener implements

  10. SessionAwareMessageListener {

  11. private Destination destination;

  12. public void onMessage(TextMessage message, Session session) throws JMSException {

  13. System.out.println( "收到一条消息" );

  14. System.out.println( "消息是:" + message.getText());

  15. MessageProducer producer = session.createProducer(destination);

  16. Message textMessage = session.createTextMessage( "ConsumerSessionAwareMessageListener。。。" );

  17. producer.send(textMessage);

  18. }

  19. public Destination getDestination() {

  20. returndestination;

  21. }

  22. public void setDestination(Destination destination) {

  23. this .destination = destination;

  24. }

  25. }

在上面的代码中,我们定义了SessionAwareMessageListener,在这个Listener收到消息后,我们使用相应的Session已创建destination然后使用创建的生产者发送生产者和相应的消息。

然后我们就进去了Spring的配置文件中配置该消息侦听器将处理来自一个叫sessionAwareQueue消息的目的地,并指向该目的地。MessageListener中通过set方法注入其属性。destination的值为queueDestination。所以当我们SessionAwareMessageListener收到消息后queueDestination发送消息。

Xml代码

  1. <? xml version = "1.0" encoding = "UTF-8" ?>

  2. < beans xmlns = "http://www.springframework.org/schema/beans"

  3. xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:context = "http://www.springframework.org/schema/context"

  4. xmlns:jms = "http://www.springframework.org/schema/jms"

  5. xsi:schemaLocation ="http://www.springframework.org/schema/beans

  6. http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

  7. http://www.springframework.org/schema/context

  8. http://www.springframework.org/schema/context/spring-context-3.0.xsd

  9. http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

  10. http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd" >

  11. < context:component-scan base-package = "com.tiantian" />

  12. < bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate" >

  13. < property name = "connectionFactory" ref = "connectionFactory" />

  14. </ bean >

  15. < bean id = "targetConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory" >

  16. < property name = "brokerURL" value = "tcp://localhost:61616" />

  17. </ bean >

  18. < bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory" >

  19. < property name = "targetConnectionFactory" ref = "targetConnectionFactory" />

  20. </ bean >

  21. < bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue" >

  22. < constructor-arg >

  23. < value > queue </ value >

  24. </ constructor-arg >

  25. </ bean >

  26. < bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

  27. < constructor-arg >

  28. < value > sessionAwareQueue </ value >

  29. </ constructor-arg >

  30. </ bean >

  31. < bean id = "consumerMessageListener" class = "com.tiantian.springintejms.listener.ConsumerMessageListener" />

  32. < bean id = "consumerSessionAwareMessageListener" class = "com.tiantian.springintejms.listener.ConsumerSessionAwareMessageListener" >

  33. < property name = "destination" ref = "queueDestination" />

  34. </ bean >

  35. < bean id = "jmsContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  36. < property name = "connectionFactory" ref = "connectionFactory" />

  37. < property name = "destination" ref = "queueDestination" />

  38. < property name = "messageListener" ref = "consumerMessageListener" />

  39. </ bean >

  40. < bean id = "sessionAwareListenerContainer"

  41. class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  42. < property name = "connectionFactory" ref = "connectionFactory" />

  43. < property name = "destination" ref = "sessionAwareQueue" />

  44. < property name = "messageListener" ref = "consumerSessionAwareMessageListener" />

  45. </ bean >

  46. </ beans >

然后让我们做一个测试,测试代码如下:

Java代码

  1. @RunWith (SpringJUnit4ClassRunner. class )

  2. @ContextConfiguration ( "/applicationContext.xml" )

  3. public class ProducerConsumerTest {

  4. @Autowired

  5. private ProducerService producerService;

  6. @Autowired

  7. @Qualifier ( "sessionAwareQueue" )

  8. private Destination sessionAwareQueue;

  9. @Test

  10. public void testSessionAwareMessageListener() {

  11. producerService.sendMessage(sessionAwareQueue, "测试SessionAwareMessageListener" );

  12. }

  13. }

在上面的测试代码中,我们通过先前定义的生产者进入我们定义的生产者。SessionAwareMessageListener监听的sessionAwareQueue已发送消息。程序运行后,控制台输出如下:

这表明我们已经成功地sessionAwareQueue将发送一条纯文本消息ConsumerSessionAwareMessageListener的onMessage处理方法,单位:onMessage方法中ConsumerSessionAwareMessageListener它只是打印出接收到的纯文本信息的内容,然后返回。queueDestination发送了一条纯文本消息,消息内容为“ConsumerSessionAwareMessageListener…”,消息随后ConsumerMessageListener根据我们的定义,在ConsumerMessageListener此外,我简单地打印了收到的消息的内容。

1.3.3  MessageListenerAdapter

MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,其主要功能是键入收到的消息,然后通过反射将其发送给普通消息。Java用于处理的类。

MessageListenerAdapter收到的消息将按如下方式转换:

TextMessage转换为String对象;

BytesMessage转换为byte数组;

MapMessage转换为Map对象;

ObjectMessage转换为相应的Serializable对象。

自从我之前说过MessageListenerAdapter将对接收到的消息进行类型转换,然后使用反射将其提供给真正的目标处理器Java要处理的类(如果真正的目标处理器是MessageListener或者SessionAwareMessageListener,那么Spring收到的将直接使用。Message对象称其为onMessage方法,而不使用反射来调用),那么我们将定义MessageListenerAdapter您需要为它指定这样一个目标类。我们可以通过这个目标类。MessageListenerAdapter规定了施工方法参数,如:

Xml代码

  1. < bean id = "messageListenerAdapter" class = "org.springframework.jms.listener.adapter.MessageListenerAdapter" >
  2. < constructor-arg >
  3. < bean class = "com.tiantian.springintejms.listener.ConsumerListener" />
  4. </ constructor-arg >
  5. </ bean >

它也可以通过delegate要指定的属性,例如:

Xml代码

  1. < bean id = "messageListenerAdapter" class = "org.springframework.jms.listener.adapter.MessageListenerAdapter" >
  2. < property name = "delegate" >
  3. < bean class = "com.tiantian.springintejms.listener.ConsumerListener" />
  4. </ property >
  5. < property name = "defaultListenerMethod" value = "receiveMessage" />
  6. </ bean >

如前所述,如果我们将此目标处理器指定为MessageListener或者SessionAwareMessageListener的时候Spring将直接利用收到的Message对象将其调用为方法参数。onMessage方法但如果指定的目标处理器是正常的Java类时Spring将利用Message类型转换后的对象用作参数,通过反射调用实际目标处理器的处理方法。Spring我如何知道该调用哪种方法?这已经结束了MessageListenerAdapter的defaultListenerMethod当我们没有指定属性时,Spring默认情况下调用目标处理器。handleMessage方法。

让我们看一个例子,假设我们有一个普通的。Java类ConsumerListener它应该有两种方法,handleMessage和receiveMessage,具有以下代码:

Java代码

  1. package com.tiantian.springintejms.listener;

  2. public class ConsumerListener {

  3. public void handleMessage(String message) {

  4. System.out.println( "ConsumerListener通过handleMessage接收到一个纯文本消息,消息是:" + message);

  5. }

  6. public void receiveMessage(String message) {

  7. System.out.println( "ConsumerListener通过receiveMessage接收到一个纯文本消息,消息是:" + message);

  8. }

  9. }

假设我们要把它作为一个消息侦听器来监听发送到adapterQueue这一次我们可以定义相应的MessageListenerAdapter将其视为MessageListener使用。

Xml代码

  1. < bean id = "messageListenerAdapter" class = "org.springframework.jms.listener.adapter.MessageListenerAdapter" >
  2. < property name = "delegate" >
  3. < bean class = "com.tiantian.springintejms.listener.ConsumerListener" />
  4. </ property >
  5. < property name = "defaultListenerMethod" value = "receiveMessage" />
  6. </ bean >

当然MessageListener之后,我们还需要配置其相应的MessageListenerContainer,配置如下:

Xml代码

  1. < bean id = "messageListenerAdapterContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  2. < property name = "connectionFactory" ref = "connectionFactory" />

  3. < property name = "destination" ref = "adapterQueue" />

  4. < property name = "messageListener" ref = "messageListenerAdapter" />

  5. </ bean >

  6. < bean id = "adapterQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

  7. < constructor-arg >

  8. < value > adapterQueue </ value >

  9. </ constructor-arg >

  10. </ bean >

在上面的MessageListenerAdapter其中我们指定defaultListenerMethod属性的值为receiveMessage,所以当MessageListenerAdapter收到消息后,我们将自动调用指定的ConsumerListener的receiveMessage方法。

对于上述代码,我们定义测试代码如下:

Java代码

  1. package com.tiantian.springintejms.test;

  2. import javax.jms.Destination;

  3. import org.junit.Test;

  4. import org.junit.runner.RunWith;

  5. import org.springframework.beans.factory.annotation.Autowired;

  6. import org.springframework.beans.factory.annotation.Qualifier;

  7. import org.springframework.test.context.ContextConfiguration;

  8. import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

  9. import com.tiantian.springintejms.service.ProducerService;

  10. @RunWith (SpringJUnit4ClassRunner. class )

  11. @ContextConfiguration ( "/applicationContext.xml" )

  12. public class ProducerConsumerTest {

  13. @Autowired

  14. @Qualifier ( "adapterQueue" )

  15. private Destination adapterQueue;

  16. @Test

  17. public void testMessageListenerAdapter() {

  18. producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter" );

  19. }

  20. }

此时,我们将看到控制台输出如下:


如果我们没有指定MessageListenerAdapter的defaultListenerMethod属性时,控制台输出以下结果:


MessageListenerAdapter除了自动将普通Java类当做MessageListener为了处理接收到的消息, 另一个主要功能是自动发送返回消息。

当我们用于处理所接收消息的方法的返回值不为空时,Spring它会自动封装为一个。JMS Message,然后自动回复。那么,此时该回复消息将发送到哪里?有两种主要的方式来指定这一点。
首先,它可以被发送。Message的setJMSReplyTo方法指定与消息对应的回复消息的目标。在这里,我们修改生产者的代码以发送消息,并在发送消息之前将消息对应的回复目的地指定为调用。responseQueue以下代码的队列目标:

Java代码

  1. package com.tiantian.springintejms.service.impl;

  2. import javax.jms.Destination;

  3. import javax.jms.JMSException;

  4. import javax.jms.Message;

  5. import javax.jms.Session;

  6. import javax.jms.TextMessage;

  7. import org.springframework.beans.factory.annotation.Autowired;

  8. import org.springframework.beans.factory.annotation.Qualifier;

  9. import org.springframework.jms.core.JmsTemplate;

  10. import org.springframework.jms.core.MessageCreator;

  11. import org.springframework.stereotype.Component;

  12. import com.tiantian.springintejms.service.ProducerService;

  13. @Component

  14. public class ProducerServiceImpl implements ProducerService {

  15. @Autowired

  16. private JmsTemplate jmsTemplate;

  17. @Autowired

  18. @Qualifier ( "responseQueue" )

  19. private Destination responseDestination;

  20. public void sendMessage(Destination destination, final String message) {

  21. System.out.println( "---------------生产者发送消息-----------------" );

  22. System.out.println( "---------------制片人发送了一条消息:" + message);

  23. jmsTemplate.send(destination, new MessageCreator() {

  24. public Message createMessage(Session session) throws JMSException {

  25. TextMessage textMessage = session.createTextMessage(message);

  26. textMessage.setJMSReplyTo(responseDestination);

  27. return textMessage;

  28. }

  29. });

  30. }

  31. }

然后定义responseQueue的队列目的地及其对应的消息侦听器和监听容器。

Xml代码

  1. < bean id = "responseQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

  2. < constructor-arg >

  3. < value > responseQueue </ value >

  4. </ constructor-arg >

  5. </ bean >

  6. < bean id = "responseQueueListener" class = "com.tiantian.springintejms.listener.ResponseQueueListener" />

  7. < bean id = "responseQueueMessageListenerContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  8. < property name = "connectionFactory" ref = "connectionFactory" />

  9. < property name = "destination" ref = "responseQueue" />

  10. < property name = "messageListener" ref = "responseQueueListener" />

  11. </ bean >

ResponseQueueListener定义如下:

Java代码

  1. public class ResponseQueueListener implements MessageListener {

  2. public void onMessage(Message message) {

  3. if (message instanceof TextMessage) {

  4. TextMessage textMessage = (TextMessage) message;

  5. try {

  6. System.out.println( "已接收已发送responseQueue对于这样一条短信:" + textMessage.getText());

  7. } catch (JMSException e) {

  8. e.printStackTrace();

  9. }

  10. }

  11. }

  12. }

然后我们收到消息ConsumerListener的receiveMessage方法如下:

Java代码

  1. /**
  2. * 当返回类型为非-null时MessageListenerAdapter返回值自动封装为一。Message,然后回复
  3. * @param message
  4. * @return
  5. */
  6. public String receiveMessage(String message) {
  7. System.out.println( "ConsumerListener通过receiveMessage接收到一个纯文本消息,消息是:" + message);
  8. return "这是ConsumerListener对象的receiveMessage方法的返回值。" ;
  9. }

我们可以看到,在上面负责接收消息。receiveMessage方法的返回值为非空。

然后我们运行我们的测试代码,并将生产者用于我们定义的MessageListenerAdapter负责处理adapterQueue目的地发送消息。测试代码如下所示:

Java代码

  1. @RunWith (SpringJUnit4ClassRunner. class )

  2. @ContextConfiguration ( "/applicationContext.xml" )

  3. public class ProducerConsumerTest {

  4. @Autowired

  5. private ProducerService producerService;

  6. @Qualifier ( "adapterQueue" )

  7. @Autowired

  8. private Destination adapterQueue;

  9. @Test

  10. public void testMessageListenerAdapter() {

  11. producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter" );

  12. }

  13. }

运行上述测试代码后,控制台输出如下:


这意味着我们的制作人发送的消息MessageListenerAdapter在处理之后,MessageListenerAdapter它确实将侦听器的返回封装为一个。Message往原Message通过setJMSReplyTo该方法指定的回复目的地发送消息。对于MessageListenerAdapter相应的侦听器处理方法返回的是一个null值或返回类型为void的情况,MessageListenerAdapter它不会自动回复消息。感兴趣的网友可以自己测试。

第二MessageListenerAdapter的defaultResponseDestination要指定的属性。这里我们还做了一个测试,首先保持生产者发送消息的代码不变,即在发送消息之前不通过。Message的setJMSReplyTo方法指定消息的回复目的地;然后我们就进去了定义MessageListenerAdapter通过它的时间defaultResponseDestination该属性指定其默认答复目标为“defaultResponseQueue“,并定义defaultResponseQueue对应的消息侦听器和消息侦听容器。

Xml代码

  1. < bean id = "messageListenerAdapter" class = "org.springframework.jms.listener.adapter.MessageListenerAdapter" >

  2. < bean id = "messageListenerAdapterContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  3. < property name = "connectionFactory" ref = "connectionFactory" />

  4. < property name = "destination" ref = "adapterQueue" />

  5. < property name = "messageListener" ref = "messageListenerAdapter" />

  6. </ bean >

  7. !-- 默认消息回复队列 -- >

  8. < bean id = "defaultResponseQueue" class = "org.apache.activemq.command.ActiveMQQueue" >

  9. < constructor-arg >

  10. < value > defaultResponseQueue </ value >

  11. </ constructor-arg >

  12. </ bean >

  13. < bean id = "defaultResponseQueueListener" class = "com.tiantian.springintejms.listener.DefaultResponseQueueListener" />

  14. < bean id = "defaultResponseQueueMessageListenerContainer" class = "org.springframework.jms.listener.DefaultMessageListenerContainer" >

  15. < property name = "connectionFactory" ref = "connectionFactory" />

  16. < property name = "destination" ref = "defaultResponseQueue" />

  17. < property name = "messageListener" ref = "defaultResponseQueueListener" />

  18. </ bean >

DefaultResponseQueueListener代码如下:

Java代码

  1. package com.tiantian.springintejms.listener;

  2. import javax.jms.JMSException;

  3. import javax.jms.Message;

  4. import javax.jms.MessageListener;

  5. import javax.jms.TextMessage;

  6. public class DefaultResponseQueueListener implements MessageListener {

  7. public void onMessage(Message message) {

  8. if (message instanceof TextMessage) {

  9. TextMessage textMessage = (TextMessage) message;

  10. try {

  11. System.out.println( "DefaultResponseQueueListener已接收已发送defaultResponseQueue对于这样一条短信:" + textMessage.getText());

  12. } catch (JMSException e) {

  13. e.printStackTrace();

  14. }

  15. }

  16. }

  17. }

此时,运行以下测试代码:

Java代码

  1. @Test
  2. public void testMessageListenerAdapter() {
  3. producerService.sendMessage(adapterQueue, "测试MessageListenerAdapter" );
  4. }

控制台将输出以下内容:


这说明MessageListenerAdapter真实消息处理器返回的非空内容将自动封装为一个。Message向通行证发送回复消息。defaultResponseDestination属性指定的默认消息回复目标。

因为我们可以用两种方式指定MessageListenerAdapter回复消息的目的地是自动发送的,所以当指定了两种方法并且它们的目的地不同时,如何发送?你寄两个还是只寄一个?我不会在这里详细介绍这部分测试。感兴趣的网友可以自己动手。在这里,我可以直接告诉您,当两个方法都指定消息的回复目的地时,消息就会被发送。setJMSReplyTo由该方法指定的目的地将具有更高的优先级,MessageListenerAdapter回复消息将仅发送到方法指定的消息回复目的地。

原始地址: https://elim.iteye.com/blog/1893676

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除