ActiveMQ安装操作与spring整合使用教程版权声明

原创
小哥 3年前 (2022-11-11) 阅读数 87 #大杂烩

1      ActiveMQ介绍

1.1    什么是ActiveMQ
ActiveMQ是Apache生产,最流行、功能强大的开源消息总线。ActiveMQ 是完全支持的JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS自从条例颁布以来,已经有很长一段时间了。,但是JMS在当今的J2EE应用程序仍然扮演着特殊的角色。

主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  3. 对Spring的支持,ActiveMQ可轻松嵌入使用Spring系统内部,但也支持Spring2.0的特性

  4. 通过共同点J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resourceadaptors的配置,可以让ActiveMQ可以自动部署到任何兼容的J2EE1.4 在商用服务器上

  5. 支持多种传输协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  6. 支持通过JDBC和journal提供高速消息持久性。

  7. 旨在确保高性能集群,客户端-服务器,点对点

  8. 支持Ajax

  9. 支持与Axis的整合

  10. 很容易调用内联JMS provider,进行测试

1.2    JMS介绍
JMS的全称是Java Message Service,即Java消息服务。用于在两个应用程序之间或分布式系统中发送消息,用于异步通信。

它主要用于生产者和消费者之间的消息传递,生产者和消费者负责生成消息,消费者负责接收消息。如果我们将其应用于实际的业务需求,我们可以使用生产者在特定时间生成消息并发送它。相应的消费者将在收到相应的消息后完成相应的业务逻辑。

有两种类型的消息传递:

一种是点对点,即生产者和消费者之间的一对一通信;

另一个是发布/订阅模式,即生产者生成消息并发送消息后,可以由多个消费者接收。

JMS定义了五种不同的消息体格式,以及调用的消息类型,允许您以某些不同的形式发送和接收数据,从而与现有消息格式提供一定程度的兼容性。

· StreamMessage -- Java原始值的数据流

· MapMessage--一套名称-值对

· TextMessage--字符串对象

· ObjectMessage--A序列化 Java对象

· BytesMessage--数据流的一个字节

2      ActiveMQ的安装
2.1    下载
进入http://activemq.apache.org/下载ActiveMQ

单击下载:

收集的文件和linux下面的安装包

Windows下面的安装包

2.2    安装
安装步骤:

步骤1:安装jdk,需要jdk1.7以上版本

第2步:解压缩activeMQ压缩包。

第3步:输入bin目录。

启动:[root@localhost bin]# ./activemq start

停止:[root@localhost bin]# ./activemq stop

注:如果ActiveMQ整合spring使用不使用activemq-all-5.12.0.jar包裹建议使用5.11.2

步骤4:访问后台管理。

http://ip地址:8161/admin

用户名:admin

密码:admin

503错误解决:

1,查看计算机名称

[root@itcast168 bin]# cat/etc/sysconfig/network

NETWORKING=yes

HOSTNAME=itcast168

2、修改host文件

[root@itcast168 bin]# cat /etc/hosts

127.0.0.1  localhost localhost.localdomain localhost4 localhost4.localdomain4itcast168

::1        localhost localhost.localdomain localhost6 localhost6.localdomain6

[root@itcast168 bin]#

3、重启Activemq服务

3      ActiveMQ使用方法
3.1    JMS消息发送模式

在对等或队列模型下,生产者将消息发布到特定队列,消费者从中读取消息。在这里,生产者知道消费者的队列,并将消息直接发送到消费者的队列。这种模式被总结为:只有一个消费者会收到消息。生产者不需要在接收者使用消息期间处于运行状态,接收者也不需要在发送消息时处于运行状态。每个成功处理的邮件都由收件人签名。

发布者/订阅者模型支持将消息发布到特定的消息主题。0或者,多个订户可能对接收来自特定消息主题的消息感兴趣。在这种模式下,出版商和订阅者彼此不认识。这个模型就像一个匿名的公告板。这种模式概括为:多个消费者可以获得消息。.发布者和订阅者之间存在时间依赖关系。发布者需要建立订阅(subscription)以便客户可以购买订阅。除非订户建立持久订阅,否则订户必须保持持续活动状态才能接收消息。在这种情况下,订户未连接时发布的消息将在订户重新连接时重新发布。

3.2    JMS应用程序界面
ConnectionFactory 接口(连接工厂)

用户用于创建。JMS提供程序连接的托管对象。JMS客户通过可移植接口访问连接,以便在底层实现更改时不需要修改代码。中的管理员JNDI在命名空间中配置连接工厂,JMS只有这样,客户才能找到它们。根据消息类型,用户将使用队列连接工厂或主题连接工厂。

Connection 接口(连接)

连接表示应用程序和消息服务器之间的通信链路。一旦您有了连接工厂,就可以创建连接。JMS提供商的连接。根据连接的类型,连接允许用户创建会话,以便向目标发送和接收队列和主题。

Destination 接口(目标)

目标是包装消息目标标识符的托管对象,消息目标是指发布和接收消息的位置、队列或主题。JMS管理员创建这些对象,然后用户通过。JNDI发现它们。与连接工厂一样,管理员可以创建两种类型的目标:对等模型的队列和发布者/订阅者模型的主题。

MessageConsumer 接口(消息使用者)

会话创建的对象,用于接收发送到目标的消息。消费者可以同步(阻塞模式)或异步(非阻塞)接收队列和主题类型的消息。

MessageProducer 接口(消息生成器)

会话创建的用于向目标发送消息的对象。用户可以为目标创建发送方,或者在发送消息时指定目标的通用发送方。

Message 接口(消息)

它是一个在消费者和生产者之间转移的对象,即从一个应用程序转移到另一个应用。消息有三个主要部分:

消息头(必需):包含用于识别和查找消息路由的操作设置。

一组消息属性(可选):包含其他属性以支持与其他提供者和用户的兼容性。您可以创建自定义字段和过滤器(消息选择器)。

一个消息体(可选):允许用户创建五种类型的消息(文本消息、映射消息、字节消息、流消息和对象消息)。

消息界面非常灵活,提供了许多自定义消息内容的方法。

Session 接口(会话)

表示用于发送和接收消息的单线程上下文。由于会话是单线程的,所以消息是连续的,也就是说,消息是按照发送的顺序逐个接收的。会话的优点是它支持事务。如果用户选择事务支持,会话上下文将保存一组消息,直到事务提交。用户可以使用回滚操作在提交事务之前取消这些消息。会话允许用户创建用于发送消息的消息生产者和用于接收消息的消息消费者。

3.3    消息队列
把ActiveMQ依赖的jar将包添加到项目中。

activemq-all-5.12.0.jar

使用maven项目,然后添加jar包的依赖关系:

org.apache.activemq activemq-all 5.11.2

3.3.1  Producer
步骤1:创建ConnectionFactory对象,您需要指定服务端。ip和端口号。

2.使用ConnectionFactory对象创建Connection对象。

步骤3:打开连接并呼叫Connection对象的start方法。

4.使用Connection对象创建Session对象。

第5步:使用Session对象创建Destination对象(topic、queue),在此处创建Queue对象。

6.使用Session对象创建Producer对象。

步骤7:创建Message对象,创建TextMessage对象。

第8步:使用Producer对象发送消息。

第9步:关闭资源。

@Test
public void testQueueProducer() throws Exception {
// 步骤1:创建ConnectionFactory对象,您需要指定服务端。ip和端口号。
//brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 2.使用ConnectionFactory对象创建Connection对象。
Connection connection = connectionFactory.createConnection();
// 步骤3:打开连接并呼叫Connection对象的start方法。
connection.start();
// 4.使用Connection对象创建Session对象。
//第一个参数:是否打开交易。true:打开事务,忽略第二个参数。
//第二个参数:当第一个参数为时。时。false这很有道理。消息的响应模式。1,自动响应2,手动响应。这通常是一种自动反应。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第5步:使用Session对象创建Destination对象(topic、queue),在此处创建Queue对象。
//参数:队列的名称。
Queue queue = session.createQueue("test-queue");
// 6.使用Session对象创建Producer对象。
MessageProducer producer = session.createProducer(queue);
// 步骤7:创建Message对象,创建TextMessage对象。
/TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");
/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第8步:使用Producer对象发送消息。
producer.send(textMessage);
// 第9步:关闭资源。
producer.close();
session.close();
connection.close();
}
3.3.2  Consumer
消费者有两种消费方式:

1,同步消耗。通过拨打消费者的receive方法从目标显式提取消息。receive方法可以被阻止,直到消息到达。

2,异步消耗。客户可以为消费者注册一个消息侦听器,以定义消息到达时所采取的操作。

实现MessageListener接口,在MessageListener消息的处理逻辑在()方法中实现。

3.3.2.1 同步消费
publicclass QueueConsumer {

publicstaticvoidmain(String[]args) {
//创建连接工厂
ConnectionFactory connectionFactory  = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
try {
//创建连接
Connection connection = connectionFactory.createConnection();
//打开连接
connection.start();
//创建答复
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地Destination
Queue queue = session.createQueue("mytestqueue");
//创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while(true) {
//设置收件人接收消息的时间,此处设置此时间是为了便于测试。100s
Messagemessage =consumer.receive(100000);
if (message !=null) {
System.out.println(message);
}else {
//超时结束
break;
}
}
consumer.close();
session.close();
connection.close();
} catch (Exceptione) {
e.printStackTrace();
}

}
}
3.3.2.2 异步消费
消费者:接收消息。

步骤1:创建一个ConnectionFactory对象。

第2步:从ConnectionFactory要获取的对象Connection对象。

步骤3:打开连接。呼叫Connection对象的start方法。

4.使用Connection对象创建Session对象。

第5步:使用Session对象创建Destination对象与发送端一致queue队列的名称相同。

6.使用Session对象创建Consumer对象。

步骤7:接收消息。

步骤8:打印消息。

第9步:关闭资源

@Test
public void testQueueConsumer() throws Exception {
// 步骤1:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第2步:从ConnectionFactory要获取的对象Connection对象。
Connection connection = connectionFactory.createConnection();
// 步骤3:打开连接。呼叫Connection对象的start方法。
connection.start();
// 4.使用Connection对象创建Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第5步:使用Session对象创建Destination对象与发送端一致queue队列的名称相同。
Queue queue = session.createQueue("test-queue");
// 6.使用Session对象创建Consumer对象。
MessageConsumer consumer = session.createConsumer(queue);
// 步骤7:接收消息。
consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//记录邮件内容
text = textMessage.getText();
// 步骤8:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待键盘输入
System.in.read();
// 第9步:关闭资源
consumer.close();
session.close();
connection.close();
}
3.4.Topic
3.4.1  Producer
Prod使用步骤:

步骤1:创建ConnectionFactory对象,您需要指定服务端。ip和端口号。

2.使用ConnectionFactory对象创建Connection对象。

步骤3:打开连接并呼叫Connection对象的start方法。

4.使用Connection对象创建Session对象。

第5步:使用Session对象创建Destination对象(topic、queue),在此处创建Topic对象。

6.使用Session对象创建Producer对象。

步骤7:创建Message对象,创建TextMessage对象。

第8步:使用Producer对象发送消息。

第9步:关闭资源。

@Test
public void testTopicProducer() throws Exception {
// 步骤1:创建ConnectionFactory对象,您需要指定服务端。ip和端口号。
// brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 2.使用ConnectionFactory对象创建Connection对象。
Connection connection = connectionFactory.createConnection();
// 步骤3:打开连接并呼叫Connection对象的start方法。
connection.start();
// 4.使用Connection对象创建Session对象。
// 第一个参数:是否打开交易。true:打开事务,忽略第二个参数。
// 第二个参数:当第一个参数为时。时。false这很有道理。消息的响应模式。1,自动响应2,手动响应。这通常是一种自动反应。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第5步:使用Session对象创建Destination对象(topic、queue),在此处创建topic对象。
// 参数:主题的名称。
Topic topic = session.createTopic("test-topic");
// 6.使用Session对象创建Producer对象。
MessageProducer producer = session.createProducer(topic);
// 步骤7:创建Message对象,创建TextMessage对象。
/*

  • TextMessage message = new ActiveMQTextMessage(); message.setText(
  • "hello activeMq,this is my first test.");
    */
    TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
    // 第8步:使用Producer对象发送消息。
    producer.send(textMessage);
    // 第9步:关闭资源。
    producer.close();
    session.close();
    connection.close();
    }
    3.4.2  Consumer
    消费者:接收消息。

步骤1:创建一个ConnectionFactory对象。

第2步:从ConnectionFactory要获取的对象Connection对象。

步骤3:打开连接。呼叫Connection对象的start方法。

4.使用Connection对象创建Session对象。

第5步:使用Session对象创建Destination对象与发送端一致topic这个主题有相同的名字。

6.使用Session对象创建Consumer对象。

步骤7:接收消息。

步骤8:打印消息。

第9步:关闭资源

@Test
public void testTopicConsumer() throws Exception {
// 步骤1:创建一个ConnectionFactory对象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第2步:从ConnectionFactory要获取的对象Connection对象。
Connection connection = connectionFactory.createConnection();
// 步骤3:打开连接。呼叫Connection对象的start方法。
connection.start();
// 4.使用Connection对象创建Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第5步:使用Session对象创建Destination对象与发送端一致topic这个主题有相同的名字。
Topic topic = session.createTopic("test-topic");
// 6.使用Session对象创建Consumer对象。
MessageConsumer consumer = session.createConsumer(topic);
// 步骤7:接收消息。
consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
// 记录邮件内容
text = textMessage.getText();
// 步骤8:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消费端03.....");
// 等待键盘输入
System.in.read();
// 第9步:关闭资源
consumer.close();
session.close();
connection.close();
}
4      ActiveMQ整合Spring
4.1    配置ConnectionFactory
步骤1:引用相关jar包。

