Skip to content

RocketMQ

RocketMQ基础

RocketMQ组件

  • Broker :消息中转角色,负责 存储消息 ,转发消息。Broker 是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将 Topic 信息注册到NameServer,顺带一提底层的通信和连接都是 基于Netty实现 的。Broker 负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。官网上有数据显示:具有 上亿级消息堆积能力 ,同时可 严格保证消息的有序性
  • Topic :主题!它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0个、1个、多个消费者订阅。
  • Tag :标签!可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同Topic而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。
  • MessageQueue :一个Topic下可以设置多个消息队列,发送消息时执行该消息的Topic,RocketMQ会轮询该Topic下的所有队列将消息发出去。消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。
  • NameServer :类似Kafka中的ZooKeeper,但NameServer集群之间是 没有通信 的,相对ZK来说更加 轻量 。它主要负责对于源数据的管理,包括了对于 Topic 和路由信息的管理。每个Broker在启动的时候会到NameServer注册,Producer在发送消息前会根据Topic去NameServer 获取对应Broker的路由信息 ,Consumer也会定时获取 Topic 的路由信息。
  • Producer :生产者,支持三种方式发送消息:同步、异步和单向 单向发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应,且 没有回调函数 。异步发送 :消息发出去后,可以继续发送下一条消息或执行业务代码,不等待服务器回应, 有回调函数 。同步发送 :消息发出去后,等待服务器响应成功或失败,才能继续后面的操作。
  • Consumer :消费者,支持 PUSH 和 PULL 两种消费模式,支持 集群消费广播消费 集群消费 :该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。广播消费 :会发给消费者组中的每一个消费者进行消费。相当于 RabbitMQ 的发布订阅模式。
  • Group :分组,一个组可以订阅多个Topic。分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的
  • Offset :在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,Offset为Java Long类型,64位,理论上在 100年内不会溢出,所以认为是长度无限。也可以认为Message Queue是一个长度无限的数组, Offset 就是下标。

RocketMQ延时消息

开源版的RocketMQ不支持任意时间精度,仅支持特定的level,例如定时5s,10s,1min等。其中,level=0级表示不延时,level=1表示1级延时,level=2表示2级延时,以此类推。

延时等级如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

RocketMQ顺序消息

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为 分区有序 或者 全局有序 。

RocketMQ事务消息

image-20230425160144804

RocketMQ提供类似X/Open XA的分布式事务功能,通过消息队列MQ事务消息能达到分布式事务的最终一致。上图说明了事务消息的大致流程:正常事务消息的发送和提交、事务消息的补偿流程。

事务消息发送及提交

  1. 发送half消息
  2. 服务端响应消息写入结果
  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行);
  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)。

事务消息的补偿流程

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”;
  2. Producer收到回查消息,检查回查消息对应的本地事务的状态。
  3. 根据本地事务状态,重新Commit或RollBack

其中,补偿阶段用于解决消息Commit或Rollback发生超时或者失败的情况。

事务消息状态

事务消息共有三种状态:提交状态、回滚状态、中间状态:

  1. TransactionStatus.CommitTransaction:提交事务,它允许消费者消费此消息。
  2. TransactionStatus.RollbackTransaction:回滚事务,它代表该消息将被删除,不允许被消费。
  3. TransactionStatus.Unkonwn:中间状态,它代表需要检查消息队列来确定消息状态。

RocketMQ消息机制

Message 发送与接收

Producer 发送消息

image-20220705160256465

image-20220705160345853

DefaultMQProducer#send(Message)

java
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}
@Override
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

说明:发送同步消息,DefaultMQProducer#send(Message)DefaultMQProducerImpl#send(Message) 进行封装。

DefaultMQProducerImpl#sendDefaultImpl()

