企业环境中使用RocketMQ进行消费的常见问题
0.前言
消息队列主要适用于 解耦、异步、削峰的业务场景。
它可以优化业务的逻辑链路,对高数据量高并发场景进行优化。
它主要分为生产-存储-消费 三个阶段,这篇文章来说一下使用MQ
进行消费会遇到的常见问题。
本篇以RocketMQ
为场景,它是一个比较成熟的消息队列,并且在不同的业务场景的需求都有一些已经封装好的实现。
1. 消费消息时消息丢失
生产者丢数据:
只要消息正常发送出去就不会在生产者这里丢。
MQ
丢数据:原因:
MQ
宕机,网络抖动等。解决:
做集群处理(一主多从)
修改消息保存机制为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
集群复制改为同步复制(默认是异步,这个会慢)
1 | ## master 节点配置 |
消费者丢数据:
原因:消费业务执行过程中丢了
解决:
- 优化业务逻辑,参见
3. 4.
- 业务逻辑真正执行成功,我们再返回消费成功状态:
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
。
- 优化业务逻辑,参见
2. 消息队列的消费策略选择
集群模式(常用)
一条消息只会被多台消费者中的一台收到并消费(控制台可以查询每条消息被谁消费)
广播模式
一条消息会被发送给每个消费者(会有重复消费的问题)
3. 对于消息实时性要求较高出现的消费消息积压高延迟问题
原因:
- 消费时处理太慢,导致收到消息后花了大量时间进行导致阻塞。
- 消息队列数据量特别大,并发较高。
解决:
优化业务逻辑,以去重场景为例(以消息内容的某几个字段来进行判断去重):
入库如果数据量大的话去重可以用联合主键的方式进行去重,如果数据量不大可以进行
HashSet
去重。增加消费者的数量(服务器台数)。
设置消费者线程数量,适当设置消费者线程(根据机器配置设定)。
PS
:RocketMQ
的消费逻辑有并发消费(ConsumeMessageConcurrentlyService
)与顺序消费(ConsumeMessageOrderlyService
)两种,并发消费会导致消息读取顺序乱的问题,如果对此有业务要求,则不能进行多线程消费。
存
Redis
并设置过期时限,Redis
中数据慢慢向数据库中同步,数据先从Redis
读取,最终保持数据都能落到数据库中。
4. 消费者重复消费问题
消息本身重复:
解决:根据消息内容做去重处理
消息重复发送:
解决:根据消息的
MsgId
为主键建立一个对照表,接收消息的时候拿MsgId
插表,插入失败即为重复。