...
如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件
跨文件空间合并(将乱序文件合并至顺序空间)
...
跨文件空间合并(将乱序文件合并至顺序空间)(针对一元时间序列)
FullMerge(完全合并)
出于IO和合并性能考虑,乱序合并不会重写整个顺序文件,获取一个整理完全的顺序文件,而是会将与乱序文件重叠的 chunk 合并重写后,追加到原顺序文件后面,不删除原chunk,称这个过程为 原地合并
...
重新设计整个合并流程,舍弃 原地合并 只保留 完全合并
外部并行
执行拆分流程:device->seqFile→sensor
文件选择
任务执行
输入一组乱序文件U,以及一组顺序文件S,最大同时合并的时间序列数量为n,单个chunk的点数阈值tpt,是否进行Full Merge
...
7.1. 开启一个新的ChunkGroup;
7.2. 对于Tc中的每一条时间序列tsi,在S上查询它们的Chunk,并将这些Chunk按照在文件中的位置排序。
7.3. 取出位置最小的Chunk,记做c,如果已经没有Chunk,转7.4
7.3.1 找到c对应的MergeReader ri,如果ri当前的数据点的时间小于等于c的结束时间tend,将ri所有时间不大于tend的点和c合并,并将c写入到wi,转7.3.5;
7.3.2 如果c的点数小于tpt或者c的上一个Chunk已经写入到wi,但是wi还没有进行flush,将c写入到wi,转7.3.5;
7.3.3 如果进行Full Merge,将c写入到wi,转7.3.5;
7.3.4 记录这个没有被merge的Chunk,转7.3;
7.3.5 如果wi中已写入的点大于等于tpt,将wi flush到s'j,转7.3;
7.4 关闭这个ChunkGroup;
...
执行拆分流程:device→seqFile→sensor
输入:待合并文件列表 SeqTsFileList, UnseqTsFileList
- deviceSet=[]
- writer = 目标文件 RestorableTsFileIOWriter
- for tsfile in TsFileList
- 从 FileReaderManager 中获取此 tsfile 的 TsFileSequenceReader,并读出该文件的设备列表,添加至 deviceSet
- for device in deviceSet
- metadataIteratorList = []
- for tsfile in TsFileList
- 读取 tsfile 中 device 对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
- 将 ChunkMetadataListIterator 添加到 metadataIteratorList
- 遍历 算法1 输出的每批待合并的 sensor 列表
- 对于待合并列表中的每一个 sensor
- 如果是乱序空间的文件合并,采取 反序列化Page合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
- 否则,采取 追加Page合并算法
- 对于待合并列表中的每一个 sensor
- for tsfile in TsFileList
- 将 FileReaderManager 中此文件 decreaseReaderReference
- 序列化新文件的 TsFileResource
- 关闭新文件 writer
...
10.1 如果si中标记已被合并的chunk占总chunk的比例大于某值threshold:
10.1.0 在merge.log中记录“{si'} start {si'的当前长度}”
10.1.1 将si中未标记已被合并的chunk写入si';
10.1.2 为si'生成FileMetadata并写入到si'尾部;
10.1.3 等待对si的所有查询结束,并对si加锁;
10.1.4用si'替代si;
10.1.5 在merge.log中记录“{si} end”, 对si解锁;
10.2 否则:
10.2.1 等待对si的所有查询结束,并对si加锁;
10.2.2 将si的尾部的FileMetadata截去;
10.2.3 在merge.log中记录“{si} {si的当前长度}”
10.2.4 将si'中的数据写到si尾部;
10.2.5 为si生成FileMetadata并写入到si尾部,该FileMetadata中不包含被标记已经合并的chunk,但是记录有多少chunk被标为已经合并;
10.2.6 在merge.log中记录“{si} end”,删除si',对si解锁;
11. 在merge.log中记录“merge end”,删除U中的所有文件和merge.log
存在对齐时间序列的消除乱序文件合并流程
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表 (去掉了sensor组的概念)
遍历顺序文件列表,对于每一个顺序文件 - 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
- 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
- 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
- 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组对于每个子任务的IMeasurementSchema→chunkMetadataList列表 - 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面 - 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl