Skip to content

rabbitmq生产实践

RabbitMQ 控制台

Exchanges

image-20230426182711392

Queues

image-20230426182327281

Add a new queue

image-20230426182405029

RabbitMQ 示例

xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yaml
spring:
rabbitmq:
host: 120.26.184.26
username: admin
password: 123456
virtual-host: /test
port: 5672
listener:
simple:
acknowledge-mode: manual #手动签收消息
publisher-confirms: true #开启confirm机制
publisher-returns: true  #开启return机制
java
/**
* MqConfig
*/
@Configuration
public class MqConfig {
 //1.创建交换机
 @Bean
 public TopicExchange topicExchange() {
     TopicExchange topicExchange = new TopicExchange("springboot?topic-exchange", true, false);
     return topicExchange;
 }
 //2.创建队列
 @Bean
 public Queue queue(){
     //第二个参数为true表示队列支持持久化
     Queue queue = new Queue("springboot-queue", true, false,false, null);
     return queue;
 }
 //3.将交换机和队列进行绑定
 @Bean
 public Binding binding(TopicExchange exchange, Queue queue){
     return BindingBuilder.bind(queue).to(exchange).with("*.red.*");
 }
}
java
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = MqApplication.class)
public class Publisher {
 @Autowired
 private RabbitTemplate rabbitTemplate;
 @Test
 public void publisher() {
     //发送消息
     rabbitTemplate.convertAndSend("springboot-topic.exchange","yangguang.red.dog","阳光的红色的大黄狗");
 }
}
java
@Component
public class Consumer {
 //定义消费者,并监听队列
 @RabbitListener(queues = "springboot-queue")
 public void getMessage(String msg, Channel channel, Messagemessage) throws IOException {
     System.out.println("接收的数据为:" + msg);
     try {
         //业务逻辑
         int i=1/0; //模拟异常
         //手动签收
         channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
     } catch (Exception e) {
         //让消息重回队列
         //参数1:消息的标记
         //参数2:是否进行批量应答
         //参数3:拒绝接收消息之后,这个消息是否继续会队列
         channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true );
     }
 }
}

RabbitMQ 延时插件

基于 过期消息实现延迟队列、过期队列实现延迟队列 存在一些问题,官网提供了基于插件的方式来实现。

消息到达延迟交换机后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。

Docker环境安装

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

shell
#拷贝到rabbitmq容器 b6c96d8d8e6f 中
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez ac88d0cb6bfc:/plugins
#进入容器
docker exec -it ac88d0cb6bfc /bin/bash
#启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
#查看
rabbitmq-plugins list
#重新启动容器
docker restart ac88d0cb6bfc

代码实现

首先创建x-delayed-message类型的交换机,并绑定队列:

java
@Configuration
public class RabbitMqDelayQueueConfig {

    @Bean("delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
    }

    /**
     * 创建队列:
     * 指定死信交换机、路由KEY
     */
    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable("delayQueue").build();
    }

    /**
     * 创建绑定关系
     */
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
    }
}

创建消费者,监听延迟队列:

java
    @RabbitListener(queues = {"delayQueue"})
    public void deadQueue(Message message) {
        System.out.println("收到消息" + new String(message.getBody()));
        System.out.println("当前时间:"+ LocalDateTime.now());
        System.out.println("判断订单状态...." + new String(message.getBody()));
        System.out.println("未支付,回滚数据库....");
    }

创建生产者,发送不同延迟时间的消息:

java
        // 发送延迟时间为20秒的消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDelay(20000); // 20秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message001);
        // 发送延迟时间为10秒的消息
        messageProperties.setDelay(10000); // 10秒过期
        System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
        Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
        rabbitTemplate.send("delayExchange", "delay.key", message002);