消息消费
消息消费
超时时间
// Producer 级别配置
DefaultMQProducer producer = new DefaultMQProducer();
producer.setSendMsgTimeout(1000);
// 请求级别配置
// 同步发送
defaultMQProducer.send(message, 100);
// 异步发送
defaultMQProducer.send(message, sendCallback, 100);
需要注意的是,
失败重试
发送失败在不同的场景和需求下的定义都不相同,于是我们需要具体问题具体分析,以同步发送为例:
- 如果我们希望保证消息不丢,那么
Broker.Master 就必须同步刷盘成功; - 如果我们希望消息不丢的同时,如果
Master 故障,消费者也能立马消费到消息,那么Broker.Slave 也必须同步刷盘成功; - 如果我们能容忍掉电级别导致的消息丢失,那么
Broker.Master 只需要写入PageCache 即可。
对高可靠的要求不同,
- 系统失败:客户端异常:
Producer 无法获取broker 的地址;通讯层面的异常:连接不可用、请求超时等;Broker 异常:磁盘满了、创建文件失败、写入PageCache 超时等。可能抛出MQClientException 、RemotingException、MQBrokerException。 - 业务失败:消息
Topic 长度超过上限;消息体大小超过上限;消息的properties 长度超过上限等。可能抛出MQClientException 、MQBrokerException。 - 节点失败:
Broker.Master 刷盘失败,Broker.Slave 不可用或刷盘超时;无异常,根据发送返回值SendResult.sendStatus 来判断。
针对系统失败和业务失败,我们可以通过
如果为了保证消息不丢,只要消息在
Broker 的刷盘策略需要配置为同步刷盘,即FlushDiskType==SYNC_FLUSH
。Producer 在发送消息时,properties 中的“WAIT”属性设置为“true”,表示客户端同步等待刷盘完成。- 客户端需要手动检查发送状态,保证
SendResult.sendStatus=SEND_OK
。
为了性能,我们都是采用同步写