企业环境中使用RocketMQ进行消费的常见问题

0.前言

消息队列主要适用于 解耦、异步、削峰的业务场景。

它可以优化业务的逻辑链路,对高数据量高并发场景进行优化。

它主要分为生产-存储-消费 三个阶段,这篇文章来说一下使用MQ进行消费会遇到的常见问题。

本篇以RocketMQ为场景,它是一个比较成熟的消息队列,并且在不同的业务场景的需求都有一些已经封装好的实现。

1. 消费消息时消息丢失

  • 生产者丢数据:

    只要消息正常发送出去就不会在生产者这里丢。

  • MQ丢数据:

    原因:MQ宕机,网络抖动等。

    解决:

    • 做集群处理(一主多从)

    • 修改消息保存机制为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

    • 集群复制改为同步复制(默认是异步,这个会慢)

1
2
3
4
5
6
7
## master 节点配置
flushDiskType = SYNC_FLUSH
brokerRole=SYNC_MASTER

## slave 节点配置
brokerRole=slave
flushDiskType = SYNC_FLUSH
  • 消费者丢数据:

    原因:消费业务执行过程中丢了

    解决:

    • 优化业务逻辑,参见3. 4.
    • 业务逻辑真正执行成功,我们再返回消费成功状态: ConsumeConcurrentlyStatus.CONSUME_SUCCESS

2. 消息队列的消费策略选择

  • 集群模式(常用)

    一条消息只会被多台消费者中的一台收到并消费(控制台可以查询每条消息被谁消费)

  • 广播模式

    一条消息会被发送给每个消费者(会有重复消费的问题)

3. 对于消息实时性要求较高出现的消费消息积压高延迟问题

  • 原因:

    • 消费时处理太慢,导致收到消息后花了大量时间进行导致阻塞。
    • 消息队列数据量特别大,并发较高。
  • 解决:

    • 优化业务逻辑,以去重场景为例(以消息内容的某几个字段来进行判断去重):

      ​ 入库如果数据量大的话去重可以用联合主键的方式进行去重,如果数据量不大可以进行HashSet去重。

    • 增加消费者的数量(服务器台数)。

    • 设置消费者线程数量,适当设置消费者线程(根据机器配置设定)。

      • PSRocketMQ的消费逻辑有并发消费(ConsumeMessageConcurrentlyService)与顺序消费(ConsumeMessageOrderlyService)两种,并发消费会导致消息读取顺序乱的问题,如果对此有业务要求,则不能进行多线程消费。
    • Redis并设置过期时限,Redis中数据慢慢向数据库中同步,数据先从Redis读取,最终保持数据都能落到数据库中。

4. 消费者重复消费问题

  • 消息本身重复:

    解决:根据消息内容做去重处理

  • 消息重复发送:

    解决:根据消息的MsgId为主键建立一个对照表,接收消息的时候拿MsgId插表,插入失败即为重复。