Skip to content

RabbitMQ

RabbitMQ基础

RabbitMQ组件

  • Broker :一个RabbitMQ实例就是一个Broker
  • Virtual Host :虚拟主机。相当于MySQL的DataBase ,一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是/。
  • Exchange :交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Queue :消息队列,用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。
  • Banding :绑定关系,用于 消息队列和交换机之间的关联 。通过路由键( Routing Key )将交换机和消息队列关联起来。
  • Channel :管道,一条双向数据流通道。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了管道的概念,以复用一条TCP连接。
  • Connection :生产者/消费者 与broker之间的TCP连接。
  • Publisher :消息的生产者。
  • Consumer :消息的消费者。
  • Message :消息,它是由消息头和消息体组成。消息头则包括 Routing-KeyPriority (优先级)等

image-20230426182140713

RabbitMQ通信模型

simple

生产者将消息发送到队列中,一个消费者一定能从队列中获取消息

Work Queue

多个消费者对应同一个队列,提高数据的消费能力。多个消费者绑定到同一个队列上,共同消费队列中的数据,队列中数据一旦消费了就自动删除,不会重复执行。 若多个消费者同时监听一个队列,默认情况下MQ平均分配消息

Publish/Subscribe

生产者发送消息给交换机(exchange)了,由交换机确定发送给具体队列。这个交换机用来接收生产者发送的数据,并将将其转发给对应的队列,根据交换机发送数据的不同方式,分成3种:

  • 1.Fanout:广播模式,将消息发送给所有的队列

  • 2.Direct:定向,把消息发送给 routingkey 相同的队列

队列与交换机不能任意绑定,需要指定一个Routingkey(路由key) 在消息的发送发向交换机发送消息是,也需要指定RoutingKey 交换机就会把消息发送给Routingkey一样的队列

  • 3.Topic:通配符,把消息发送给符合 routing pattern(路由模式)的队列

    routingkey可以采用通配符进行定义,一般都是有一个或多个单词组成,多个单词之间使用“.”分割

    • 通配符规则 .# : 匹配一个或多个单词 .* : 匹配一个单词
    • routing key 为一个句点号 "." 分隔的字符串。我们将被句点号"."分隔开的每一段独立的字符串称为一个单词,例如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”
    • binding key 与 routing key 一样也是句点号 "." 分隔的字符串。
    • binding key 中可以存在两种特殊字符 "*""#",用于做模糊匹配。其中 "*" 用于匹配一个单词,"#" 用于匹配多个单词(可以是零个)。

RabbitMQ TTL

TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共2种。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

  • 在消息发送时进行指定 。通过配置消息体的 Properties ,可以指定当前消息的过期时间。
  • 在创建Exchange时指定 。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。

RabbitMQ DLX

死信队列(DLX Dead-Letter-Exchange):当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。

死信队列(DLX)产生来源

①:消息被拒绝(basic.reject/basic.nack),且 requeue = false(代表不重新回到队列)
②:消息因TTL过期(就是任务消息上携带过期时间)
③:消息队列的消息数量已经超过最大队列长度,先入队的消息会被丢弃变为死信
①:消息被拒绝(basic.reject/basic.nack),且 requeue = false(代表不重新回到队列)
②:消息因TTL过期(就是任务消息上携带过期时间)
③:消息队列的消息数量已经超过最大队列长度,先入队的消息会被丢弃变为死信

image-20230426183056796

消息TTL过期产生死信

image-20230426183200529

死信交换机、死信队列需要我们自己创建,只是业务中用来存放死信的“特殊”交换机队列。其他队列可以指定死信交换机、死信队列,当发生死信时,自动将其投递到死信队列中。

RabbitMQ 消息持久化

交换机

交换机的持久化其实就是相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启RabbitMQ时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。

创建时,使用durable(true)设置持久化:

java
@Bean("bootExchange")
public Exchange bootExchange() {
    return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
}
@Bean("bootExchange")
public Exchange bootExchange() {
    return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
}

队列

队列持久化类似于交换机持久化,创建时,使用durable(" ")设置持久化:

java
@Bean("bootQueue")
public Queue bootQueue() {
    return QueueBuilder.durable("bootQueue").build();
}
@Bean("bootQueue")
public Queue bootQueue() {
    return QueueBuilder.durable("bootQueue").build();
}

消息

众所周知,RabbitMQ的消息是依附于队列存在的,所以想要消息持久化,那么前提是队列也要持久化。 RabbitTemplate发送消息默认就是持久化的

RabbitTemplate.convertAndSend()
RabbitTemplate.convertAndSend()

RabbitMQ对象序列化

使用RabbitMQ原生API,发送消息时,发送的是二进制byte[]数据。

使用RabbitTemplate.send方法发送Message对象,也是二进制byte[]数据。

在接收时,需要将二进制数据转为你想要的数据格式。在JAVA 编程中都是基于对象操作,一般消息都是对象,比如订单、日志。

所以RabbitTemplate提供了convertAndSend方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。

使用Jackson 序列化

只需要在RabbitTemplate 、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。

java
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(new Jackson2JsonMessageConverter());
    return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);
    template.setMessageConverter(new Jackson2JsonMessageConverter());
    return template;
}

RabbitMQ Confirm机制

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是 消息的可靠性投递的核心保障!

如何实现Confirm确认消息?

image-20230425153439925

  1. 在channel上开启确认模式channel.confirmSelect()
  2. 在channel上开启监听 :addConfirmListener ,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。

开启发布确认模式配置项publisher-confirm-type,该配置项是枚举,可以配置为NONESIMPLECORRELATED。[NONE 表示关闭确认回调,这是默认配置]

yaml
spring:
 rabbitmq:
   username: xxx
   password: xxx
   host: localhost
   port: 5672
   # 开启
   publisher-confirm-type: CORRELATED
   # springboot2.2.0,该属性已过时,通过publisher-confirm-type配置
   # publisher-confirms: true
spring:
 rabbitmq:
   username: xxx
   password: xxx
   host: localhost
   port: 5672
   # 开启
   publisher-confirm-type: CORRELATED
   # springboot2.2.0,该属性已过时,通过publisher-confirm-type配置
   # publisher-confirms: true

RabbitTemplate设置一个confirmCallback回调函数

java
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           @Override
           public void confirm(CorrelationData correlationData, boolean b, String s) {
               System.out.println("进入回调方法");
               System.out.println("消息ID:" + correlationData.getId());
               // 如果开启了退回模式,还可以通过correlationData获取退回消息
               if (correlationData.getReturned() != null) {
                   System.out.println("消息被退回,原因:" + correlationData.getReturned().getReplyText());
               }
               if (b) {
                   System.out.println("确认接收到消息");
               } else {
                   System.out.println("失败,原因:" + s);
               }
           }
       });
       // 可以创建CorrelationData对象,设置id属性,用来表示当前消息的唯一性。
       // 确认时,可以获取ID,知道是哪个消息
       CorrelationData correlationData = new CorrelationData("34424343");
       rabbitTemplate.convertAndSend("bootExchange", "boot222.key", "HELLO SPRING BOOT", correlationData);
       rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
           @Override
           public void confirm(CorrelationData correlationData, boolean b, String s) {
               System.out.println("进入回调方法");
               System.out.println("消息ID:" + correlationData.getId());
               // 如果开启了退回模式,还可以通过correlationData获取退回消息
               if (correlationData.getReturned() != null) {
                   System.out.println("消息被退回,原因:" + correlationData.getReturned().getReplyText());
               }
               if (b) {
                   System.out.println("确认接收到消息");
               } else {
                   System.out.println("失败,原因:" + s);
               }
           }
       });
       // 可以创建CorrelationData对象,设置id属性,用来表示当前消息的唯一性。
       // 确认时,可以获取ID,知道是哪个消息
       CorrelationData correlationData = new CorrelationData("34424343");
       rabbitTemplate.convertAndSend("bootExchange", "boot222.key", "HELLO SPRING BOOT", correlationData);

RabbitMQ Return机制

默认情况下,如果消息没有匹配到队列会直接丢弃,采用退回模式可以在生产者端监听改消息是否被成功投递到队列中。

我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。

但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener

在配置文件中开启退回模式:

yaml
spring:
 rabbitmq:
   username: xxx
   password: xxx
   host: localhost
   port: 5672
   # 开启回退模式
   publisher-returns: true
spring:
 rabbitmq:
   username: xxx
   password: xxx
   host: localhost
   port: 5672
   # 开启回退模式
   publisher-returns: true

基础API中有个关键的配置项 Mandatory :如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。

同样,通过监听的方式, chennel.addReturnListener(ReturnListener rl) 传入已经重写过handleReturn方法的ReturnListener。

RabbitTemplate设置一个returnsCallback回调函数,如果消息路由键没有匹配的队列,投递出去后将被退回,执行回调函数。

java
       // 回退模式回调函数
       rabbitTemplate.setReturnsCallback(returnedMessage -> {
           System.out.println("消息被回退,原因:"+returnedMessage.getReplyText());
           System.out.println(returnedMessage.getMessage()); // 回退消息
           System.out.println(  returnedMessage.getExchange()); // 交换机
           System.out.println(returnedMessage.getReplyCode()); // 返回原因的代码
           System.out.println(returnedMessage.getReplyText()); // 返回信息,例如NO_ROUTE
           System.out.println(returnedMessage.getRoutingKey()); // 路由KEY
       });
       // 路由键aaa.bbb.key,没有匹配的队列
       rabbitTemplate.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");
       // 回退模式回调函数
       rabbitTemplate.setReturnsCallback(returnedMessage -> {
           System.out.println("消息被回退,原因:"+returnedMessage.getReplyText());
           System.out.println(returnedMessage.getMessage()); // 回退消息
           System.out.println(  returnedMessage.getExchange()); // 交换机
           System.out.println(returnedMessage.getReplyCode()); // 返回原因的代码
           System.out.println(returnedMessage.getReplyText()); // 返回信息,例如NO_ROUTE
           System.out.println(returnedMessage.getRoutingKey()); // 路由KEY
       });
       // 路由键aaa.bbb.key,没有匹配的队列
       rabbitTemplate.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");

RabbitMQ ACK与NACK

消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。但是对于服务器宕机等严重问题,我们需要 手动ACK 保障消费端消费成功。

java
// deliveryTag:消息在mq中的唯一标识  
// multiple:是否批量(和qos设置类似的参数)  
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。  
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息在mq中的唯一标识  
// multiple:是否批量(和qos设置类似的参数)  
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。  
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)

消息在 消费端重回队列 是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列( 避免进入死循环 ),也就是设置为false。

java
@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"bootQueue"})
    public void receiveMessage(Message message, Channel channel) throws IOException {
        // 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到消息===" + new String(message.getBody()));
            System.out.println("处理业务逻辑");
            // 发生异常
            // int i = 5 / 0;
            // 处理完成后,确认
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 发生异常,拒绝签收
            e.printStackTrace();
            channel.basicNack(deliveryTag, true, true);
        }
    }
}
@Component
public class RabbitConsumer {

