消息消费

消息消费

超时时间

// Producer 级别配置
DefaultMQProducer producer = new DefaultMQProducer();
producer.setSendMsgTimeout(1000);

// 请求级别配置
// 同步发送
defaultMQProducer.send(message, 100);
// 异步发送
defaultMQProducer.send(message, sendCallback, 100);

需要注意的是,asynconeway两种发送方式的超时时间和同步发送有比较大的区别,这两种方式下,每个消息发送者都有一个资源信号量来控制发送的并发度,获取资源锁可能存在等待耗时。

失败重试

发送失败在不同的场景和需求下的定义都不相同,于是我们需要具体问题具体分析,以同步发送为例:

  • 如果我们希望保证消息不丢,那么Broker.Master就必须同步刷盘成功;
  • 如果我们希望消息不丢的同时,如果Master故障,消费者也能立马消费到消息,那么Broker.Slave也必须同步刷盘成功;
  • 如果我们能容忍掉电级别导致的消息丢失,那么Broker.Master只需要写入PageCache即可。

对高可靠的要求不同,Broker的刷盘策略及HA策略也各不相同,Producer的处理逻辑自然也就不同,失败又可以分为以下几类:

  • 系统失败:客户端异常:Producer无法获取broker的地址;通讯层面的异常:连接不可用、请求超时等;Broker异常:磁盘满了、创建文件失败、写入PageCache超时等。可能抛出MQClientException、RemotingException、MQBrokerException。
  • 业务失败:消息Topic长度超过上限;消息体大小超过上限;消息的properties长度超过上限等。可能抛出MQClientException、MQBrokerException。
  • 节点失败:Broker.Master刷盘失败,Broker.Slave不可用或刷盘超时;无异常,根据发送返回值SendResult.sendStatus来判断。

针对系统失败和业务失败,我们可以通过DefaultMQProducer.retryTimesWhenSendFailed来配置重试次数,对于高可用失败,可以通过DefaultMQProducer.retryAnotherBrokerWhenNotStoreOK来配置切换broker的重试。

如果为了保证消息不丢,只要消息在Master同步落盘即可:

  • Broker的刷盘策略需要配置为同步刷盘,即 FlushDiskType==SYNC_FLUSH
  • Producer在发送消息时,properties中的“WAIT”属性设置为“true”,表示客户端同步等待刷盘完成。
  • 客户端需要手动检查发送状态,保证 SendResult.sendStatus=SEND_OK

为了性能,我们都是采用同步写PageCache与异步刷盘的策略,甚至是同步写预分配内存与异步写、PageCache与异步刷盘。