RabbitMQ学习笔记
RabbitMQ Tutorials
RabbitMQ组件
Broker
:一个RabbitMQ实例就是一个BrokerVirtual Host
:虚拟主机。相当于MySQL的DataBase ,一个Broker上可以存在多个vhost,vhost之间相互隔离。每个vhost都拥有自己的队列、交换机、绑定和权限机制。vhost必须在连接时指定,默认的vhost是/。Exchange
:交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Queue
:消息队列,用来保存消息直到发送给消费者。它是消息的容器。一个消息可投入一个或多个队列。Banding
:绑定关系,用于 消息队列和交换机之间的关联 。通过路由键( Routing Key )将交换机和消息队列关联起来。Channel
:管道,一条双向数据流通道。不管是发布消息、订阅队列还是接收消息,这些动作都是通过管道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了管道的概念,以复用一条TCP连接。Connection
:生产者/消费者 与broker之间的TCP连接。Publisher
:消息的生产者。Consumer
:消息的消费者。Message
:消息,它是由消息头和消息体组成。消息头则包括 Routing-Key 、 Priority (优先级)等
RabbitMQ通信模型
simple
生产者将消息发送到队列中,一个消费者一定能从队列中获取消息
Work Queue
多个消费者对应同一个队列,提高数据的消费能力。多个消费者绑定到同一个队列上,共同消费队列中的数据,队列中数据一旦消费了就自动删除,不会重复执行。 若多个消费者同时监听一个队列,默认情况下MQ平均分配消息
Publish/Subscribe
生产者发送消息给交换机(exchange)了,由交换机确定发送给具体队列。这个交换机用来接收生产者发送的数据,并将将其转发给对应的队列,根据交换机发送数据的不同方式,分成3种:
1.Fanout:广播模式,将消息发送给所有的队列
2.Direct:定向,把消息发送给 routingkey 相同的队列
队列与交换机不能任意绑定,需要指定一个Routingkey(路由key) 在消息的发送发向交换机发送消息是,也需要指定RoutingKey 交换机就会把消息发送给Routingkey一样的队列
3.Topic:通配符,把消息发送给符合 routing pattern(路由模式)的队列
routingkey可以采用通配符进行定义,一般都是有一个或多个单词组成,多个单词之间使用“.”分割
- 通配符规则 .# : 匹配一个或多个单词 .* : 匹配一个单词
- routing key 为一个句点号
"."
分隔的字符串。我们将被句点号"."
分隔开的每一段独立的字符串称为一个单词,例如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit” - binding key 与 routing key 一样也是句点号
"."
分隔的字符串。 - binding key 中可以存在两种特殊字符
"*"
与"#"
,用于做模糊匹配。其中"*"
用于匹配一个单词,"#"
用于匹配多个单词(可以是零个)。
RabbitMQ TTL
TTL(Time To Live):生存时间。RabbitMQ支持消息的过期时间,一共2种。
RabbitMQ
可以对消息设置过期时间,也可以对整个队列(Queue)
设置过期时间。
- 在消息发送时进行指定 。通过配置消息体的 Properties ,可以指定当前消息的过期时间。
- 在创建Exchange时指定 。从进入消息队列开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。
RabbitMQ DLX
死信队列(DLX Dead-Letter-Exchange):当消息在一个队列中变成死信之后,它会被重新推送到另一个队列,这个队列就是死信队列。DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
死信队列(DLX)产生来源
①:消息被拒绝(basic.reject/basic.nack),且 requeue = false(代表不重新回到队列)
②:消息因TTL过期(就是任务消息上携带过期时间)
③:消息队列的消息数量已经超过最大队列长度,先入队的消息会被丢弃变为死信
消息TTL过期产生死信
死信交换机、死信队列
需要我们自己创建,只是业务中用来存放死信的“特殊”
交换机队列。其他队列可以指定死信交换机、死信队列
,当发生死信时,自动将其投递到死信队列
中。
RabbitMQ 消息持久化
交换机
交换机的持久化其实就是相当于将交换机的属性在服务器内部保存,当MQ的服务器发生意外或关闭之后,重启RabbitMQ时不需要重新手动或执行代码去建立交换机,交换机会自动建立,相当于一直存在。
创建时,使用durable(true)设置持久化:
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.directExchange("bootExchange").durable(true).build();
}
队列
队列持久化类似于交换机持久化,创建时,使用durable(" ")设置持久化:
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable("bootQueue").build();
}
消息
众所周知,RabbitMQ的消息是依附于队列存在的,所以想要消息持久化,那么前提是队列也要持久化。 RabbitTemplate
发送消息默认就是持久化的
RabbitTemplate.convertAndSend()
RabbitMQ对象序列化
使用RabbitMQ
原生API
,发送消息时,发送的是二进制byte[]
数据。
使用RabbitTemplate.send
方法发送Message
对象,也是二进制byte[]
数据。
在接收时,需要将二进制数据转为你想要的数据格式。在JAVA
编程中都是基于对象操作,一般消息都是对象,比如订单、日志。
所以RabbitTemplate
提供了convertAndSend
方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。
使用Jackson 序列化
只需要在RabbitTemplate 、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
RabbitMQ Confirm机制
- 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
- 生产者进行接受应答,用来确认这条消息是否正常的发送到了Broker,这种方式也是 消息的可靠性投递的核心保障!
如何实现Confirm确认消息?
- 在channel上开启确认模式 :
channel.confirmSelect()
- 在channel上开启监听 :addConfirmListener ,监听成功和失败的处理结果,根据具体的结果对消息进行重新发送或记录日志处理等后续操作。
开启发布确认模式配置项
publisher-confirm-type
,该配置项是枚举,可以配置为NONE
,SIMPLE
,CORRELATED
。[NONE
表示关闭确认回调,这是默认配置]yamlspring: rabbitmq: username: xxx password: xxx host: localhost port: 5672 # 开启 publisher-confirm-type: CORRELATED # springboot2.2.0,该属性已过时,通过publisher-confirm-type配置 # publisher-confirms: true
给
RabbitTemplate
设置一个confirmCallback
回调函数javarabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("进入回调方法"); System.out.println("消息ID:" + correlationData.getId()); // 如果开启了退回模式,还可以通过correlationData获取退回消息 if (correlationData.getReturned() != null) { System.out.println("消息被退回,原因:" + correlationData.getReturned().getReplyText()); } if (b) { System.out.println("确认接收到消息"); } else { System.out.println("失败,原因:" + s); } } }); // 可以创建CorrelationData对象,设置id属性,用来表示当前消息的唯一性。 // 确认时,可以获取ID,知道是哪个消息 CorrelationData correlationData = new CorrelationData("34424343"); rabbitTemplate.convertAndSend("bootExchange", "boot222.key", "HELLO SPRING BOOT", correlationData);
RabbitMQ Return机制
默认情况下,如果消息没有匹配到队列会直接丢弃
,采用退回模式
可以在生产者端监听改消息是否被成功投递到队列中。
我们的消息生产者,通过指定一个Exchange和Routing,把消息送达到某一个队列中去,然后我们的消费者监听队列进行消息的消费处理操作。
但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候我们需要监听这种不可达消息,就需要使用到Returrn Listener
。
在配置文件中开启退回模式:
yamlspring: rabbitmq: username: xxx password: xxx host: localhost port: 5672 # 开启回退模式 publisher-returns: true
基础API中有个关键的配置项 Mandatory :如果为true,监听器会收到路由不可达的消息,然后进行处理。如果为false,broker端会自动删除该消息。
同样,通过监听的方式, chennel.addReturnListener(ReturnListener rl)
传入已经重写过handleReturn方法的ReturnListener。
给
RabbitTemplate
设置一个returnsCallback
回调函数,如果消息路由键没有匹配的队列,投递出去后将被退回,执行回调函数。java// 回退模式回调函数 rabbitTemplate.setReturnsCallback(returnedMessage -> { System.out.println("消息被回退,原因:"+returnedMessage.getReplyText()); System.out.println(returnedMessage.getMessage()); // 回退消息 System.out.println( returnedMessage.getExchange()); // 交换机 System.out.println(returnedMessage.getReplyCode()); // 返回原因的代码 System.out.println(returnedMessage.getReplyText()); // 返回信息,例如NO_ROUTE System.out.println(returnedMessage.getRoutingKey()); // 路由KEY }); // 路由键aaa.bbb.key,没有匹配的队列 rabbitTemplate.convertAndSend("bootExchange", "aaa.bbb.key", "HELLO SPRING BOOT");
RabbitMQ ACK与NACK
消费端进行消费的时候,如果由于业务异常可以进行日志的记录,然后进行补偿。但是对于服务器宕机等严重问题,我们需要 手动ACK 保障消费端消费成功。
// deliveryTag:消息在mq中的唯一标识
// multiple:是否批量(和qos设置类似的参数)
// requeue:是否需要重回队列。或者丢弃或者重回队首再次消费。
public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
消息在 消费端重回队列 是为了对没有成功处理消息,把消息重新返回到Broker。一般来说,实际应用中都会关闭重回队列( 避免进入死循环 ),也就是设置为false。
@Component
public class RabbitConsumer {
@RabbitListener(queues = {"bootQueue"})
public void receiveMessage(Message message, Channel channel) throws IOException {
// 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("收到消息===" + new String(message.getBody()));
System.out.println("处理业务逻辑");
// 发生异常
// int i = 5 / 0;
// 处理完成后,确认
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 发生异常,拒绝签收
e.printStackTrace();
channel.basicNack(deliveryTag, true, true);
}
}
}
RabbitMQ高可用
消息顺序消费
如何保证 RabbitMQ 消息的顺序消费?
思路就是拆分queue
,使得一个queue只对应一个消费者,这样消费者一定是按照顺序消费的;
如果消息数量较大,那我们可以按照消息类型拆分队列,你管你消息再多,不可能所有消息都是需要顺序性消费的吧,我们可以灵活一点,视情况而定,比如某三个消息ABC的对应的操作是需要顺序消费的,那就把这三个放到同一个队列;如果有多组这样的ABC消息需要保证顺序,那我们就多搞几个队列;不需要保证顺序的消息就放在其它队列;
消息重复消费
如何保证 RabbitMQ 消息不会重复消费?
这个要分情况:
如果是direct模式:
一个队列对应一个消费者,那不存在重复消费的问题;如果是一个队列对应多个消费者,那消费者会通过轮询来消费,也不会存在重复消费的问题;
如果是topic或者广播模式:
一个队列对应了多个消费者,且消费者会同时收到消息,那就会出现重复消费的问题,如果我们不希望出现重复消费,我们可以给消息加一个唯一id,存到redis里面,消息消费成功后就存到redis里面去,这里我们可以用redis的set类型,然后每次消费之前先看看redis里面有没有该id;