java
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendDefaultImpl(
    Message msg, 
    final CommunicationMode communicationMode, 
    final SendCallback sendCallback, 
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 校验 Producer 处于运行状态
    this.makeSureStateOK();
    // 校验消息格式
    Validators.checkMessage(msg, this.defaultMQProducer);
    //
    final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取 Topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // 最后选择消息要发送到的队列
        Exception exception = null;
        SendResult sendResult = null; // 最后一次发送结果
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
        int times = 0; // 第几次发送
        String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
        // 循环调用发送消息,直到成功
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // 调用发送消息核心方法
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // 更新Broker可用性信息
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                            // 如下异常continue,进行发送消息重试
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                            // 如果有发送结果,进行返回,否则,抛出异常;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }
        // 根据不同情况,抛出不同的异常
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, 			System.currentTimeMillis() - beginTimestampFirst,
                                    msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    // Namesrv找不到异常
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }
    // 消息路由找不到异常
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

private SendResult sendDefaultImpl(
    Message msg, 
    final CommunicationMode communicationMode, 
    final SendCallback sendCallback, 
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 校验 Producer 处于运行状态
    this.makeSureStateOK();
    // 校验消息格式
    Validators.checkMessage(msg, this.defaultMQProducer);
    //
    final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取 Topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null; // 最后选择消息要发送到的队列
        Exception exception = null;
        SendResult sendResult = null; // 最后一次发送结果
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
        int times = 0; // 第几次发送
        String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
        // 循环调用发送消息,直到成功
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
            if (tmpmq != null) {
                mq = tmpmq;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // 调用发送消息核心方法
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // 更新Broker可用性信息
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    continue;
                } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    exception = e;
                    switch (e.getResponseCode()) {
                            // 如下异常continue,进行发送消息重试
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                            // 如果有发送结果,进行返回,否则,抛出异常;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }
                            throw e;
                    }
                } catch (InterruptedException e) {
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                    log.warn(msg.toString());
                    throw e;
                }
            } else {
                break;
            }
        }
        // 返回发送结果
        if (sendResult != null) {
            return sendResult;
        }
        // 根据不同情况,抛出不同的异常
        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, 			System.currentTimeMillis() - beginTimestampFirst,
                                    msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
        MQClientException mqClientException = new MQClientException(info, exception);
        if (exception instanceof MQBrokerException) {
            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
        } else if (exception instanceof RemotingConnectException) {
            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
        } else if (exception instanceof RemotingTimeoutException) {
            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
        } else if (exception instanceof MQClientException) {
            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
        }
        throw mqClientException;
    }
    // Namesrv找不到异常
    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    if (null == nsList || nsList.isEmpty()) {
        throw new MQClientException(
            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    }
    // 消息路由找不到异常
    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

DefaultMQProducerImpl#tryToFindTopicPublishInfo()

java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 // 缓存中获取 Topic发布信息
 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
 // 当无可用的 Topic发布信息时,从Namesrv获取一次
 if (null == topicPublishInfo || !topicPublishInfo.ok()) {
	 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
	 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
	 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 }
 // 若获取的 Topic发布信息时候可用,则返回
 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
	 return topicPublishInfo;
 } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
	 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
	 topicPublishInfo = this.topicPublishInfoTable.get(topic);
	 return topicPublishInfo;
 }
}
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
 // 缓存中获取 Topic发布信息
 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
 // 当无可用的 Topic发布信息时,从Namesrv获取一次
 if (null == topicPublishInfo || !topicPublishInfo.ok()) {
	 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
	 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
	 topicPublishInfo = this.topicPublishInfoTable.get(topic);
 }
 // 若获取的 Topic发布信息时候可用,则返回
 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
	 return topicPublishInfo;
 } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
	 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
	 topicPublishInfo = this.topicPublishInfoTable.get(topic);
	 return topicPublishInfo;
 }
}

MQFaultStrategy

image-20220706110703218

java
public class MQFaultStrategy {
    private final static Logger log = ClientLogger.getLog();

    /**
      * 延迟故障容错,维护每个Broker的发送消息的延迟
      * key:brokerName
      */
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    /**
      * 发送消息延迟容错开关
      */
    private boolean sendLatencyFaultEnable = false;
    /**
      * 延迟级别数组
      */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    /**
      * 不可用时长数组
      */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /**
      * 根据 Topic发布信息 选择一个消息队列
      *
      * @param tpInfo Topic发布信息
      * @param lastBrokerName brokerName
      * @return 消息队列
      */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // 获取 brokerName=lastBrokerName && 可用的一个消息队列
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 选择一个消息队列,不考虑队列的可用性
            return tpInfo.selectOneMessageQueue();
        }
        // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    /**
      * 更新延迟容错信息
      *
      * @param brokerName brokerName
      * @param currentLatency 延迟
      * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时
      */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    /**
      * 计算延迟对应的不可用时间
      *
      * @param currentLatency 延迟
      * @return 不可用时间
      */
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
}
public class MQFaultStrategy {
    private final static Logger log = ClientLogger.getLog();

    /**
      * 延迟故障容错,维护每个Broker的发送消息的延迟
      * key:brokerName
      */
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    /**
      * 发送消息延迟容错开关
      */
    private boolean sendLatencyFaultEnable = false;
    /**
      * 延迟级别数组
      */
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    /**
      * 不可用时长数组
      */
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /**
      * 根据 Topic发布信息 选择一个消息队列
      *
      * @param tpInfo Topic发布信息
      * @param lastBrokerName brokerName
      * @return 消息队列
      */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (this.sendLatencyFaultEnable) {
            try {
                // 获取 brokerName=lastBrokerName && 可用的一个消息队列
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
                // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 选择一个消息队列,不考虑队列的可用性
            return tpInfo.selectOneMessageQueue();
        }
        // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    /**
      * 更新延迟容错信息
      *
      * @param brokerName brokerName
      * @param currentLatency 延迟
      * @param isolation 是否隔离。当开启隔离时,默认延迟为30000。目前主要用于发送消息异常时
      */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    /**
      * 计算延迟对应的不可用时间
      *
      * @param currentLatency 延迟
      * @return 不可用时间
      */
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
}
  • 更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMaxnotAvailableDuration的配置,对应如下:

    Producer发送消息消耗时长Broker不可用时长
    >= 15000 ms600 * 1000 ms
    >= 3000 ms180 * 1000 ms
    >= 2000 ms120 * 1000 ms
    >= 1000 ms60 * 1000 ms
    >= 550 ms30 * 1000 ms
    >= 100 ms0 ms
    >= 50 ms0 ms

LatencyFaultTolerance

java
public interface LatencyFaultTolerance<T> {

    /**
     * 更新对应的延迟和不可用时长
     *
     * @param name 对象
     * @param currentLatency 延迟
     * @param notAvailableDuration 不可用时长
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * 对象是否可用
     *
     * @param name 对象
     * @return 是否可用
     */
    boolean isAvailable(final T name);

    /**
     * 移除对象
     *
     * @param name 对象
     */
    void remove(final T name);

    /**
     * 获取一个对象
     *
     * @return 对象
     */
    T pickOneAtLeast();
}
public interface LatencyFaultTolerance<T> {

    /**
     * 更新对应的延迟和不可用时长
     *
     * @param name 对象
     * @param currentLatency 延迟
     * @param notAvailableDuration 不可用时长
     */
    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);

    /**
     * 对象是否可用
     *
     * @param name 对象
     * @return 是否可用
     */
    boolean isAvailable(final T name);

    /**
     * 移除对象
     *
     * @param name 对象
     */
    void remove(final T name);

    /**
     * 获取一个对象
     *
     * @return 对象
     */
    T pickOneAtLeast();
}
  • 说明 :延迟故障容错接口

LatencyFaultToleranceImpl

java
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {

 /**
  * 对象故障信息Table
  */
 private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
 /**
  * 对象选择Index
  * @see #pickOneAtLeast()
  */
 private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

 @Override
 public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
	 FaultItem old = this.faultItemTable.get(name);
	 if (null == old) {
		 // 创建对象
		 final FaultItem faultItem = new FaultItem(name);
		 faultItem.setCurrentLatency(currentLatency);
		 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
		 // 更新对象
		 old = this.faultItemTable.putIfAbsent(name, faultItem);
		 if (old != null) {
			 old.setCurrentLatency(currentLatency);
			 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
		 }
	 } else { // 更新对象
		 old.setCurrentLatency(currentLatency);
		 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
	 }
 }

 @Override
 public boolean isAvailable(final String name) {
	 final FaultItem faultItem = this.faultItemTable.get(name);
	 if (faultItem != null) {
		 return faultItem.isAvailable();
	 }
	 return true;
 }

 @Override
 public void remove(final String name) {
	 this.faultItemTable.remove(name);
 }

 /**
  * 选择一个相对优秀的对象
  *
  * @return 对象
  */
 @Override
 public String pickOneAtLeast() {
	 // 创建数组
	 final Enumeration<FaultItem> elements = this.faultItemTable.elements();
	 List<FaultItem> tmpList = new LinkedList<>();
	 while (elements.hasMoreElements()) {
		 final FaultItem faultItem = elements.nextElement();
		 tmpList.add(faultItem);
	 }
	 //
	 if (!tmpList.isEmpty()) {
		 // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。
		 Collections.shuffle(tmpList);
		 Collections.sort(tmpList);
		 // 选择顺序在前一半的对象
		 final int half = tmpList.size() / 2;
		 if (half <= 0) {
			 return tmpList.get(0).getName();
		 } else {
			 final int i = this.whichItemWorst.getAndIncrement() % half;
			 return tmpList.get(i).getName();
		 }
	 }
	 return null;
 }
}
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {

 /**
  * 对象故障信息Table
  */
 private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>(16);
 /**
  * 对象选择Index
  * @see #pickOneAtLeast()
  */
 private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();

 @Override
 public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
	 FaultItem old = this.faultItemTable.get(name);
	 if (null == old) {
		 // 创建对象
		 final FaultItem faultItem = new FaultItem(name);
		 faultItem.setCurrentLatency(currentLatency);
		 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
		 // 更新对象
		 old = this.faultItemTable.putIfAbsent(name, faultItem);
		 if (old != null) {
			 old.setCurrentLatency(currentLatency);
			 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
		 }
	 } else { // 更新对象
		 old.setCurrentLatency(currentLatency);
		 old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
	 }
 }

 @Override
 public boolean isAvailable(final String name) {
	 final FaultItem faultItem = this.faultItemTable.get(name);
	 if (faultItem != null) {
		 return faultItem.isAvailable();
	 }
	 return true;
 }

 @Override
 public void remove(final String name) {
	 this.faultItemTable.remove(name);
 }

 /**
  * 选择一个相对优秀的对象
  *
  * @return 对象
  */
 @Override
 public String pickOneAtLeast() {
	 // 创建数组
	 final Enumeration<FaultItem> elements = this.faultItemTable.elements();
	 List<FaultItem> tmpList = new LinkedList<>();
	 while (elements.hasMoreElements()) {
		 final FaultItem faultItem = elements.nextElement();
		 tmpList.add(faultItem);
	 }
	 //
	 if (!tmpList.isEmpty()) {
		 // 打乱 + 排序。TODO 疑问:应该只能二选一。猜测Collections.shuffle(tmpList)去掉。
		 Collections.shuffle(tmpList);
		 Collections.sort(tmpList);
		 // 选择顺序在前一半的对象
		 final int half = tmpList.size() / 2;
		 if (half <= 0) {
			 return tmpList.get(0).getName();
		 } else {
			 final int i = this.whichItemWorst.getAndIncrement() % half;
			 return tmpList.get(i).getName();
		 }
	 }
	 return null;
 }
}
  • 说明 :延迟故障容错实现。维护每个对象的信息。

FaultItem

java
class FaultItem implements Comparable<FaultItem> {
    /**
      * 对象名
      */
    private final String name;
    /**
      * 延迟
      */
    private volatile long currentLatency;
    /**
      * 开始可用时间
      */
    private volatile long startTimestamp;

    public FaultItem(final String name) {
        this.name = name;
    }

    /**
      * 比较对象
      * 可用性 > 延迟 > 开始可用时间
      *
      * @param other other
      * @return 升序
      */
    @Override
    public int compareTo(final FaultItem other) {
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;

            if (other.isAvailable())
                return 1;
        }

        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }

        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }

        return 0;
    }

    /**
      * 是否可用:当开始可用时间大于当前时间
      *
      * @return 是否可用
      */
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

    @Override
    public int hashCode() {
        int result = getName() != null ? getName().hashCode() : 0;
        result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
        result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
        return result;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (!(o instanceof FaultItem))
            return false;

        final FaultItem faultItem = (FaultItem) o;

        if (getCurrentLatency() != faultItem.getCurrentLatency())
            return false;
        if (getStartTimestamp() != faultItem.getStartTimestamp())
            return false;
        return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;

    }
}
class FaultItem implements Comparable<FaultItem> {
    /**
      * 对象名
      */
    private final String name;
    /**
      * 延迟
      */
    private volatile long currentLatency;
    /**
      * 开始可用时间
      */
    private volatile long startTimestamp;

