Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

(2) 阶段1目标 —— 接口化:区分【文件句柄TsFileResource】【文件索引FileIndexEntries】【文件索引TsFileIndex】,将TsFileResource开始时间、结束时间等字段移至FileIndex中;字段移至TsFileIndex中。


3. 实现方案:

方案1(经尝试后,否决):利用利用FileIndex的接口完成文件索引相关功能;TsFileResource中仅保留文件句柄的功能。

二、阶段1详细设计:新增和修改的数据结构

具体而言,涉及到数据结构如下:【画图】

1. FileIndexEntries:(用于信息传递)

...

private TimeIndexEntry[] indexEntries; // TimeIndexEntry 列表,记录文件中所有的前缀路径、开始时间和结束时间

private String tsFilePath; // TsFile 路径

2. TimeIndexEntry:

...

private PartialPath path; // 前缀路径

private long startTime; // 开始时间

private long endTime; // 结束时间

3. FileIndex

// 获得与查询相关的FileIndexEntries

Map<tsFilePath, TimeIndexEntry[]> filterByPath(PartialPath path, Filter timeFilter) 

// 为路径增加索引,适用场景:(1)文件刷盘封口;(2)版本升级

void createIndexForFiles(FileIndexEntries fileIndexEntries) 

// 为路径删除索引,适用场景:(1)删除文件;(2)根据TTL标记失效的文件

void deleteIndexOfFiles(FileIndexEntries fileIndexEntries) 

使用 FileIndexManager 管理 FileIndex,使用 FileIndexEntries 和 TimeIndexEntry 进行消息传递。

缺点:(1) 设计新文件结构多,模块变动大,修改代码过多,后续难以维护;

(2) 增加新的落盘文件,文件目录庞大;较大地影响分布式部分的代码;

(3) merge 操作和查询操作涉及到 FileIndex 与 TsFileResource的互相调用和转化。


方案2:TsFileIndex 仅作为 TsFileResource 中的一个对象,全部接口均由 TsFileResource 调用,不向外暴露;

TsFileResource 的已有接口不变。后续“实现”的全部修改仅针对 TsFileIndex。

Image Added

二、阶段1详细设计:新增和修改的数据结构

具体而言,涉及到数据结构如下:

1. TsFileIndex (后续“实现”的全部修改仅针对 TsFileIndex)

protected long[] startTimes; //  开始时间列表

protected long[] endTimes; // 结束时间列表,未封口则为 Long.MIN_VALUE

protected Map<String, Integer> deviceToIndex; // device设备名 => 开始 / 结束时间列表index


2

4. FileIndexManager

public class FileIndexManager {

    private Map<PartialPath, FileIndex> seqIndices;

    private Map<PartialPath, FileIndex> unseqIndices;

    private final ReentrantReadWriteLock lock;

    private static class FileIndexManagerHolder {

        private FileIndexManagerHolder() {
            // allowed to do nothing
        }

        private static final FileIndexManager INSTANCE = new FileIndexManager();
    }

    public static FileIndexManager getInstance() {
        return IndexerManagerHolder.INSTANCE;
    }

    private FileIndexManager() {
        seqIndexers = new ConcurrentHashMap<>();
        unseqIndexers = new ConcurrentHashMap<>();
        lock = new ReentrantReadWriteLock();
    }

    public boolean init() {...}

    public void addSeqIndexer(PartialPath storageGroup, FileIndex fileTimeIndexer) { ... }

    public void addUnseqIndexer(PartialPath storageGroup, FileIndex fileTimeIndexer) { ... }

    public void deleteSeqIndexer(PartialPath storageGroup) { ... }

    public void deleteUnseqIndexer(PartialPath storageGroup) { ... }

    public FileIndex getSeqIndexer(PartialPath storageGroup) { ... }

    public FileIndex getUnseqIndexer(PartialPath storageGroup) { ... }

    // 将TsFileResource转化为FileIndexEntries,用于版本升级

    public static FileIndexEntries convertFromTsFileResource(TsFileResource resource) {... }

}

4. 第二步的修改(后续)涉及到【封口文件句柄SealedTsfileResource】和【未封口文件句柄UnsealedTsfileResource

因为封口文件句柄需要的字段较少,未封口文件句柄可以继承封口文件句柄;文件封口时再转化为封口文件句柄。


53. 总结

(1) 第一步接口化(本次)的修改中,可以从TsFileResource中转移走的字段:

...

Map<String, String> cachedDevicePool // for reducing the String number in memory

File file // tsfile

TsFileProcessor processor 【未封口】

ModificationFile modFile // modification file

boolean closed【未封口】

boolean deleted【未封口】

boolean isMerging

Set<Long> historicalVersions // for tracking the merge history of a TsFile

TsFileLock tsFileLock

