AMQ学习手记-13.Spring-jms的性能转载
原创如何使用spring-jms来简化jms客户开发?
本文主要记录如何配置它以供以后重用,而不是解释原理。 没有掌握原则的人。
producer端
producer终端负责发送,此处使用。JmsTemplate。
spring配置

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://www.springframework.org/schema/beans
5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
6
7
11
12

JmsTemplate默认将jms-config解析为Queue类型的Destination。如果需要解析它。Topic类型,需要jmsTemplate指定属性pubSubDomain=true,配置如下:

1

测试类

1 package cn.sinobest.asj.producer.springsupport.jt;
2 import javax.annotation.Resource;
3
4 import org.junit.Test;
5 import org.junit.runner.RunWith;
6 import org.springframework.jms.core.JmsTemplate;
7 import org.springframework.test.context.ContextConfiguration;
8 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
9
10 @RunWith(SpringJUnit4ClassRunner.class) // 配置spring组件运行时
11 @ContextConfiguration("/spring-jms-demo.xml") // 配置文件
12 public class JmsTemplateSendWithContextTest {
13 @Resource(name = "jmsTemplate")
14 private JmsTemplate jt;
15
16 @Test
17 public void testSendToDefaultDestination() {
18 String message = "you can config JmsTemplate in xml, then use it for send.";
19 jt.convertAndSend(message);
20 }
21 }

展示这个测试类,告诉每个人使用它。Spring+JUnit4注释编写单元测试可以非常方便地加载。Spring配置并初始化bean资源;使用Resource可以获得评论bean资源。如何使用JmsTemplate发送消息,但不是重点,因为在。 08. Spring-JmsTemplate之发送 在中,相关API。
consumer端
consumer终端负责接收,有两种接收方式,同步和异步。 03. 如何接收消息 引入。
同步接收
这里使用JmsTemplate用于同步接收。以上已被使用JmsTemplate发送、接收和发送的配置有什么区别?
如果我们不希望客户端一直阻止等待消息,那么我们需要关心。receiveTimeout属性,以毫秒为单位。如果在此时间之后没有收到消息,将返回该消息。null。

1

异步接收
异步接收基于侦听器接收,传统配置方法是配置一个ListenerContainer的bean,在这个bean里维护listener。还有一种简化的配置方案,可用于container放置多个listener。
传统配置

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://www.springframework.org/schema/beans
5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
6
7
11

其中,SimpleMessageListener是接口javax.jms.MessageListener实现类的。
DefaultMessageListenerContainer负责将messageListener注册到connectionFactory的destination,一旦destination如果有消息,消息将被推送messageListener。
DefaultMessageListenerContainer有许多功能配置,如下所述:
1.Destination及类型
使用以下内容API,可以设置Destination
- public void setPubSubDomain(boolean pubSubDomain)
设置destination的类型,true-Topics,false-Queues;默认为false。 - public void setDestinationName(String destinationName)
设置destination的name,结合pubSubDomain使用,根据destinationName解析为特定Destination。 - public void setDestination(Destination destination)
设置destination。
2.多线程
一个DMLC在实例中,只能管理一个MessageListener实例,但是可以使用以下内容方法设置多线程:
- public void setConcurrency(String concurrency)
通过"lower-upper"格式化字符串,设置线程数的下限和上限等。"5-10"; 或者只设置上限,默认为1,如"10"。
如果不使用事务,多线程可以显著提高侦听器的接收速度。
3.确认模式
下面的API用于设置确认模式:
- public void setSessionAcknowledgeMode(int sessionAcknowledgeMode)
设置确认模式,sessionAcknowledgeMode可以取javax.jms.Session.AUTO_ACKNOWLEDGE(默认),javax.jms.Session.CLIENT_ACKNOWLEDGE,javax.jms.Session.DUPS_OK_ACKNOWLEDGE。 - public void setSessionAcknowledgeModeName(String constantName)
确认模式由名称设置,默认为"AUTO_ACKNOWLEDGE"。
4.事务
下面的API,用于设置事务:
-
public void setSessionTransacted(boolean sessionTransacted)
设置是否使用事务,默认值为false。
更多DefaultMessageListenerContainer有关相关配置,请参阅API。
减少了配置
它被称为配置的精简版本,因为容器可以配置多个侦听器。

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xsi:schemaLocation="http://www.springframework.org/schema/beans
5 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd">
6
7
11

listener-container作为一个容器,可以有多个listener每个子元素listener表示侦听器。
1.Destination类型
listener-container有destination-type属性,您可以获取值["queue", "topic"],默认为"queue",它确定listener将destination解析为Queue还是Topic类型。
2.pojo监听器
listener消息已转换,因此ref的目标bean是一个pojo,method是这个pojo方法的名称--此方法的参数为和。Message中的数据类型兼容。下面给出了我们使用的PojoListener:

1 package cn.sinobest.asj.consumer.springsupport.async; 2 3 /* 4 一个pojo作为listener,接收转换后的消息. 5 @author lijinlong 6 7 */ 8 public class PojoListener { 9 public void passMeMessage(String message) { 10 System.out.println("从queue收到的消息:" + message); 11 } 12 }

