您当前的位置:首页 >  品牌资讯 >> 
【世界时快讯】RocketMq5.0 任意延迟时间 TimerMessageStore 源码解析

时间:2023-07-06 15:25:34    来源 : 博客园


(相关资料图)

TimerMessageStore 简略介绍延迟队列 rmq_sys_wheel_timer指定时间的延迟消息。会先投递到 rmq_sys_wheel_timer队列中然后由 TimerMessageStore消费队列数据,将数据消费到 timerWheel使用时间轮算法,实现秒级任务TimerMessageStore 操作的文件store\consumequeue\rmq_sys_wheel_timer从队列中读取消息, 提取数据存到 timerlogtimerwheelstore\checkpoint对应 TimerMessageStore#timerCheckpointlastReadTimeMs上次消费的时间节点lastTimerLogFlushPos最后刷新 log的 poslastTimerQueueOffset最后一次消费的队列节点masterTimerQueueOffset主 Broker 的队列消费节点store\timerwheel时间轮,内由 Slot组成 结构如下timeMs消息到达时间firstPos开始的 poslastPos结束的 pos 在 timerLog 中读取数据, 后面会讲具体逻辑num消息数量magicno use now, just keep itstore\timerlog对应 TimerMessageStore#timerCheckpoint里边也是由多个 mappedFile组成。主要是存储原msg的数据,因为从 rmq_sys_wheel_timer消费了之后,会存到 timerwheeltimerlog中TimerMessageStore 启动enqueueGetService.start();enqueuePutService.start();dequeueWarmService.start();dequeueGetService.start();timerFlushService.start();dequeueGetMessageServices[getThreadNum].start();dequeuePutMessageServices[getThreadNum].start();深入 TimerMessageStore 之 TimerEnqueueGetServiceTimerMessageStore.this.enqueue默认 100毫秒执行一次从 消息队列 rmq_sys_wheel_timer消费数据 ps: currQueueOffsetcheckpoint读取出来的将消费出来的数据, 封装成 TimerRequest 投入到 enqueuePutQueuecurrQueueOffset + 1进入下一个循环 消费下一个 offset 节点深入 TimerMessageStore 之 TimerEnqueuePutService消费 enqueuePutQueue中的数据shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs检查消费的消息是否已到达投递时间。到达时间。投递到 dequeuePutQueue.put(req);中消息未到达时间 doEnqueue->timerWheel.getSlot(delayedTime)获取延迟时间插槽。构建 ByteBuffer投入 timerLog中数据结构为:|消息大小|前一个节点的pos|magic|log写入时间|延迟时间|offsetPy|sizePy|realTopic|0timerLog.append返回插入位置 ret构建 timerWheel|消息到达时间戳|firstPos|ret (timerLog.append返回位置)| 消息数量| 0|深入 TimerMessageStore 之 TimerDequeueGetService消费 timerWheel中的数据根据 currReadTimeMs来获取 timerWheel插槽数据currReadTimeMs初始化的时候 timerCheckpoint.getLastReadTimeMs()读取的是上次最后消费的数据假设broker 宕机了一段时间。那么 currReadTimeMs会按照上一次宕机的时间开始搜寻数据, 这样子宕机消息也不会丢失。会在启动的那段时间被投递出去currReadTimeMsmoveReadTime方法中会自增timerWheel.getSlot(currReadTimeMs);读取插槽数据long currOffsetPy = slot.lastPos;读取插槽属性, 最后一个pos节点timerLog.getWholeBuffer(currOffsetPy)根据 currOffsetPy获取 SelectMappedBufferResulttimerLogSelectMappedBufferResult中获取数据。prevPos上一个节点数据enqueueTime放入 timerLog 的时间delayedTime消息到达时间戳offsetPycommitLog的数据位置sizePycommitLog的数据大小构建 TimerRequest讲消息投递到 dequeueGetQueuecurrOffsetPy = prevPos将位置移动到前一个,进行遍历深入 TimerMessageStore 之 TimerDequeueGetMessageService默认有三个 TimerDequeueGetMessageService实例同时消费 dequeueGetQueuegetMessageByCommitOffsetcommitLog中读取原投递的消息数据读取 uniqkey判断不在 deleteList中的时候 将消息投递到 dequeuePutQueue中去深入 TimerMessageStore 之 TimerDequeuePutMessageService默认有三个 TimerDequeuePutMessageService实例同时消费 dequeuePutQueueconvert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));将消息转换成原始的 topic 消息,清除无用属性doPut-> messageStore.putMessage(message)将消息投递到指定 messageQueue中TimerFlushServicetimerLog刷盘timerWheel刷盘timerCheckpoint刷盘TimerMessageStore 初始化加载源码timerLog.load()加载文件timerMetrics.load加载文件recover->recoverAndRevise(lastFlushPos, true)ps: (用于 timerWheltimerLog的数据保持一致刷新)lastFlushPos最后一次刷盘的位置, 其实最终是拿到 timerlog -> mappedFile的第几个文件遍历这个 mappedFile的数据timerWheel.reviseSlot修改插槽数据。 检查这个时间的插槽是否已经有填充数据。如果有的话,刷新 lastPos(顺序遍历。这里最终还是会是最后一个 lastPos)如果不存在插槽数据 则插入插槽数据 putSlotreviseQueueOffset(processOffset);读取 timerLog最后一个数据, 为了校验最后一个数据是否正常,是否能读取到消息。确认 currQueueOffset数据确认 currReadTimeMs数据

标签:

最新发布

热门推荐

X 关闭

X 关闭