List<ChunkMetadata> chunkMetadataList // 【未封口】chunk metadata list of unsealed tsfile. Only be set in a temporal TsFileResource in a query process.

List<ReadOnlyMemChunk> readOnlyMemChunk // 【未封口】mem chunk data. Only be set in a temporal TsFileResource in a query process.

TimeseriesMetadata timeSeriesMetadata // 【未封口】get TimeseriesMetadata of unsealed file

List<TsFileResource> upgradedResources // 【升级】generated upgraded TsFile ResourceList used for upgrading v0.9.x/v1 -> 0.10/v2

UpgradeTsFileResourceCallBack upgradeTsFileResourceCallBack // 【升级】load upgraded TsFile Resources to storage group processor used for upgrading v0.9.x/v1 → 0.10/v2

boolean isSeq // 【升级】indicate if this tsfile resource belongs to a sequence tsfile or not used for upgrading v0.9.x/v1 -> 0.10/v2

TsFileResource originTsFileResource // 【未封口】current tsfile resource is a snapshot of the originTsFileResource. When we want to used the lock, we should try to acquire the lock of originTsFileResource

long maxPlanIndex // for cluster, max index of plans executed within this TsFile

long minPlanIndex // for cluster, min index of plans executed within this TsFile

三、涉及到使用文件索引的操作

【数据的插入和批量插入】

总入口: public void insert(InsertRowPlan insertRowPlan) StorageEngine.java

1. 找到对应的 StorageGroupProcessor

2. 根据写入数据的时间以及当前设备落盘的最后时间戳,找到对应的 TsFileProcessor

3. 【需修改】写入 TsFileProcessor 对应的 memtable 中:

(1) 如果是乱序文件,则更新tsfileResource中的endTimeMap (deviceToIndex, endTimes)

(2) 如果tsfile中没有该设备的信息,或新插入数据的时间小于已存startTime,则更新tsfileResource中的startTimeMap

4. 记录写前日志

5. 根据 memtable 大小,来判断是否触发异步持久化 memtable 操作。如果是顺序文件且执行了刷盘动作,则更新tsfileResource中的endTimeMap

6. 根据当前磁盘 TsFile 的大小,判断是否触发文件关闭操作

修改:第3步将endTimeMap、startTimeMap存储在TsFileProcessor中,而不是tsfileResource中

【数据的访问】

总入口StorageEngine: public QueryDataSource query(SingleSeriesExpression seriesExpression, QueryContext context, QueryFileManager filePathsManager)

...

修改:调用filterByPath(path, timeFilter) 方法

封口的文件:从Manager得到storage group分区的FileIndex,List<Pair<TsFileResource, TimeIndexEntry[]>>

未封口文件:

TsFileResource记录整体的开始截止时间,用于merge。

List<Pair<TimeIndexEntry, TsFileResource>>

根据path, 进入结构,根据 timeFilter 时间二分找到

【重启恢复流程】

StorageGroupProcessorrecover()

1. 修改恢复每个分区的顺序 / 乱序文件,将上一步获得的每个分区的顺序 / 乱序 TsFile 文件作为参数,调用recoverTsFiles进行恢复,

该方法会将恢复后的顺序 / 乱序 TsFile 以 TsFileResource 的形式放入TsFileManagement里

若该 TsFile 是此分区的最后一个,且未封口,则还要为其构造TsFileProcessor对象,并加入work(Un)sequenceTsFileProcessors中

2. 若该 TsFile 文件不是最后一个文件,或者该 TsFile 文件是最后一个文件,但已经被关闭或标记被关闭,

只需将该 TsFile 文件在内存中对应的TsFileResource(文件句柄)的closed属性置成true即可。

3. 若该 TsFile 文件可以继续写入,则表示这是此分区的最后一个 TsFile,且未封口,则继续保持其未封口的状态,

需要为它构造一个TsFileProcessor对象,并将其放到workSequenceTsFileProcessors或workUnsequenceTsFileProcessors中。

4. 最后将恢复出来的 TsFile 文件在内存中对应的TsFileResource对象放入sequenceFileTreeSet或unSequenceFileList中,用于更新分区对应的版本号等

修改:第1步中,需要同时恢复文件时间索引信息(将TimeFileIndex信息放入FileIndexManager中)

四、涉及到使用文件句柄类的操作

【数据的删除】

StorageEngine.java中的入口: public void delete(String deviceId, String measurementId, long timestamp)

1. 找到对应的 StorageGroupProcessort

2. 找到受影响的所有 working TsFileProcessor 记录写前日志

3. 【保留使用文件句柄类】找到受影响的所有 TsFileResource,在其对应的 mods 文件中记录一条记录:pathversionstartTimeendTime

(1) 如果存在 working memtable:则删除内存中的数据

...