RocketMQ 消费失败消息处理

后端存储 简书

闲着研究了下RocketMQ消费失败消息的处理逻辑这里记录下~,更细化说这里只讨论Push模式(其实实现还是Pull的模式)非顺序消费的情况~Pull和顺序消息这里暂时不做讨论哈~(还没研究- -)

消费失败处理逻辑

  • 消费成功的情况RockeMQ会通过移动消费offset位点向前来标示消息已被处理
  • 而对于业务处里失败的消息采用的策略是将消息回发回Broker(并存放到一个 %RETRY%XX 的topic中), 大家一听可以想到的是回发broker这时候broker挂了怎么办?
  • 回发失败的时候会在本机启动个task来重试..恩 然后这时候consumer机器挂了了怎么办重试没了,难道丢消息?
  • 所以为了保证consumer掉电不丢消费失败且回发失败的消息,代码里保证offsetManage(local or remote)中offset不会前移超过重发失败消息的offset,这样可以保证在下次需要,如果下次consumer活过来时(这时一定会从offsetManage中取offset, 恩其实正常运行中不会每次都取, 顺序消息除外…),一定可以重拉到消费失败的消息(后面会提到这个的代价是会重复拉到很多上次已经消费的消息,不过业务同学的代码都是幂等的,所以逃~)
  • 对于消费失败但回发成功的消息,会直接更新offset假装认为那几条消息已经被消费成功,因为他们已经转生在 %RETRY%XX topic里作为新消息等待消费了~当前消费者可以专注于干其他事情.(补充: RERTRY topic实际会带上delay所以实际是先 SCHEDULE_TOPIC 然后再 %RETRY%XX , 这个具体见其他同学关于delay消息的解析)

可以看到,重发这种模式是不会丢消息的,即使broker挂了,consumer挂了,一定会消费到,虽然可能获得很多不想要的重复消息- –

为啥这么搞

写本文的原因就是我组几个小伙伴都觉得这个很奇怪,为啥这么弄呢~?个人研究了下理解是这样的….(其实自己刚开始研究RocketMQ很多理解可能有问题,欢迎大家一块讨论学习 哈哈哈)

可以冷静看下当前消息消费场景特点:

  • 给了我们一个Queue,访问的时候需要通过一层网络
  • 为了希望消费者能尽快的获得大量消息,结合上条希望consumer更好是一次获取一批而不是一条消息
  • 因为顺序并不重要,consumer本身应该可以并发消费这批消息
  • 因为并发消费消息,不是等上条ok才能消费下一条,就会有ack先后顺序问题
  • 为了server端ack应该是高效的,每条记录一个状态 vs 已成功offset?
  • 更进一步,获取获取第二批消息能否需要等待上一批消费完成? 其实没必要只要消费者有空闲线程可以先抓过来消费第二批,虽然第一批里某几条处理比较慢,但多数情况下应该能不会一会就恢复,其他线程先干第二批的活, 所以拉取和处理应该分离
  • 其实从上条可以看出对于抓取的速度应该根据消费者处理能力来控制~如果消费还有闲的可以疯狂的从Queue中先抓过来,只要不把没处理成功的给ack掉;如果消费者已经严重delay无力处理则需要降低抓取速度

完整处理

感觉RocketMQ处理这部分的代码挺巧妙…几个核心参与类:

  • ConsumeQueue : Consumer角度消费的一个Queue(有些类似kafka里partition的概念, 一个Queue只会被一个consumer消费,虽然一个consumer可以消费多个Q)
  • PullRequest : 当前consumer下已分配的每个ConsumeQueue消费者端都会新建一个PullRequest,里面记录 nextOffset 即从Server 拉取offset拉取offset消费offset 是两个offset才能第一批没消费完就拉第二批; 这个Request会在rebalanceService中创建,并被多次更新nextOffset多次进入PullRequestQueue来达到持续拉取的循环效果- -(也会被延迟丢Q来控制速率)
  • PullRequestQueue : 一个内存队列,充当 PullMessageService 的入参
  • PullMessageService : 负责拉取消息的 拉取线程 ,不停的读取 PullRequestQueue 根据request拉取消息,然后将消息丢到 ProcessQueue 中并新建 ConsumeRequest 提交到 ConsumeService 处理, 然后生成下一批的PullRequest丢到 PullRequestQueue :继续消费下一批,达到持续循环拉取的作用
  • ConsumeRequest : 虽然叫request但除了要consume的消息数据外,还有具体的消费逻辑(是个Runnable- -); 关键元素就是这批要处理msg列表对这批消息的处理逻辑,run里会调用用户注册的listener,并根据处理情况,对失败消息回发,并根据失败和回发结果, 更新ProcessQueue以及OffsetStore
  • ProcessQueue : 又一个内存队列保存实现是TreeMap,在处理中的消息,处理处理成功或处理失败但回发成功都会从这个Queue中移除, 消费offset 的上报基于ProcessQueue中最小的offset来完成(所以失败未回发成功的不会被移除);另外在ProcessQueue里最大offset和最小offset过大(MaxSpan)时,前面的PullMessageService会减速等一会在基于运行抓取(等一会儿再往PullRequestQueue里扔消息).
  • ConsumeService : 一个ConsumeRequest的Executor可以理解为一个线程池
  • OffsetStore : 维护 消费offset (即offset之前都处理完成)
  • RebalanceService : 负责给consume分配queue,而对于目前讨论过程他的作用是初始化了了对应的PullRequestQueue和首次的PullRequest, offset从offsetStore获取

简单画了个图说明上面几个类的关系~(手指在ipad上画得没有笔所以特别难看- -先将就吧)

Notes – Page 1.png

失败重试的细节好像没画出来,= = 画图不好画。。结合上面描述看代码哈~- –

最终达到的效果

  • 从Server是批量拉取的
  • 拉取线程不需要等待上一批被处理就能开始拉取下一批,只要ProcessQueue没超 MaxSpan (也就是消费某几条卡主太久), 就可以一直拉取
  • 消费listener可以并发消费,并各自返回完成状态, 部分消费者卡一段时间不影响其他消费者消费
  • Consumer保证实际消费端offset保证offset之前必须是已处理成功或处理失败但已回发成功
  • 回发不成功会本地重试且远端offset不会前移
  • 如果重启或被新分配队列会从offsetStrore获取初始offset,所以可能会有不必要的重复消息,所以消息处理需要做好幂等

总结

  • 可以批量,且拉取和处理分离,同时保证不丢数据,提升效率同时代价是重复消息
  • 通过只记录offset提升ack效率(可以一次ack一批,并且不用每条记录记录状态)
  • 通过分离拉取offset(in PullRequest)和消费offset(in OffsetStore)分离拉取和处理进度,提升拉取效率,并根据消费者处理卡主情况做拉取阀值控制
  • 通过回发消息加速批次中其他已处理成功消息的ack消费offset,如果不回发那消费offset不能迁移,重启之类会导致更多的消息重发, 而有回发后只要回发成功了就可以前移offset(反正不关心顺序只要求效率)
  • 对于回发失败,只能不前移消费offset。。。然后通过本地重试做非consumer宕机情况的优化
简书稿源:简书 (源链) | 关于 | 阅读提示

本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 后端存储 » RocketMQ 消费失败消息处理

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录