Table of Contents |
---|
名词解释
顺序空间:顺序数据文件所在的空间
乱序空间:乱序数据文件所在的空间
层级合并(在顺序空间或乱序空间内部执行)
不存在对齐时间序列时的层级合并流程
- 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
- 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
- 遍历device集合
- 根据 ReaderCache 得到相应的 TsFileSequenceReader
- 根据 device 读取各个文件对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
- 遍历 算法1 输出的每批待合并的 sensor 列表
- 对于待合并列表中的每一个 sensor
- 如果是乱序空间的文件合并,采取 反序列化Page合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
- 否则,采取 追加Page合并算法
- 对于待合并列表中的每一个 sensor
- 关闭 ReaderCache 中所有的 reader
- 序列化新文件的 TsFileResource
- 关闭新文件 writer
...
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
存在对齐时间序列时的层级合并流程
- 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
- 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
- 遍历device集合
- 根据 ReaderCache 得到相应的TsFileSequenceReader
- 根据 device 读取各个文件对应的ChunkMetadataListIterator
- 循环算法2输出的待合并的 IMeasurementSchema 列表
- 对于待合并的 IMeasurementSchema
- 如果是乱序的文件合并,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk(如果是对齐时间序列,判断 TimeChunk) 的数据点数小于 merge_page_point_number,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果page足够大,采取 存在对齐时间序列的追加 Page 合并算法
- 关闭 ReaderCache 中所有的reader
- 序列化新文件的 TsFileResource
- 关闭新文件writer
...
合并后: timeChunk(page1, page2, page6) s1Chunk(page3, page4, page7), s2Chunk(page5, page8)
直接追加chunk合并
原流程存在的问题
对于层级合并来说(以顺序空间的层级合并为例),是通过seq_level_num和seq_file_num_in_each_level这两个参数来控制合并到最后的chunk大小的,即把原来的chunk扩大seq_file_num_in_each_levelseq_level_num-1倍,这种配置方案存在如下三个问题:
...
如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件
跨文件空间合并(将乱序文件合并至顺序空间)
不存在对齐时间序列的消除乱序文件合并流程
输入一组乱序文件U,以及一组顺序文件S,最大同时合并的时间序列数量为n,单个chunk的点数阈值tpt,是否进行Full Merge
...
11. 在merge.log中记录“merge end”,删除U中的所有文件和merge.log
存在对齐时间序列的消除乱序文件合并流程
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表 (去掉了sensor组的概念)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
- 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
- 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
- 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
- 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组
- 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
- 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
- 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
- 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl
- 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
...