    @RabbitListener(queues = {"bootQueue"})
    public void receiveMessage(Message message, Channel channel) throws IOException {
        // 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("收到消息===" + new String(message.getBody()));
            System.out.println("处理业务逻辑");
            // 发生异常
            // int i = 5 / 0;
            // 处理完成后,确认
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 发生异常,拒绝签收
            e.printStackTrace();
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

RabbitMQ高可用

消息顺序消费

如何保证 RabbitMQ 消息的顺序消费?

思路就是拆分queue,使得一个queue只对应一个消费者,这样消费者一定是按照顺序消费的;

如果消息数量较大,那我们可以按照消息类型拆分队列,你管你消息再多,不可能所有消息都是需要顺序性消费的吧,我们可以灵活一点,视情况而定,比如某三个消息ABC的对应的操作是需要顺序消费的,那就把这三个放到同一个队列;如果有多组这样的ABC消息需要保证顺序,那我们就多搞几个队列;不需要保证顺序的消息就放在其它队列;

消息重复消费

如何保证 RabbitMQ 消息不会重复消费?

这个要分情况:

  • 如果是direct模式:

    一个队列对应一个消费者,那不存在重复消费的问题;如果是一个队列对应多个消费者,那消费者会通过轮询来消费,也不会存在重复消费的问题;

  • 如果是topic或者广播模式:

    一个队列对应了多个消费者,且消费者会同时收到消息,那就会出现重复消费的问题,如果我们不希望出现重复消费,我们可以给消息加一个唯一id,存到redis里面,消息消费成功后就存到redis里面去,这里我们可以用redis的set类型,然后每次消费之前先看看redis里面有没有该id;

RabbitMQ使用总结

RabbitMQ 控制台

Exchanges

image-20230426182711392

Queues

image-20230426182327281

Add a new queue

image-20230426182405029

RabbitMQ 示例

xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml
spring:
rabbitmq:
host: 120.26.184.26
username: admin
password: 123456
virtual-host: /test
port: 5672
listener:
simple:
acknowledge-mode: manual #手动签收消息
publisher-confirms: true #开启confirm机制
publisher-returns: true  #开启return机制
spring:
rabbitmq:
host: 120.26.184.26
username: admin
password: 123456
virtual-host: /test
port: 5672
listener:
simple:
acknowledge-mode: manual #手动签收消息
publisher-confirms: true #开启confirm机制
publisher-returns: true  #开启return机制
java
/**
* MqConfig
*/
@Configuration
public class MqConfig {
  //1.创建交换机
  @Bean
  public TopicExchange topicExchange() {
      TopicExchange topicExchange = new TopicExchange("springboot?topic-exchange", true, false);
      return topicExchange;
  }
  //2.创建队列
  @Bean
  public Queue queue(){
      //第二个参数为true表示队列支持持久化
      Queue queue = new Queue("springboot-queue", true, false,false, null);
      return queue;
  }
  //3.将交换机和队列进行绑定
  @Bean
  public Binding binding(TopicExchange exchange, Queue queue){
      return BindingBuilder.bind(queue).to(exchange).with("*.red.*");
  }
}
/**
* MqConfig
*/
@Configuration
public class MqConfig {
  //1.创建交换机
  @Bean
  public TopicExchange topicExchange() {
      TopicExchange topicExchange = new TopicExchange("springboot?topic-exchange", true, false);
      return topicExchange;
  }
  //2.创建队列
  @Bean
  public Queue queue(){
      //第二个参数为true表示队列支持持久化
      Queue queue = new Queue("springboot-queue", true, false,false, null);
      return queue;
  }
  //3.将交换机和队列进行绑定
  @Bean
  public Binding binding(TopicExchange exchange, Queue queue){
      return BindingBuilder.bind(queue).to(exchange).with("*.red.*");
  }
}
java
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MqApplication.class)
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
  public void publisher() {
      //发送消息
      rabbitTemplate.convertAndSend("springboot-topic.exchange","yangguang.red.dog","阳光的红色的大黄狗");
  }
}
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MqApplication.class)
public class Publisher {
  @Autowired
  private RabbitTemplate rabbitTemplate;
  @Test
  public void publisher() {
      //发送消息
      rabbitTemplate.convertAndSend("springboot-topic.exchange","yangguang.red.dog","阳光的红色的大黄狗");
  }
}
java
@Component
public class Consumer {
  //定义消费者,并监听队列
  @RabbitListener(queues = "springboot-queue")
  public void getMessage(String msg, Channel channel, Messagemessage) throws IOException {
      System.out.println("接收的数据为:" + msg);
      try {
          //业务逻辑
          int i=1/0; //模拟异常
          //手动签收
          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      } catch (Exception e) {
          //让消息重回队列
          //参数1:消息的标记
          //参数2:是否进行批量应答
          //参数3:拒绝接收消息之后,这个消息是否继续会队列
          channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true );
      }
  }
}
@Component
public class Consumer {
  //定义消费者,并监听队列
  @RabbitListener(queues = "springboot-queue")
  public void getMessage(String msg, Channel channel, Messagemessage) throws IOException {
      System.out.println("接收的数据为:" + msg);
      try {
          //业务逻辑
          int i=1/0; //模拟异常
          //手动签收
          channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
      } catch (Exception e) {
          //让消息重回队列
          //参数1:消息的标记
          //参数2:是否进行批量应答
          //参数3:拒绝接收消息之后,这个消息是否继续会队列
          channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true );
      }
  }
}

RabbitMQ 延时插件

基于 过期消息实现延迟队列、过期队列实现延迟队列 存在一些问题,官网提供了基于插件的方式来实现。

消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

Docker环境安装

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

shell
#拷贝到rabbitmq容器 b6c96d8d8e6f 中
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez ac88d0cb6bfc:/plugins
#进入容器
docker exec -it ac88d0cb6bfc /bin/bash
#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#查看
rabbitmq-plugins list
#重新启动容器
docker restart ac88d0cb6bfc
#拷贝到rabbitmq容器 b6c96d8d8e6f 中
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez ac88d0cb6bfc:/plugins
#进入容器
docker exec -it ac88d0cb6bfc /bin/bash
#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#查看
rabbitmq-plugins list
#重新启动容器
docker restart ac88d0cb6bfc

代码实现

首先创建x-delayed-message类型的交换机,并绑定队列:

java
@Configuration
public class RabbitMqDelayQueueConfig {

    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}
@Configuration
public class RabbitMqDelayQueueConfig {

    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}

创建消费者,监听延迟队列:

java
    @RabbitListener(queues = {"delayQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }
    @RabbitListener(queues = {"delayQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }

创建生产者,发送不同延迟时间的消息:

java
        // 发送延迟时间为20秒的消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(20000); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送延迟时间为10秒的消息
        messageProperties.setDelay(10000); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);
        // 发送延迟时间为20秒的消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(20000); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送延迟时间为10秒的消息
        messageProperties.setDelay(10000); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);

Last updated: