消息消费

消息消费

超时时间

// Producer 级别配置
DefaultMQProducer producer = new DefaultMQProducer();
producer.setSendMsgTimeout(1000);

// 请求级别配置
// 同步发送
defaultMQProducer.send(message, 100);
// 异步发送
defaultMQProducer.send(message, sendCallback, 100);

需要注意的是,async 和 oneway 两种发送方式的超时时间和同步发送有比较大的区别,这两种方式下,每个消息发送者都有一个资源信号量来控制发送的并发度,获取资源锁可能存在等待耗时。

失败重试

发送失败在不同的场景和需求下的定义都不相同,于是我们需要具体问题具体分析,以同步发送为例:

  • 如果我们希望保证消息不丢,那么 Broker.Master 就必须同步刷盘成功;
  • 如果我们希望消息不丢的同时,如果 Master 故障,消费者也能立马消费到消息,那么 Broker.Slave 也必须同步刷盘成功;
  • 如果我们能容忍掉电级别导致的消息丢失,那么 Broker.Master 只需要写入 PageCache 即可。

对高可靠的要求不同,Broker 的刷盘策略及 HA 策略也各不相同,Producer 的处理逻辑自然也就不同,失败又可以分为以下几类:

  • 系统失败:客户端异常:Producer 无法获取 broker 的地址;通讯层面的异常:连接不可用、请求超时等;Broker 异常:磁盘满了、创建文件失败、写入 PageCache 超时等。可能抛出 MQClientException、RemotingException、MQBrokerException。
  • 业务失败:消息 Topic 长度超过上限;消息体大小超过上限;消息的 properties 长度超过上限等。可能抛出 MQClientException、MQBrokerException。
  • 节点失败:Broker.Master 刷盘失败,Broker.Slave 不可用或刷盘超时;无异常,根据发送返回值 SendResult.sendStatus 来判断。

针对系统失败和业务失败,我们可以通过 DefaultMQProducer.retryTimesWhenSendFailed 来配置重试次数,对于高可用失败,可以通过 DefaultMQProducer.retryAnotherBrokerWhenNotStoreOK 来配置切换 broker 的重试。

如果为了保证消息不丢,只要消息在 Master 同步落盘即可:

  • Broker 的刷盘策略需要配置为同步刷盘,即 FlushDiskType==SYNC_FLUSH
  • Producer 在发送消息时,properties 中的“WAIT”属性设置为“true”,表示客户端同步等待刷盘完成。
  • 客户端需要手动检查发送状态,保证 SendResult.sendStatus=SEND_OK

为了性能,我们都是采用同步写 PageCache 与异步刷盘的策略,甚至是同步写预分配内存与异步写、PageCache 与异步刷盘。