...
DataGroupMemeber持有FilePartitionedSnapshotManager
继承自PartitionedSnapshotLogManager
takeSnapshot方法来做快照,存储到FilePartitionedSnapshotLogManager中,只对当前DataGroupMember对应的slot制作快照
设置BlockAppliedCommitIndex来阻塞log的apply
调用PartitionedSnapshotManager来等待阻塞前的所有日志全部完成,有超时时间
如果commitIndex 或 blockAppliedCommitIndex <= 0,则直接返回
等待blockAppliedCommitIndex前面的所有日志都已经apply。
同步所有的Processor,执行flush操作
持有当前类锁
收集对应slot的全部的TimeseriesSchema
设置snapshotLastIndex和snapshotLastTerm(依据BlockAppliedcommitIndex)
收集所有的Ts文件,并且填充所有的时间序列
收集全部的TsFile
清空Map<Integer, T> slotSnapshot。
获取所有的根据SG和PartitionNumber划分的TsFile
开始为每一个SG中的每一个PartitionNumber的TsFile们创建硬链接,并通过addFile方法将资源添加到FileSnapshot中。如果创建失败(在创建过程中有文件被删除),则移除所有硬链接,并且重新开始收集全部的TsFile
收集对应的TimeseriesSchema
获取的目前的DataGroupMember中相关的全部slot
将DataGroupMember所拥有的slot对应的timeseriesSchema全部注册到FileSnapshot中。如果raft group的header node被移除,导致不能获得所拥有的的slot,则全部设置
调用resetBlockAppliedCommitIndex来结束阻塞。
getSnapshot方法用来获取快照:直接调用父类PartitionedSnapshotLogManager的对应方法
首先对slotSnapshots这个映射加锁
然后根据slotSnapshots制作PartitionedSnapshot
然后调用truncateBefore方法,向下调用,移除FileSnapshot中需要被移除的数据文件(同时删除硬链接)
CatchUp的流程
在HeartBeatHandler中的handleNormalHeartbeatResponse方法,如果发现当前的log的index和term不是最新的,且当前的LastLogIndex和上一次HeartBeat时的LastLogIndex相比没有发生变化的次数大于等于5次,则调用RaftMember中的catchUp方法
synchronized catchUpService
从lastCatchUpResponseTime中检查上一次该node的catchup时间
如果上一次catchup时间不为空,并且当前时间到上一次catchup的时间小于CatchUpTimeOutMS则仍然在进行上一次catchup,直接返回。否则,记录当前catchup的开始时间。
synchronized catchUpService 结束
如果CatchUpService是正常运行状态,则新建CatchUpTask,并提交异步执行。
具体的CatchUpTask的执行(run方法)
在HeartBeatHandler中的handleNormalHeartbeatResponse方法,如果发现当前的follower不是最新的,且当前的LastLogIndex和上一次HeartBeat时的LastLogIndex一致连续大于等于5次,则调用RaftMember中的catchUp方法
首先对catchUpService加锁,从lastCatchUpResponseTime中检查上一次该node的catchup时间,如果上一次catchup时间不为空,并且当前时间到上一次catchup的时间小于CatchUpTimeOutMS则仍然在进行上一次catchup,直接返回。否则,记录当前catchup的开始时间。完成后释放对catchUpService添加的锁。
如果CatchUpService没有关闭,则新建CatchUpTask,并提交异步执行。
调用CatchUpTask的run()方法
首先调用checkMatchIndex()方法,检查是否需要使用snapshot进行catch up,并将结果设置到findMatchedIndex
如果返回true,则直接使用log来完成catch 如果返回值为true,则直接使用log来完成catch up
如果返回false,则需要使用snapshot来完成snapshot如果返回值为false,则需要使用snapshot来完成catchup
检查abort的值
值为true,则当前节点正在进行self-catching-up,重置并返回。
值为false,则继续执行。
根据findMatchedIndex的值,执行catch 根据findMatchedIndex的值,执行具体的catch up,并将结果设置到catchUpSucceeded。
如果值为true,则实例化LogCatchUpTask,并且调用call方法进行执行。如果值为true,则实例化LogCatchUpTask,并且调用call方法进行执行,将对应的Log添加,等待执行。
如果值为false
则首先执行doSnapshot方法
触发当前raftMember的LogMember的takeSnapshot方法触发当前raftMember的LogMember的getSnapshot方法,并将结果存储到CatchUpTask中
对应DataGroupMember,调用takeSnapshotForSpecificSlots方法
阻塞新到达的日志被apply
等待阻塞的日志之前到达的所有日志都被apply
调用flush,将数据回盘写入。
synchronized FilePartitionedSnapshotLogManager(被DataGroupMember持有)
收集时间序列元数据。
收集对应的TsFile文件,并且填充时间序列元数据:其中如果在收集TsFile文件的过程中遇到文件销毁,会重做开始收集。
synchronized FilePartitionedSnapshotLogManager释放
对应MetaGroupMember
等待阻塞的日志之前到达的日志都被apply
synchronized MetaSingleSnapshotLogManager
收集SG的TTL时间等
将当前的元数据存储到LogManager
synchronized MetaSingleSnapshotLogManager释放
触发当前raftMember的LogManager的getSnapshot方法,并将结果存储到CatchUpTask中
对应DataGroupMember的LogManager
synchronized SlotSnapshots
新建PartitionedSnapshot
将当前SlotSnapshots中的snapshot们装载进入新建的PartitionedSnapshot
调用truncateBefore来决定是否需要移除远程资源。
synchronized SlotSnapshots 释放
对应MetaGroupMember的LogManager,则直接根据刚刚存储的元数据写入到Snapshot
然后调用removeSnapshotLogs方法:移除Snapshot中包含的Logs
最后实例化SnapshotCatchUpTask,并调用call方法
首先调用doSnapshotCatchUp方法,生成SendSnapshotRequest,添加header、snapshot,通过客户端进行发送,并将结果设置到abort中
对应的RaftMember会通过receiveSnapshot方法接受到发送的SendSnapshotRequest请求对应的RaftMember会通过receiveSnapshot方法接收到发送的SendSnapshotRequest请求
然后从请求中发序列化得到snapshot然后从请求中反序列化,得到请求体中的snapshot
调用snapshot的getDefaultInstaller来install对应的snapshot
metaSnapshot的install过程,slot为-1,无影响PartitionedSnapshot的install过程,slot为-1,无影响,会递归调用对应的FileSnapshot,此时slot为对应的数据位置。
synchronized SnapshotApplyLock
设置存储组的数据、TTL等snapshot中包含的数据
synchronized MetaGroupMember 的 LogManager
apply snapshot
synchronized MetaGroupMember 的 LogManager 释放
synchronized SnapshotApplyLock 释放
PartitionedSnapshot的install过程,slot为-1,无影响,会递归调用对应的FileSnapshot,此时slot为对应的数据位置,FileSnapshot的install过程如下
install snapshot中的时间序列元数据
如果isDataMigration = true,则检查slot的状态,并且通过slotManager将对应的slot装填设置为pulling
install snapshot中的TsFile,其中loadNewTsFile的过程中,会持有对应写锁。
释放对应的slot的状态
如果abort不为true,则继续调用doLogCatchUp()方法,生成AppendEntryRequest,添加header、leader、leaderCommit等信息,然后将logs逐个处理
synchronized RaftMember的term
如果当前节点不是Leader,则直接报错。
如果当前节点是leader,则将term设置到request中
synchronized RaftMember的term 释放锁住raftMember的term,如果当前节点不是leader,则直接报异常,如果是leader,则设置term进入request。
设置PrevLogIndex、PrevLogTerm进入request。
使用client发送appendEntry。
如果catchUpSucceeded为true,则执行后续操作
如果logs不为空,或者snapshot不为空,则设置peer的matchIndex。
调用peer的resetInconsistentHeartbeatNum来重置。
从LastCatchUpResponseTime中移除当前节点的catch up开始时间,来允许下一次catchup