    public FaultItem(final String name) {
        this.name = name;
    }

    /**
      * 比较对象
      * 可用性 > 延迟 > 开始可用时间
      *
      * @param other other
      * @return 升序
      */
    @Override
    public int compareTo(final FaultItem other) {
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;

            if (other.isAvailable())
                return 1;
        }

        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }

        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }

        return 0;
    }

    /**
      * 是否可用:当开始可用时间大于当前时间
      *
      * @return 是否可用
      */
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

    @Override
    public int hashCode() {
        int result = getName() != null ? getName().hashCode() : 0;
        result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));
        result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));
        return result;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o)
            return true;
        if (!(o instanceof FaultItem))
            return false;

        final FaultItem faultItem = (FaultItem) o;

        if (getCurrentLatency() != faultItem.getCurrentLatency())
            return false;
        if (getStartTimestamp() != faultItem.getStartTimestamp())
            return false;
        return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;

    }
}
  • 说明 :对象故障信息。维护对象的名字、延迟、开始可用的时间。

DefaultMQProducerImpl#sendKernelImpl()

java
private SendResult sendKernelImpl(final Message msg, //
                                  final MessageQueue mq, //
                                  final CommunicationMode communicationMode, //
                                  final SendCallback sendCallback, //
                                  final TopicPublishInfo topicPublishInfo, //
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 获取 broker地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    //
    SendMessageContext context = null;
    if (brokerAddr != null) {
        // 是否使用broker vip通道。broker会开启两个端口对外服务。
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody(); // 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
        try {
            // 设置唯一编号
            MessageClientIDSetter.setUniqID(msg);
            // 消息压缩
            int sysFlag = 0;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
            // 事务
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // hook:发送消息校验
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // hook:发送消息前逻辑
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // 构建发送消息请求
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重发Topic
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            // 发送消息
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                        brokerAddr, // 1
                        mq.getBrokerName(), // 2
                        msg, // 3
                        requestHeader, // 4
                        timeout, // 5
                        communicationMode, // 6
                        sendCallback, // 7
                        topicPublishInfo, // 8
                        this.mQClientFactory, // 9
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                        context, //
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // hook:发送消息后逻辑
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            // 返回发送结果
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }
    // broker为空抛出异常
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
private SendResult sendKernelImpl(final Message msg, //
                                  final MessageQueue mq, //
                                  final CommunicationMode communicationMode, //
                                  final SendCallback sendCallback, //
                                  final TopicPublishInfo topicPublishInfo, //
                                  final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 获取 broker地址
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    //
    SendMessageContext context = null;
    if (brokerAddr != null) {
        // 是否使用broker vip通道。broker会开启两个端口对外服务。
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody(); // 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
        try {
            // 设置唯一编号
            MessageClientIDSetter.setUniqID(msg);
            // 消息压缩
            int sysFlag = 0;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
            // 事务
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // hook:发送消息校验
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // hook:发送消息前逻辑
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // 构建发送消息请求
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重发Topic
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            // 发送消息
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                        brokerAddr, // 1
                        mq.getBrokerName(), // 2
                        msg, // 3
                        requestHeader, // 4
                        timeout, // 5
                        communicationMode, // 6
                        sendCallback, // 7
                        topicPublishInfo, // 8
                        this.mQClientFactory, // 9
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
                        context, //
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // hook:发送消息后逻辑
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            // 返回发送结果
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }
    // broker为空抛出异常
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
Broker 发送消息

image-20220706111637175

SendMessageProcessor#sendMessage

AbstractSendMessageProcessor#msgCheck

DefaultMessageStore#putMessage

Message 存储

CommitLog

Last updated: