RocketMQ: consume offset rolled back because fetching the offset failed on the first pull

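Yesterday a colleague on the business side reported a message backlog on one Queue of a certain topic. All the other Queues were fine; only this one was backed up, and its consume offset had gone back to where it was three days earlier.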

Client log

2021-03-02 15:15:12 WARN PullMessageService - invokeSync: wait response timeout exception, the channel[brokerip:10911]
2021-03-02 15:15:12 WARN NettyClientWorkerThread_2 - receive response, but not matched any request, brokerip:10911
2021-03-02 15:15:12 WARN NettyClientWorkerThread_2 - RemotingCommand [code=0, language=JAVA, version=317, opaque=6774, flag(B)=1, remark=null, extFields={offset=5757922}, serializeTypeCurrentRPC=JSON]
2021-03-02 15:15:12 WARN PullMessageService - fetchConsumeOffsetFromBroker exception, MessageQueue [topic=tp_inner_order_operation, brokerName=broker-06, queueId=10]
2021-03-02 15:15:12 WARN NettyClientPublicExecutor_3 - the pull request offset illegal, PullRequest [consumerGroup=ts_biz_operation_group, messageQueue=MessageQueue [topic=tp_inner_order_operation, brokerName=broker-06, queueId=10], nextOffset=-1] PullResult [pullStatus=OFFSET_ILLEGAL, nextBeginOffset=5531762, minOffset=5531762, maxOffset=5757977, msgFoundList=0]
2021-03-02 15:15:22 WARN PullMessageServiceScheduledThread - unlock messageQueue. group:ts_biz_operation_group, clientId:clientip@30487, mq:MessageQueue [topic=tp_inner_order_operation, brokerName=broker-06, queueId=10]
2021-03-02 15:15:22 WARN PullMessageServiceScheduledThread - fix the pull request offset, PullRequest [consumerGroup=ts_biz_operation_group, messageQueue=MessageQueue [topic=tp_inner_order_operation, brokerName=broker-06, queueId=10], nextOffset=5531762]
2021-03-02 15:15:45 WARN PullMessageService - the consumer message buffer is full, so do flow control, minOffset=5531803, maxOffset=5532809, size=1008, pullRequest=PullRequest [consumerGroup=ts_biz_operation_group, messageQueue=MessageQueue [topic=tp_inner_order_operation, brokerName=broker-06, queueId=10], nextOffset=5532810], flowControlTimes=1



Broker log

broker.log:2021-03-02 15:15:12 INFO PullMessageThread_5 - the request offset too small. group=ts_biz_operation_group, topic=tp_inner_order_operation, requestOffset=-1, brokerMinOffset=5531762, clientIp=/clientip:24944
broker.log:2021-03-02 15:15:12 WARN PullMessageThread_5 - PULL_OFFSET_MOVED:correction offset. topic=tp_inner_order_operation, groupId=ts_biz_operation_group, requestOffset=-1, newOffset=5531762, suggestBrokerId=0
broker.log:2021-03-02 15:15:22 WARN ConsumerManageThread_15 - [NOTIFYME]update consumer offset less than store. clientHost=clientip:24944, key=tp_inner_order_operation@ts_biz_operation_group, queueId=10, requestOffset=5531762, storeOffset=5757922
store.log:2021-03-02 04:54:08 INFO StoreScheduledThread1 - Compute logical min offset: 5531762, topic: tp_inner_order_operation, queueId: 10


Putting the logs above together with the RocketMQ source code, we can reconstruct roughly what happened:

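  1. Server 160 was being redeployed. On startup the client asked the broker for the stored consume offset (request 1).

  2. Request 1 timed out, so the client sent a PullRequest to the broker with offset = -1 (request 2).

  3. Request 1 did eventually get a response (offset=5757922), but because of the RMQ client's fast-fail mechanism the matching request had already been discarded, hence "receive response, but not matched any request".

  4. The broker received request 2. When the requested offset is -1, its logic returns the smallest offset still present in the files for that queue, 5,531,762.

  5. The client pulled messages starting from that minimum offset and committed its progress, overwriting the offset the broker had stored before, 5,757,922 (see "update consumer offset less than store" in the broker log).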
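To make the five steps concrete, here is a toy Java model of the sequence. It is a minimal sketch, not RocketMQ code: `Broker`, `Client`, `responseTable`, `send`, `onTimeout`, `onResponse`, `pull` and `commit` are all hypothetical names that only reproduce the behavior visible in the logs (opaque 6774, minOffset 5531762, stored offset 5757922).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified model of steps 1-5. None of these classes are
// RocketMQ APIs; they only mirror what the client and broker logs show.
final class OffsetRollbackDemo {

    /** Broker side: the stored consumer offset plus the oldest offset still on disk. */
    static final class Broker {
        final long minOffset;   // oldest message still in the queue (5531762 in the logs)
        long storedOffset;      // consumer offset persisted by the broker (5757922 in the logs)

        Broker(long minOffset, long storedOffset) {
            this.minOffset = minOffset;
            this.storedOffset = storedOffset;
        }

        /** Step 4: a request offset below minOffset (including -1) is corrected to minOffset. */
        long pull(long requestOffset) {
            return Math.max(requestOffset, minOffset);
        }

        /** Step 5: the commit is accepted even though it is smaller than the stored offset. */
        void commit(long offset) {
            storedOffset = offset;
        }
    }

    /** Client side: in-flight requests keyed by opaque id. */
    static final class Client {
        private final Map<Integer, String> responseTable = new ConcurrentHashMap<>();

        /** Step 1: register the in-flight "query consumer offset" request. */
        void send(int opaque) {
            responseTable.put(opaque, "QUERY_CONSUMER_OFFSET");
        }

        /** Step 2: the wait times out; the entry is dropped and the client falls back to -1. */
        long onTimeout(int opaque) {
            responseTable.remove(opaque);
            return -1L;
        }

        /** Step 3: a late response finds no matching entry ("not matched any request"). */
        boolean onResponse(int opaque) {
            return responseTable.remove(opaque) != null;
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker(5_531_762L, 5_757_922L);
        Client client = new Client();

        client.send(6774);                          // step 1
        long offset = client.onTimeout(6774);       // step 2: offset = -1
        boolean matched = client.onResponse(6774);  // step 3: late response dropped
        long corrected = broker.pull(offset);       // step 4: -1 becomes minOffset
        broker.commit(corrected);                   // step 5: stored offset rolls back

        System.out.println("late response matched: " + matched);
        System.out.println("stored offset now: " + broker.storedOffset);
    }
}
```

Running `main` prints `late response matched: false` and `stored offset now: 5531762`, which is the roughly 226k-message rollback seen on queueId=10.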

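That is roughly the sequence. This part seems to leave room for improvement: I don't quite understand why, on receiving -1, RMQ returns the minimum offset directly instead of the offset it had already saved in its map.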
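For illustration, the alternative asked about here might look like the following sketch. It is hypothetical and is not what RocketMQ does: `StoredOffsetFirstPolicy`, `store` and `resolve` are invented names, and the only detail taken from the source is the `topic@group` key format that appears in the broker log. When a pull arrives with offset -1, it prefers the offset already stored for that group and queue, and falls back to `minOffset` only when nothing usable is stored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side policy: on a pull with offset -1, prefer the stored consumer
// offset for this group and queue; fall back to minOffset only when none is usable.
// This is NOT RocketMQ's behavior, which (per the logs) corrects -1 straight to minOffset.
final class StoredOffsetFirstPolicy {
    // key "topic@group" (format seen in the broker log) -> (queueId -> offset)
    private final Map<String, Map<Integer, Long>> offsetTable = new HashMap<>();

    void store(String key, int queueId, long offset) {
        offsetTable.computeIfAbsent(key, k -> new HashMap<>()).put(queueId, offset);
    }

    long resolve(String key, int queueId, long requestOffset, long minOffset, long maxOffset) {
        if (requestOffset < 0) {
            Long stored = offsetTable.getOrDefault(key, Map.of()).get(queueId);
            if (stored != null && stored >= minOffset && stored <= maxOffset) {
                return stored;   // resume where the group left off
            }
            return minOffset;    // nothing usable stored: same as the observed behavior
        }
        // Observed behavior for a too-small request: correct it up to minOffset.
        return Math.max(requestOffset, minOffset);
    }

    public static void main(String[] args) {
        StoredOffsetFirstPolicy policy = new StoredOffsetFirstPolicy();
        String key = "tp_inner_order_operation@ts_biz_operation_group";
        policy.store(key, 10, 5_757_922L);
        System.out.println(policy.resolve(key, 10, -1L, 5_531_762L, 5_757_977L));
    }
}
```

With the stored offset in place, `main` prints `5757922` rather than `5531762`. Note that this only masks the client-side problem on the broker: the client still sent -1 for a consumer that did have an offset.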


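Update: I filed an issue, and it has been more or less confirmed as a bug: the client does not distinguish a timeout exception from other exceptions. The fix is for the client to tell these exceptions apart and handle each case separately.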
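A client-side fix along those lines could be sketched as below. This is a hedged illustration, not the actual patch: `OffsetReader`, `OffsetQuery`, `OffsetNotFoundException`, `readOffset` and the retry count are hypothetical, and `java.util.concurrent.TimeoutException` stands in for whatever timeout exception the remoting layer really throws. The point is that only a definitive "no offset stored" answer maps to -1; a timeout is retried, and if the answer is still unknown the caller gets an empty result and skips the round instead of pulling with -1.

```java
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;

// Hypothetical client-side offset read that treats a timeout differently from a
// definitive "no offset stored" answer. Names are illustrative, not RocketMQ APIs.
final class OffsetReader {

    /** Hypothetical: the broker answered that no offset exists for this group and queue. */
    static final class OffsetNotFoundException extends Exception {
        OffsetNotFoundException(String message) { super(message); }
    }

    /** Hypothetical stand-in for the remote "query consumer offset" call. */
    @FunctionalInterface
    interface OffsetQuery {
        long query() throws Exception;
    }

    /**
     * Returns the stored offset, -1 if the broker confirmed none exists, or empty if the
     * answer is still unknown (repeated timeouts or another error) and the caller should
     * skip this round instead of pulling with -1.
     */
    static OptionalLong readOffset(OffsetQuery query, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return OptionalLong.of(query.query());
            } catch (OffsetNotFoundException e) {
                return OptionalLong.of(-1L);   // genuinely new consumer: -1 is correct
            } catch (TimeoutException e) {
                // The broker may well hold an offset; the reply was just late. Retry.
            } catch (Exception e) {
                return OptionalLong.empty();   // other failure: unknown, do not guess -1
            }
        }
        return OptionalLong.empty();           // still timing out: unknown, do not guess -1
    }

    public static void main(String[] args) {
        int[] calls = {0};
        OptionalLong result = readOffset(() -> {
            if (calls[0]++ == 0) {
                throw new TimeoutException("wait response timeout");
            }
            return 5_757_922L;
        }, 3);
        System.out.println(result);
    }
}
```

Here the first call times out and the retry succeeds, so `main` prints `OptionalLong[5757922]`. Retrying is just one possible way to handle the timeout branch; the essential change is that a timeout no longer collapses into -1.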