Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. DataGroupMemeber持有FilePartitionedSnapshotManager

  2. 继承自PartitionedSnapshotLogManager

  3. takeSnapshot方法来做快照,存储到FilePartitionedSnapshotLogManager中,只对当前DataGroupMember对应的slot制作快照

    1. 设置BlockAppliedCommitIndex来阻塞log的apply

    2. 调用PartitionedSnapshotManager来等待阻塞前的所有日志全部完成,有超时时间

      1. 如果commitIndex 或 blockAppliedCommitIndex <= 0,则直接返回

      2. 等待blockAppliedCommitIndex前面的所有日志都已经apply。

    3. 同步所有的Processor,执行flush操作

    4. 持有当前类锁

      1. 收集对应slot的全部的TimeseriesSchema

      2. 设置snapshotLastIndex和snapshotLastTerm(依据BlockAppliedcommitIndex)

      3. 收集所有的Ts文件,并且填充所有的时间序列

        1. 收集全部的TsFile

          1. 清空Map<Integer, T> slotSnapshot。

          2. 获取所有的根据SG和PartitionNumber划分的TsFile

          3. 开始为每一个SG中的每一个PartitionNumber的TsFile们创建硬链接,并通过addFile方法将资源添加到FileSnapshot中。如果创建失败(在创建过程中有文件被删除),则移除所有硬链接,并且重新开始收集全部的TsFile

        2. 收集对应的TimeseriesSchema

          1. 获取的目前的DataGroupMember中相关的全部slot

          2. 将DataGroupMember所拥有的slot对应的timeseriesSchema全部注册到FileSnapshot中。如果raft group的header node被移除,导致不能获得所拥有的的slot,则全部设置

    5. 调用resetBlockAppliedCommitIndex来结束阻塞。

  4. getSnapshot方法用来获取快照:直接调用父类PartitionedSnapshotLogManager的对应方法

    1. 首先对slotSnapshots这个映射加锁

    2. 然后根据slotSnapshots制作PartitionedSnapshot

    3. 然后调用truncateBefore方法,向下调用,移除FileSnapshot中需要被移除的数据文件(同时删除硬链接)


