文件空间内合并
本文档描述“在合并策略已经选定了待合并的文件列表后”的流程。
原来的流程
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个sensor及其对应的ChunkMetadata列表
- 如果是乱序空间的文件合并,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
- 如果是顺序空间的文件合并
- 如果page足够大,采取append合并
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
- 如果page足够大,采取append合并
- 如果是乱序空间的文件合并,采取deserialize合并
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个sensor及其对应的ChunkMetadata列表
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
Vector引入的问题
1、现在的合并流程是按sensor合并的,如果这个sensor其实是某个vector的一部分,那么使用deserialize合并读不出vector中留空占位的数据,写入时就会破坏原来排列整齐的vector,而append合并没有影响。
2、ChunkMetadataListIterator是根据sensor吐出ChunkMetadataList的,这些sensor与MeasurementSchema是一一对应的。
但是现在有了VectorMeasurementSchema,故一个IMeasurementSchema可能对应多个sensor,而现在的MManager又不支持查询一个sensor是不是在某个vector内,所以没有办法把按sensor合并改成按 VectorMeasurementSchema合并。
修改方案
将流程改为
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 从MManager中遍历该device对应的所有IMeasurementSchema
- 如果是 MeasurementSchema(单sensor)
- 走原来流程
- 如果是 VectorMeasurementSchema (多sensor)
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
- 如果是乱序空间的文件合并,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
- 如果是顺序空间的文件合并
- 如果page足够大,采取append合并
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk点而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
- 如果page足够大,采取append合并
- 如果是 MeasurementSchema(单sensor)
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
- 从MManager中遍历该device对应的所有IMeasurementSchema
问题
如此修改去掉了如下过程:
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
CPU占用率会提高。
并且因为要同时读多个sensor对应chunk的所有数据,内存占用会变高。
改进的流程
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个IMeasurementSchema及其对应的ChunkMetadata列表
- 如果是乱序空间的文件合并,采取deserialize合并
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是MeasurementSchema,使用ChunkReader
- 如果是VectorMeasurementSchema,使用VectorChunkReader
- 遍历上述的time-value列表,将数据写入新的IChunkWriter
- 如果是MeasurementSchema,使用ChunkWriterImpl
- 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
- 判断限流
- 将IChunkWriter写入新文件
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是顺序空间的文件合并
- 如果page足够大,采取append合并
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
- 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 如果page不足够大,采取deserialize合并
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是MeasurementSchema,使用ChunkReader
- 如果是VectorMeasurementSchema,使用VectorChunkReader
- 遍历上述的time-value列表,将数据写入新的IChunkWriter
- 如果是MeasurementSchema,使用ChunkWriterImpl
- 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
- 判断限流
- 将IChunkWriter写入新文件
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果page足够大,采取append合并
- 如果是乱序空间的文件合并,采取deserialize合并
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个IMeasurementSchema及其对应的ChunkMetadata列表
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
跨文件空间合并
原来的流程
- 从MManager中的取出改storageGroup所有的device→sensor
- 遍历device列表
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个sensor建立sensor→chunkMetadataList的列表
- 对于每一个sensor→chunkMetadataList列表进行按合并子任务并行配置进行分组
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历每一个chunkMetadata对应的Chunk,如果有与unseq文件overlapped数据,则插入
- 如果遍历完该unseq文件还有剩余的数据,则直接append到结果文件后面
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
Vector引入的问题
1、如果合并的某个sensor是vector的一部分,又在有些时间戳上是空值,乱序合并后会把空值全部去除,导致vector不再整齐。
修改方案
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表
- 按组遍历device对应的IMeasurementSchema列表(这里的组是最大同时合并的时间序列数量n)
- 遍历顺序文件列表,对于每一个顺序文件
- 建立IMeasurementSchema->sensor->chunkMetaList的列表
- 对于每一个IMeasurementSchema列表进行按合并子任务并行配置进行分组
- 对于每个子任务的IMeasurementSchema→sensor→chunkMetadataList列表
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
- 对于每个子任务的IMeasurementSchema→sensor→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
- 按组遍历device对应的IMeasurementSchema列表(这里的组是最大同时合并的时间序列数量n)
问题
如此修改去掉了如下过程:
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
CPU占用率会提高。
并且因为要同时读多个sensor以及本次选出的顺序乱序文件对应chunk的所有数据,内存占用会变高。
改进的流程
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表 (去掉了sensor组的概念)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
- 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
- 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
- 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
- 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组
- 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
- 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
- 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
- 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl
- 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件