RocketMQ5.0源码解析-CommitLog图文详解

导读:本篇文章讲解 RocketMQ5.0源码解析-CommitLog图文详解,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

1. CommitLog示意图

CommitLog是对RocketMQ的存储的抽象,示意图如下:

RocketMQ5.0源码解析-CommitLog图文详解

CommitLog 主要由几部分组成:

  • MappedFileQueue: 主要用来操作相关数据存储文件。将一系列的MappedFile抽象成一个队列。
  • FlushManager: 数据落地磁盘的管理,主要分为两类:实时数据刷盘(FlushRealTimeService),以及异步刷盘(GroupCommitService)
  • FlushDiskWatcher: 刷盘观察者,处理队列中的刷盘请求,对于规定时间内没有刷盘成功的进行处理。

Tips: 多目录存储参照RIP-7

CommitLog 底层主要是通过FileChannel来实现。其中还有一些RocketMQ的自身优化,例如: TransientStorePool。

2. MappedFileQueue

MappedFileQueue 是对数据存储文件的一个抽象,将多个数据文件抽象成为一个文件队列。通过这个文件队列对文件进行操作操作。同时保存一些 CommitLog 的属性。来看一下MappedFileQueue中的几个重要的属性:

  • storePath: 数据文件存储的位置
  • mappedFileSize: 单个数据文件的大小
  • mappedFiles: 数据文件列表
  • allocateMappedFileService: 分配文件服务
  • flushedWhere: 当前刷盘指针,表示该指针之前所有的数据全部持久化到了硬盘上面
  • committedWhere :当前数据提交指针,内存中byteBuffer当前的写指针,该值大于等于flushedWhere

Tips: 同步刷盘的过程中flushedWhere等于committedWhere

MappedFileQueue 同时提供了一些操作例如:

  • 刷新文件(更新flushedWhere) MappedFileQueue#flush
  • 提交文件(更新committedWhere)MappedFileQueue#commit
  • 以及一些文件的操作,获取最新文件和第一个文件等等。

2.1 MappedFile

MappedFile 是对文件的抽象,包含了对RocketMQ数据文件的整个操作。例如获取文件名称、文件大小、判断文件是否可用、是否已经满了等等的操作。

单个数据文件默认是 1G 。然后按照顺序存储Producer发送的消息。

数据文件就是由上图所示的一条条的数据组成。通过观察你可能会发现存储Topic的长度只占用了一个字节。这个也是在使用RocketMQ需要注意的一点就是:

由于只用了一个字节保存Topic的长度所以Topic的最大长度是127字符,如果使用中文长度会更短。 在RocketMQ5.0的版本中增加 PutMessageHook 会在Put Message之前去Check一些必要的数据和参数。例如Topic的长度就其中之一。具体可以参照 HookUtils 工具类。

3. CommitLog消息处理流程

RocketMQ5.0源码解析-CommitLog图文详解

3.1 消息校验

生产者发送消息到 Broker Master 后,在存入 CommitLog 之前首先会经过一个 PutMessageHook 的处理接口类,这个有不同的实现,主要用于校验一些消息的数据如:Message Topic 大小、Message Body大小、以及MessageStore的一些状态等等。

Tips: 通过实现PutMessageHook可以做到更多校验以及一些其他的事情。

3.2 消息处理

CommitLog#asyncPutMessage 在CommitLog有两个分别用来处理单个消息和批量消息。下面以单个消息处理为例

  • 消息的一些属性设置

    消息体的CRC、设置消息生产者的IP地址是否为IPV6、设置存储Broker IP地址是否为IPV6

  • HA处理设置

    根据服务的设置判断是否需要处理HA. RocketMQ5.0 HA有两种模式:

    • Controller Model模式: DLedger模式的进阶版本,对原有的DLedger模式进行优化
    • SlaveActingMaster模式

    这两个模式主要目的是根据返回ACK的Slave数量来判断消息是否同步到Slave成功

  • Append Message 到 MappedByteBuffer

  • 处理Append的Result

  • 根据Broker的配置进行刷盘

    刷盘是根据在启动Broker的时候配置的刷盘模式进行不同的刷盘策略。

    • SYNC_FLUSH: GroupCommitService线程进行实时刷盘
    • ASYNC_FLUSH: FlushRealTimeService线程进行异步刷盘
  • 处理HA

    HA的处理不是必须的,这个看RocketMQ是否配置了HA模式。只有配置了才需要进行处理。

4. 刷盘解析

刷盘主要有两种模式:同步刷盘和异步刷盘。刷盘主要由 FlushManager 进行管理。刷盘接口关系:

RocketMQ5.0源码解析-CommitLog图文详解

4.1 同步刷盘解析

同步刷盘是由 GroupCommitService 来处理

在刷盘过程中使用到的两个类:GroupCommitServiceFlushDiskWatcher 从本质上看都是一个Thread。

GroupCommitService处理主要分为三步:

  1. 往链表中写入GroupCommitRequest请求

    RocketMQ5.0源码解析-CommitLog图文详解

  2. GroupCommitService执行doCommit方法

    RocketMQ5.0源码解析-CommitLog图文详解

  3. 执行刷盘操作,将结果写入GroupCommitRequest

到这里就已经基本上完成整个同步的刷盘步骤。细心的可能会发现还有一个 FlushDiskWatcher 这个类。它的功能主要是:处理在规定时间内还没有刷盘成功的请求。

4.2 异步刷盘解析

异步刷盘的服务是**FlushRealTimeService** ,不过当内存缓存池 TransientStorePool 可用时,消息会先提交到TransientStorePool中的WriteBuffer内部,再提交到MappedFile的FileChannel中,此时异步刷盘服务就是CommitRealTimeService。下面看一下 FlushRealTimeService

RocketMQ5.0源码解析-CommitLog图文详解

RocketMQ5.0源码解析-CommitLog图文详解

而在启用了暂存池的情况下使用的是 CommitRealTimeService 进行刷盘:

5. 总结

  • CommitLog在RocketMQ中充当一个存储的抽象,所有的存储操作都是通过CommitLog对外暴露。底层包含了很多其他组件来支持
  • 刷盘的模式可以通过配置文件的 flushDiskType 字段来配置,SYNC_FLUSH表示同步刷盘、ASYNC_FLUSH表示异步刷盘
  • 刷盘的服务主要是由三个服务:FlushRealTimeService、GroupCommitService、CommitRealTimeService来实现刷盘,以及FlushDiskWatcher来处理一些特殊的刷盘情况。这些服务本质上都是线程。

 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/94738.html

(0)
小半的头像小半

相关推荐

极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!