CatchUp的流程

  1. 在HeartBeatHandler中的handleNormalHeartbeatResponse方法,如果发现当前的log的index和term不是最新的,且当前的LastLogIndex和上一次HeartBeat时的LastLogIndex相比没有发生变化的次数大于等于5次,则调用RaftMember中的catchUp方法

    1. synchronized catchUpService

      1. 从lastCatchUpResponseTime中检查上一次该node的catchup时间

      2. 如果上一次catchup时间不为空,并且当前时间到上一次catchup的时间小于CatchUpTimeOutMS则仍然在进行上一次catchup,直接返回。否则,记录当前catchup的开始时间。

    2. synchronized catchUpService 结束

  2. 如果CatchUpService是正常运行状态,则新建CatchUpTask,并提交异步执行。

  3. 具体的CatchUpTask的执行(run方法)

  4. 在HeartBeatHandler中的handleNormalHeartbeatResponse方法,如果发现当前的follower不是最新的,且当前的LastLogIndex和上一次HeartBeat时的LastLogIndex一致连续大于等于5次,则调用RaftMember中的catchUp方法

  5. 首先对catchUpService加锁,从lastCatchUpResponseTime中检查上一次该node的catchup时间,如果上一次catchup时间不为空,并且当前时间到上一次catchup的时间小于CatchUpTimeOutMS则仍然在进行上一次catchup,直接返回。否则,记录当前catchup的开始时间。完成后释放对catchUpService添加的锁。

  6. 如果CatchUpService没有关闭,则新建CatchUpTask,并提交异步执行。

  7. 调用CatchUpTask的run()方法

    1. 首先调用checkMatchIndex()方法,检查是否需要使用snapshot进行catch up,并将结果设置到findMatchedIndex

      1. 如果返回true,则直接使用log来完成catch 如果返回值为true,则直接使用log来完成catch up

      2. 如果返回false,则需要使用snapshot来完成snapshot如果返回值为false,则需要使用snapshot来完成catchup

    2. 检查abort的值

      1. 值为true,则当前节点正在进行self-catching-up,重置并返回。

      2. 值为false,则继续执行。

    3. 根据findMatchedIndex的值,执行catch 根据findMatchedIndex的值,执行具体的catch up,并将结果设置到catchUpSucceeded。

      1. 如果值为true,则实例化LogCatchUpTask,并且调用call方法进行执行。如果值为true,则实例化LogCatchUpTask,并且调用call方法进行执行,将对应的Log添加,等待执行。

      2. 如果值为false

        1. 则首先执行doSnapshot方法

          1. 触发当前raftMember的LogMember的takeSnapshot方法触发当前raftMember的LogMember的getSnapshot方法,并将结果存储到CatchUpTask中

            1. 对应DataGroupMember,调用takeSnapshotForSpecificSlots方法

              1. 阻塞新到达的日志被apply

              2. 等待阻塞的日志之前到达的所有日志都被apply

              3. 调用flush,将数据回盘写入。

              4. synchronized FilePartitionedSnapshotLogManager(被DataGroupMember持有)

                1. 收集时间序列元数据。

                2. 收集对应的TsFile文件,并且填充时间序列元数据:其中如果在收集TsFile文件的过程中遇到文件销毁,会重做开始收集

              5. synchronized FilePartitionedSnapshotLogManager释放

            2. 对应MetaGroupMember

              1. 等待阻塞的日志之前到达的日志都被apply

              2. synchronized MetaSingleSnapshotLogManager

                1. 收集SG的TTL时间等

                2. 将当前的元数据存储到LogManager

              3. synchronized MetaSingleSnapshotLogManager释放

          2. 触发当前raftMember的LogManager的getSnapshot方法,并将结果存储到CatchUpTask中

            1. 对应DataGroupMember的LogManager

              1. synchronized SlotSnapshots

                1. 新建PartitionedSnapshot

                2. 将当前SlotSnapshots中的snapshot们装载进入新建的PartitionedSnapshot

                3. 调用truncateBefore来决定是否需要移除远程资源。

              2. synchronized SlotSnapshots 释放

            2. 对应MetaGroupMember的LogManager,则直接根据刚刚存储的元数据写入到Snapshot

        2. 然后调用removeSnapshotLogs方法:移除Snapshot中包含的Logs

        3. 最后实例化SnapshotCatchUpTask,并调用call方法

          1. 首先调用doSnapshotCatchUp方法,生成SendSnapshotRequest,添加header、snapshot,通过客户端进行发送,并将结果设置到abort中

            1. 对应的RaftMember会通过receiveSnapshot方法接受到发送的SendSnapshotRequest请求对应的RaftMember会通过receiveSnapshot方法接收到发送的SendSnapshotRequest请求

            2. 然后从请求中发序列化得到snapshot然后从请求中反序列化,得到请求体中的snapshot

            3. 调用snapshot的getDefaultInstaller来install对应的snapshot

              1. metaSnapshot的install过程,slot为-1,无影响PartitionedSnapshot的install过程,slot为-1,无影响,会递归调用对应的FileSnapshot,此时slot为对应的数据位置。

                1. synchronized SnapshotApplyLock

                2. 设置存储组的数据、TTL等snapshot中包含的数据

                3. synchronized MetaGroupMember 的 LogManager

                4. apply snapshot

                5. synchronized MetaGroupMember 的 LogManager 释放

                6. synchronized SnapshotApplyLock 释放

              2. PartitionedSnapshot的install过程,slot为-1,无影响,会递归调用对应的FileSnapshot,此时slot为对应的数据位置,FileSnapshot的install过程如下

                1. install snapshot中的时间序列元数据

                2. 如果isDataMigration = true,则检查slot的状态,并且通过slotManager将对应的slot装填设置为pulling

                3. install snapshot中的TsFile,其中loadNewTsFile的过程中,会持有对应写锁

                4. 释放对应的slot的状态

          2. 如果abort不为true,则继续调用doLogCatchUp()方法,生成AppendEntryRequest,添加header、leader、leaderCommit等信息,然后将logs逐个处理

            1. synchronized RaftMember的term

              1. 如果当前节点不是Leader,则直接报错。

              2. 如果当前节点是leader,则将term设置到request中

            2. synchronized RaftMember的term 释放锁住raftMember的term,如果当前节点不是leader,则直接报异常,如果是leader,则设置term进入request。

            3. 设置PrevLogIndex、PrevLogTerm进入request。

            4. 使用client发送appendEntry。

    4. 如果catchUpSucceeded为true,则执行后续操作

      1. 如果logs不为空,或者snapshot不为空,则设置peer的matchIndex。

      2. 调用peer的resetInconsistentHeartbeatNum来重置。

    5. 从LastCatchUpResponseTime中移除当前节点的catch up开始时间,来允许下一次catchup