3.多线程
为listener-container元素设置concurrency属性,可以指定线程数的下限和上限。这和DefaultMessageListenerContainer的相同。
4.确认模式和交易4
listener-container的acknowledge属性,您可以指定确认模式或交易、值范围。["auto", "client", "dups-ok", "transacted"];默认为"auto",事务性使用"transacted"。
有关精简配置的更多参数,请参阅 spring-jms-4.2.xsd 。
受信任的包
ObjectMessage使用机制不安全,ActiveMQ自5.12.2和5.13.0之后,强制consumer端声明一份受信任的包列表,只有当ObjectMessage中的Object在受信任的包中,只能提取它。
您可以这样配置受信任的包:

1
5
6
10

关于ObjectMessage安全说明,请参阅 http://activemq.apache.org/objectmessage.html
ConnectionFactory的bean配置
在阅读网友博客帖子的过程中,我看到了几种配置方式,但我不理解优缺点的区别。
1.ActiveMQConnectionFactory
在之前的配置中,我们接触到了此配置:
1
2.SingleConnectionFactory
使用org.springframework.jms.connection.SingleConnectionFactory对ActiveMQConnectionFactory建议对测试或独立环境进行打包。

1 <bean id="connectionFactory"
2 class="org.springframework.jms.connection.SingleConnectionFactory">
3

3.PooledConnectionFactory
使用org.apache.activemq.pool.PooledConnectionFactory对ActiveMQConnectionFactory对于包装,我不知道有什么优化。

1

分布式事务
首先,介绍了数据源的概念,数据源可以提供数据或存储数据,一般有两个特点。数据库、消息队列等是数据源。
如果要使用多个数据源,请考虑分布式事务。例如,从broker-A消息在中接收、发送broker-B。如果在发送过程中发生错误,则不应确认收到的消息,否则将broker-A删除,导致消息丢失。还是你想从broker-A同样的问题是接收消息和写入数据库。分布式事务的问题在于,它们涉及多个数据源,从而确保涉及多数据源的操作同时成功或失败。
关于分布式事务的讨论,可选参考。 XA事务处理 ,下面针对两个场景,讨论配置。
1.broker-A到broker-B
从broker-A消息在中接收、发送broker-B在里面在应用程序发送数据的场景中broker-B首先存储发送失败的数据。broker-A,然后由中的定时任务执行。broker-A从获取数据,发送。broker-B。
这里我们使用同步接收,它基于JmsTemplate的。
Spring提供了org.springframework.transaction.jta.JtaTransactionManager,但依赖于底层应用服务器支持。JTA全球交易。这里我们不使用这样的服务器,而是使用它。Atomikos框架。
事务配置:

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:tx="http://www.springframework.org/schema/tx"
5 xsi:schemaLocation="http://www.springframework.org/schema/beans
6 http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
7 http://www.springframework.org/schema/tx
8 http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
9
10 <bean id="jtaTransactionManager"
11 class="org.springframework.transaction.jta.JtaTransactionManager">
12
25
27
28
38
48
55

服务级别获取sourceJmsTemplate、targetJmsTemplate,在业务方法中使用前者接收消息,后者发送消息。要注释的业务代码org.springframework.transaction.annotation.Transactional:

1 @Transactional
2 public boolean transport() throws JmsException{
3 Message remsg = souJT.receive();
4 if (remsg != null) {
5 final ObjectMessage omsg = (ObjectMessage) remsg;
6
7 tarJT.send(new MessageCreator() {
8 public Message createMessage(Session session)
9 throws JMSException {
10 Message result = session.createObjectMessage(omsg
11 .getObject());
12 return result;
13 }
14 });
15 }
16 return remsg != null;
17 }

事实上,尽管我在项目中使用了上述配置,但我并不了解其原理。还有2这个问题值得一提:
- 定时任务和事务之间的冲突
服务类是事务性的,如果它也是一个定时任务,就会出现问题。 - 事务和循环之间的冲突
最初计划在事务中使用循环souJT收到的消息是null作为结束条件,每个收据都会发送一个。因此,即使有更多消息,它们也只执行一次,无论Atomikos也不知道是什么原因造成的。随后,循环被放置在一个定时任务中,在循环体中调用业务方法,以根据返回的结果确定是否结束循环。
2.broker-B到数据库
从broker-B接收数据库中的消息并写入数据库。此时间未使用Atomikos,还实现了在写库失败时回滚消息的事务效果;broker-A到broker-B,如果未使用Atomikos无法实现交易效果。也许是因为它使用同步接收,然后我们使用异步接收。接收方式是否与分布式事务有关,结论尚不确定。
在这里,我们甚至不再使用它。spring的JtaTransactionManager。

1 <?xml version="1.0" encoding="UTF-8"?>
2 <beans xmlns="http://www.springframework.org/schema/beans"
3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4 xmlns:jms="http://www.springframework.org/schema/jms"
5 xsi:schemaLocation="http://www.springframework.org/schema/beans
6 http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
7 http://www.springframework.org/schema/jms
8 http://www.springframework.org/schema/jms/spring-jms-4.2.xsd">
9
13
15 <jms:listener-container connection-factory="connectionFactory"
16 concurrency="1" acknowledge="transacted">
17 <jms:listener destination="asj.log" ref="converteredLogListener"
18 method="onLog" />
19 </jms:listener-container>
20

当然,我们必须声明listener-container的acknowledge属性为"transacted"打开交易。在里面ConverteredLogListener的onLog,只要写入失败时抛出的异常可以到达,就调用服务组件将数据放入存储。onLog方法,则事务被回滚,因此确保可以抛出异常。
版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
itfan123


