AMQ学习手记-13.Spring-jms的性能转载

原创
小哥 3年前 (2022-11-11) 阅读数 9 #大杂烩

如何使用spring-jms来简化jms客户开发?

本文主要记录如何配置它以供以后重用,而不是解释原理。 没有掌握原则的人。

producer端


producer终端负责发送,此处使用。JmsTemplate。

spring配置

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6
7 8 9 10
11 12 13 14 15 16 17

JmsTemplate默认将jms-config解析为Queue类型的Destination。如果需要解析它。Topic类型,需要jmsTemplate指定属性pubSubDomain=true,配置如下:

1 2 3 4 5 6

测试类

1 package cn.sinobest.asj.producer.springsupport.jt; 2 import javax.annotation.Resource; 3 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.jms.core.JmsTemplate; 7 import org.springframework.test.context.ContextConfiguration; 8 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; 9 10 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring组件运行时 11 @ContextConfiguration("/spring-jms-demo.xml") // 配置文件 12 public class JmsTemplateSendWithContextTest { 13 @Resource(name = "jmsTemplate") 14 private JmsTemplate jt; 15
16 @Test 17 public void testSendToDefaultDestination() { 18 String message = "you can config JmsTemplate in xml, then use it for send."; 19 jt.convertAndSend(message); 20 } 21 }

展示这个测试类,告诉每个人使用它。Spring+JUnit4注释编写单元测试可以非常方便地加载。Spring配置并初始化bean资源;使用Resource可以获得评论bean资源。如何使用JmsTemplate发送消息,但不是重点,因为在。 08. Spring-JmsTemplate之发送 在中,相关API。

consumer端


consumer终端负责接收,有两种接收方式,同步和异步。 03. 如何接收消息 引入。

同步接收

这里使用JmsTemplate用于同步接收。以上已被使用JmsTemplate发送、接收和发送的配置有什么区别?

如果我们不希望客户端一直阻止等待消息,那么我们需要关心。receiveTimeout属性,以毫秒为单位。如果在此时间之后没有收到消息,将返回该消息。null。

1 2 3 4 5 6

异步接收

异步接收基于侦听器接收,传统配置方法是配置一个ListenerContainer的bean,在这个bean里维护listener。还有一种简化的配置方案,可用于container放置多个listener。

传统配置

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6
7 8 9 10
11 12 <bean id="messageContainer" 13 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 14 15 16 17 18

其中,SimpleMessageListener是接口javax.jms.MessageListener实现类的。

DefaultMessageListenerContainer负责将messageListener注册到connectionFactory的destination,一旦destination如果有消息,消息将被推送messageListener。

DefaultMessageListenerContainer有许多功能配置,如下所述:

1.Destination及类型

使用以下内容API,可以设置Destination

  • public void setPubSubDomain(boolean pubSubDomain)
    设置destination的类型,true-Topics,false-Queues;默认为false。
  • public void setDestinationName(String destinationName)
    设置destination的name,结合pubSubDomain使用,根据destinationName解析为特定Destination。
  • public void setDestination(Destination destination)
    设置destination。

2.多线程

一个DMLC在实例中,只能管理一个MessageListener实例,但是可以使用以下内容方法设置多线程:

  • public void setConcurrency(String concurrency)
    通过"lower-upper"格式化字符串,设置线程数的下限和上限等。"5-10"; 或者只设置上限,默认为1,如"10"。

如果不使用事务,多线程可以显著提高侦听器的接收速度。

3.确认模式

下面的API用于设置确认模式:

  • public void setSessionAcknowledgeMode(int sessionAcknowledgeMode)
    设置确认模式,sessionAcknowledgeMode可以取javax.jms.Session.AUTO_ACKNOWLEDGE(默认),javax.jms.Session.CLIENT_ACKNOWLEDGE,javax.jms.Session.DUPS_OK_ACKNOWLEDGE。
  • public void setSessionAcknowledgeModeName(String constantName)
    确认模式由名称设置,默认为"AUTO_ACKNOWLEDGE"。

4.事务

下面的API,用于设置事务:

  • public void setSessionTransacted(boolean sessionTransacted)

    设置是否使用事务,默认值为false。

更多DefaultMessageListenerContainer有关相关配置,请参阅API。

减少了配置

它被称为配置的精简版本,因为容器可以配置多个侦听器。

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://www.springframework.org/schema/beans 5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd"> 6
7 8 9 10
11 12 <bean id="messageContainer" 13 class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 14 15 16 17 18  

listener-container作为一个容器,可以有多个listener每个子元素listener表示侦听器。

1.Destination类型

listener-container有destination-type属性,您可以获取值["queue", "topic"],默认为"queue",它确定listener将destination解析为Queue还是Topic类型。

2.pojo监听器

listener消息已转换,因此ref的目标bean是一个pojo,method是这个pojo方法的名称--此方法的参数为和。Message中的数据类型兼容。下面给出了我们使用的PojoListener:

1 package cn.sinobest.asj.consumer.springsupport.async; 2 3 /* 4 一个pojo作为listener,接收转换后的消息. 5 @author lijinlong 6 7 */ 8 public class PojoListener { 9 public void passMeMessage(String message) { 10 System.out.println("从queue收到的消息:" + message); 11 } 12 } 

3.多线程

为listener-container元素设置concurrency属性,可以指定线程数的下限和上限。这和DefaultMessageListenerContainer的相同。

4.确认模式和交易4

listener-container的acknowledge属性,您可以指定确认模式或交易、值范围。["auto", "client", "dups-ok", "transacted"];默认为"auto",事务性使用"transacted"。

有关精简配置的更多参数,请参阅 spring-jms-4.2.xsd

受信任的包

ObjectMessage使用机制不安全,ActiveMQ自5.12.2和5.13.0之后,强制consumer端声明一份受信任的包列表,只有当ObjectMessage中的Object在受信任的包中,只能提取它。

您可以这样配置受信任的包:

1 2 3 4 5 6 java.lang 7 java.sql 8 cn.sinobest 9 10 11

关于ObjectMessage安全说明,请参阅 http://activemq.apache.org/objectmessage.html

ConnectionFactory的bean配置


在阅读网友博客帖子的过程中,我看到了几种配置方式,但我不理解优缺点的区别。

1.ActiveMQConnectionFactory

在之前的配置中,我们接触到了此配置:

1 2 3

2.SingleConnectionFactory

使用org.springframework.jms.connection.SingleConnectionFactory对ActiveMQConnectionFactory建议对测试或独立环境进行打包。

1 <bean id="connectionFactory" 2 class="org.springframework.jms.connection.SingleConnectionFactory"> 3 4 5 6 7 8

3.PooledConnectionFactory

使用org.apache.activemq.pool.PooledConnectionFactory对ActiveMQConnectionFactory对于包装,我不知道有什么优化。

1 2 3 4 5 6 7

分布式事务


首先,介绍了数据源的概念,数据源可以提供数据或存储数据,一般有两个特点。数据库、消息队列等是数据源。

如果要使用多个数据源,请考虑分布式事务。例如,从broker-A消息在中接收、发送broker-B。如果在发送过程中发生错误,则不应确认收到的消息,否则将broker-A删除,导致消息丢失。还是你想从broker-A同样的问题是接收消息和写入数据库。分布式事务的问题在于,它们涉及多个数据源,从而确保涉及多数据源的操作同时成功或失败。

关于分布式事务的讨论,可选参考。 XA事务处理 ,下面针对两个场景,讨论配置。

1.broker-A到broker-B

从broker-A消息在中接收、发送broker-B在里面在应用程序发送数据的场景中broker-B首先存储发送失败的数据。broker-A,然后由中的定时任务执行。broker-A从获取数据,发送。broker-B。

这里我们使用同步接收,它基于JmsTemplate的。

Spring提供了org.springframework.transaction.jta.JtaTransactionManager,但依赖于底层应用服务器支持。JTA全球交易。这里我们不使用这样的服务器,而是使用它。Atomikos框架。

事务配置:

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:tx="http://www.springframework.org/schema/tx" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 7 http://www.springframework.org/schema/tx 8 http://www.springframework.org/schema/tx/spring-tx-3.0.xsd"> 9 10 <bean id="jtaTransactionManager" 11 class="org.springframework.transaction.jta.JtaTransactionManager"> 12 13 <bean class="com.atomikos.icatch.jta.UserTransactionManager" 14 init-method="init" destroy-method="close"> 15 16 17 18 19 20 21 22 23 24
25 26
27 28 29 30 31 32 33 34 35 36 37
38 39 40 41 42 43 44 45 46 47
48 49 50 51 52 53 54
55 56 57 58 59 60

服务级别获取sourceJmsTemplate、targetJmsTemplate,在业务方法中使用前者接收消息,后者发送消息。要注释的业务代码org.springframework.transaction.annotation.Transactional:

1 @Transactional 2 public boolean transport() throws JmsException{ 3 Message remsg = souJT.receive(); 4 if (remsg != null) { 5 final ObjectMessage omsg = (ObjectMessage) remsg; 6
7 tarJT.send(new MessageCreator() { 8 public Message createMessage(Session session) 9 throws JMSException { 10 Message result = session.createObjectMessage(omsg 11 .getObject()); 12 return result; 13 } 14 }); 15 } 16 return remsg != null; 17 }

事实上,尽管我在项目中使用了上述配置,但我并不了解其原理。还有2这个问题值得一提:

  1. 定时任务和事务之间的冲突
    服务类是事务性的,如果它也是一个定时任务,就会出现问题。
  2. 事务和循环之间的冲突
    最初计划在事务中使用循环souJT收到的消息是null作为结束条件,每个收据都会发送一个。因此,即使有更多消息,它们也只执行一次,无论Atomikos也不知道是什么原因造成的。随后,循环被放置在一个定时任务中,在循环体中调用业务方法,以根据返回的结果确定是否结束循环。

2.broker-B到数据库

从broker-B接收数据库中的消息并写入数据库。此时间未使用Atomikos,还实现了在写库失败时回滚消息的事务效果;broker-A到broker-B,如果未使用Atomikos无法实现交易效果。也许是因为它使用同步接收,然后我们使用异步接收。接收方式是否与分布式事务有关,结论尚不确定。

在这里,我们甚至不再使用它。spring的JtaTransactionManager。

1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:jms="http://www.springframework.org/schema/jms" 5 xsi:schemaLocation="http://www.springframework.org/schema/beans 6 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd 7 http://www.springframework.org/schema/jms 8 http://www.springframework.org/schema/jms/spring-jms-4.2.xsd"> 9 10 11 12
13 14
15 <jms:listener-container connection-factory="connectionFactory" 16 concurrency="1" acknowledge="transacted"> 17 <jms:listener destination="asj.log" ref="converteredLogListener" 18 method="onLog" /> 19 </jms:listener-container> 20

当然,我们必须声明listener-container的acknowledge属性为"transacted"打开交易。在里面ConverteredLogListener的onLog,只要写入失败时抛出的异常可以到达,就调用服务组件将数据放入存储。onLog方法,则事务被回滚,因此确保可以抛出异常。

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除