rabbitmq生产实践
RabbitMQ 控制台
Exchanges
Queues
Add a new queue
RabbitMQ 示例
xml<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yamlspring: rabbitmq: host: 120.26.184.26 username: admin password: 123456 virtual-host: /test port: 5672 listener: simple: acknowledge-mode: manual #手动签收消息 publisher-confirms: true #开启confirm机制 publisher-returns: true #开启return机制
java/** * MqConfig */ @Configuration public class MqConfig { //1.创建交换机 @Bean public TopicExchange topicExchange() { TopicExchange topicExchange = new TopicExchange("springboot?topic-exchange", true, false); return topicExchange; } //2.创建队列 @Bean public Queue queue(){ //第二个参数为true表示队列支持持久化 Queue queue = new Queue("springboot-queue", true, false,false, null); return queue; } //3.将交换机和队列进行绑定 @Bean public Binding binding(TopicExchange exchange, Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("*.red.*"); } }
java@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest(classes = MqApplication.class) public class Publisher { @Autowired private RabbitTemplate rabbitTemplate; @Test public void publisher() { //发送消息 rabbitTemplate.convertAndSend("springboot-topic.exchange","yangguang.red.dog","阳光的红色的大黄狗"); } }
java@Component public class Consumer { //定义消费者,并监听队列 @RabbitListener(queues = "springboot-queue") public void getMessage(String msg, Channel channel, Messagemessage) throws IOException { System.out.println("接收的数据为:" + msg); try { //业务逻辑 int i=1/0; //模拟异常 //手动签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (Exception e) { //让消息重回队列 //参数1:消息的标记 //参数2:是否进行批量应答 //参数3:拒绝接收消息之后,这个消息是否继续会队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true ); } } }
RabbitMQ 延时插件
基于 过期消息实现延迟队列、过期队列实现延迟队列 存在一些问题,官网提供了基于插件的方式来实现。
消息到达延迟交换机
后,消息不会立即进入队列,先将消息保存至表中,插件将会尝试确认消息是否过期,如果消息过期则投递至目标队列。
Docker环境安装
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
shell#拷贝到rabbitmq容器 b6c96d8d8e6f 中 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez ac88d0cb6bfc:/plugins #进入容器 docker exec -it ac88d0cb6bfc /bin/bash #启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange #查看 rabbitmq-plugins list #重新启动容器 docker restart ac88d0cb6bfc
代码实现
首先创建x-delayed-message
类型的交换机,并绑定队列:
java
@Configuration
public class RabbitMqDelayQueueConfig {
@Bean("delayExchange")
public CustomExchange delayExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
return new CustomExchange("delayExchange", "x-delayed-message", true, false, arguments);
}
/**
* 创建队列:
* 指定死信交换机、路由KEY
*/
@Bean("delayQueue")
public Queue delayQueue() {
return QueueBuilder.durable("delayQueue").build();
}
/**
* 创建绑定关系
*/
@Bean("delayBinding")
public Binding delayBinding(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayExchange") Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay.key").noargs();
}
}
创建消费者,监听延迟队列:
java
@RabbitListener(queues = {"delayQueue"})
public void deadQueue(Message message) {
System.out.println("收到消息" + new String(message.getBody()));
System.out.println("当前时间:"+ LocalDateTime.now());
System.out.println("判断订单状态...." + new String(message.getBody()));
System.out.println("未支付,回滚数据库....");
}
创建生产者,发送不同延迟时间的消息:
java
// 发送延迟时间为20秒的消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDelay(20000); // 20秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message001 = new Message("订单:ID:001 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message001);
// 发送延迟时间为10秒的消息
messageProperties.setDelay(10000); // 10秒过期
System.out.println("下订单成功,发送订单消息到MQ中....当前时间:"+ LocalDateTime.now());
Message message002 = new Message("订单:ID:002 状态:未支付".getBytes(), messageProperties);
rabbitTemplate.send("delayExchange", "delay.key", message002);