org.springframework spring-jms org.springframework spring-context-support

步骤2:配置Activemq整合spring。配置ConnectionFactory

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

步骤3:配置生产者。 使用JMSTemplate对象发送消息。 第4步:进入spring容器中的配置Destination。 spring-queue 步骤5:代码测试 发送消息 步骤1:初始化spring容器 第2步:从容器中获得JMSTemplate对象。 第3步:从容器中取出一个。Destination对象 4.使用JMSTemplate对象发送消息并需要知道Destination @Test public void testQueueProducer() throws Exception { // 步骤1:初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第2步:从容器中获得JMSTemplate对象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第3步:从容器中取出一个。Destination对象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 4.使用JMSTemplate对象发送消息并需要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); } 接收消息 Taotao-search-Service在中接收消息。 第一步:放Activemq相关的jar包已添加到项目 步骤2:创建MessageListener实现类的。 public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //获取消息内容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } } 步骤3:配置spring和Activemq整合 spring-queue 步骤4:测试代码。 @Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); } 应用案例 1.  添加商品同步索引库 1.1. Producer Taotao-manager-server在项目中发送消息。 添加货物时发送一个TextMessage,包含商品id。 @Override public TaotaoResult addItem(TbItem item, String desc) { // 1,生成商品id final long itemId = IDUtils.genItemId(); // 2、补全TbItem对象的属性 item.setId(itemId); //商品状态,1-正常,2-下架,3-删除 item.setStatus((byte) 1); Date date = new Date(); item.setCreated(date); item.setUpdated(date); // 3,将数据插入商品表 itemMapper.insert(item); // 4,创建TbItemDesc对象 TbItemDesc itemDesc = new TbItemDesc(); // 5、补全TbItemDesc的属性 itemDesc.setItemId(itemId); itemDesc.setItemDesc(desc); itemDesc.setCreated(date); itemDesc.setUpdated(date); // 6,将数据插入商品描述表 itemDescMapper.insert(itemDesc); //发送项目添加消息 jmsTemplate.send(topicDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage(itemId + ""); return textMessage; } }); // 7、TaotaoResult.ok() return TaotaoResult.ok(); } 1.2. Consumer 1.2.1.   功能分析 1,接收消息。需要创建MessageListener接口实现类的。 2,接受信息,接受货物id。 3,根据商品id查询数据库。 4、创建一SolrInputDocument对象。 5、使用SolrServer对象将写入索引库。 6,返回成功,返回TaotaoResult。 1.2.2.   Dao层 根据商品id查询产品信息。 映射文件: 1.2.3.   Service层 参数:商品ID 业务逻辑: 1,根据商品id查询产品信息。 2、创建一SolrInputDocument对象。 3、使用SolrServer对象将写入索引库。 4,返回成功,返回TaotaoResult。 返回值:TaotaoResult public TaotaoResult addDocument(long itemId) throws Exception { // 1,根据商品id查询产品信息。 SearchItem searchItem = searchItemMapper.getItemById(itemId); // 2、创建一SolrInputDocument对象。 SolrInputDocument document = new SolrInputDocument(); // 3、使用SolrServer对象将写入索引库。 document.addField("id", searchItem.getId()); document.addField("item\_title", searchItem.getTitle()); document.addField("item\_sell\_point", searchItem.getSell\_point()); document.addField("item\_price", searchItem.getPrice()); document.addField("item\_image", searchItem.getImage()); document.addField("item\_category\_name", searchItem.getCategory\_name()); document.addField("item\_desc", searchItem.getItem\_desc()); // 5,将文档添加到索引库。 solrServer.add(document); solrServer.commit(); // 4,返回成功,返回TaotaoResult。 return TaotaoResult.ok(); } 1.2.4.   Listener public class ItemChangeListener implements MessageListener { @Autowired private SearchItemServiceImpl searchItemServiceImpl; @Override public void onMessage(Message message) { try { TextMessage textMessage = null; Long itemId = null; //取商品id if (message instanceof TextMessage) { textMessage = (TextMessage) message; itemId = Long.parseLong(textMessage.getText()); } //将文档添加到索引库。 searchItemServiceImpl.addDocument(itemId); } catch (Exception e) { e.printStackTrace(); } } } 1.2.5.   Spring配置监听 --------------------- 作者:MC-闰土 来源:CSDN 原文:https://blog.csdn.net/qq\_22075041/article/details/77602996 版权声明:本文为博主原创文章。转载请附上博客链接!
版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除