RabbitMQ(2)
原创目录
交换机
在RabbitMQ中,生产者发送信息不会直接将消息投递到队列中,而是将消息投递到交换机中,再由交换机转发到具体的队列中,队列再将消息以推送或者拉取方式给消费进行消费。
交换机类型:
- direct直接类型;
- topic主题类型;
- headers标题类型; 不常用
- fanout扇出类型。
无名类型:使用空字符串来指定
fanout交换机
生产者
public class Producer {
    public static final String QUEUE="hello";
    public static void main(String args[]) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        String message="hello";
        channel.basicPublish("exchange2","",null,message.getBytes());
        System.out.println("发送成功");
    }
}消费者
public class Consumer {
    public static void main(String args[]) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare("exchange2","fanout");
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,"exchange2","");
        DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        };
        channel.basicConsume(queue,true,deliverCallback,consumerTag->{});
    }
}Direct Exchange 直连交换其
这里整合Springboot代码如下:
application.yml文件
spring:
  rabbitmq:
    username: admin
    password: admin
    host: 182.92.167.13
    port: 5672Configuration配置类
@Configuration
public class DirectExchange1 {
    @Bean
    public Queue queue(){
        return new Queue("queue2",true);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange("exchange3",true,false);
    }
    @Bean
    Binding binding(){
        Binding binding= BindingBuilder.bind(queue()).to(directExchange()).with("123");
        return binding;
    }
}生产者
 rabbitTemplate.convertAndSend("exchange3","123","message");消费者
@Component
@RabbitListener(queues = "queue2")
public class rabbitmq {
    @RabbitHandler
    public void consume(String message){
        System.out.println(message);
    }
}Topic Exchange代码省略
死信队列
死信,就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信的来源
- 
消息 TTL(存活时间) 过期 
- 
队列达到最大长度(队列满了,无法再添加数据到队列中) 
- 
消息被拒绝并且(不放回队列中: requeue=false) 
生产者
 MessageProperties messageProperties=new MessageProperties();
        messageProperties.setExpiration("20000");
        Message message1=new Message("hello".getBytes(),messageProperties);
        rabbitTemplate.convertAndSend("exchange4","2.1",message1);Configuration配置
@Configuration
public class RabbitmqConfig {
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("exchange4");
    }
    @Bean
    public Queue queue(){
        Map map=new HashMap<>();
        map.put("x-dead-letter-exchange","exchange4_1");
        map.put("x-dead-letter-routing-key","error.er");
        return new Queue("queue3",true,false,false,map);
    }
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with("2.*");
    }
    @Bean
    public TopicExchange topicExchange1(){
        return new TopicExchange("exchange4_1");
    }
    @Bean
    public Queue queue1(){
        return new Queue("queue3_1");
    }
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(topicExchange1()).with("error.#");
    }
} 消费者
@Component
public class rabbitmq {
    @RabbitListener(queues = "queue3_1")
    public void consume(String msg, Message message, Channel channel){
       System.out.println(msg);
    }
}版权声明
所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除
 itfan123
itfan123








