THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
并且因为要同时读多个sensor对应chunk的所有数据,内存占用会变高。
改进的流程
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
- 根据device读取各个文件对应的TimeseriesMetadataListIterator(TimeseriesMetadataListIterator每次按字典序吐出1000个sensor及其对应的TimeseriesMetadata)
- 新建一个Map<TsFileSequenceReader, ITimeseriesMetadata> timeseriesMetadataCacheForMerge专门缓存reader及其对应的TimeseriesMetadata
- 循环遍历所有TimeseriesMetadataListIterator直到没有任何一个TimeseriesMetadataListIterator还有数据
- 新建一个Set<IMeasuementSchema> iMeasuementSchemaCache临时存储本次所有完整的IMeasuementSchema(如果存在vector,可能不完整,这类schema不放入此次合并,等待下一次取完再合并)
- 对于每次吐出的TimeseriesMetadata及其对应的sensor做归并排序,然后依次判断每一个TimeseriesMetadata是不是一个VectorTimeseriesMetadata(根据timeSeriesMetadataType判断)
- 如果接下来开始是一个VectorTimeseriesMetadata
- 不进行下一步合并,直到这个VectorTimeseriesMetadata被取完(还要考虑VectorTimeseriesMetadata的sensor字典序与普通TimeseriesMetadata字典序的关系)
- 将取完的VectorTimeseriesMetadata手动封装为完整的VectorTimeseriesMetadata并放入timeseriesMetadataCacheForMerge
- 将取完的VectorTimeseriesMetadata手动封装出VectorMeasurementSchema并放入iMeasuementSchemaCache
- 如果接下来是一个普通TimeseriesMetadata
- 将TimeseriesMetadata放入timeseriesMetadataCacheForMerge
- 将TimeseriesMetadata封装出MeasurementSchema并放入iMeasuementSchemaCache
- 如果接下来开始是一个VectorTimeseriesMetadata
- 对iMeasuementSchemaCache的每一个IMeasuementSchema,如果该measurementId字典序小于归并排序后的最后一个measurementId
- 如果是 MeasurementSchema(单sensor)
- 走原来流程
- 如果是 VectorMeasurementSchema (多sensor)
- 如果是乱序空间的文件合并,采取deserialize合并
- 根据 timeseriesMetadataCacheForMerge中各自文件的reader有序读出对应VectorMeasurementSchema的TimeChunk并做合并
- 新建VectorChunkWriter
- 根据文件顺序遍历reader
- 同时遍历TimeChunk中的时间戳和这个文件该VectorTimeseriesMetadata真实数据的时间戳
- 如果有,则将value写入VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件(这里VectorChunkWriter可能过大,需要配置调节大小,如何在vector环境下调节合并大小配置需要讨论)
- 如果是顺序空间的文件合并
- 如果page足够大,采取append合并
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk点而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 根据 timeseriesMetadataCacheForMerge中各自文件的reader有序读出对应VectorMeasurementSchema的TimeChunk并做合并
- 新建VectorChunkWriter
- 根据文件顺序遍历reader
- 同时遍历TimeChunk中的时间戳和这个文件该VectorTimeseriesMetadata真实数据的时间戳
- 如果有,则将value写入VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件(这里VectorChunkWriter可能过大,需要配置调节大小,如何在vector环境下调节合并大小配置需要讨论)
- 如果page足够大,采取append合并
- 如果是乱序空间的文件合并,采取deserialize合并
- 如果是 MeasurementSchema(单sensor)
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
跨文件空间合并
原来的流程
- 从MManager中的取出改storageGroup所有的device→sensor
- 遍历device列表
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个sensor建立sensor→chunkMetadataList的列表
- 对于每一个sensor→chunkMetadataList列表进行按合并子任务并行配置进行分组
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历每一个chunkMetadata对应的Chunk,如果有与unseq文件overlapped数据,则插入
- 如果遍历完该unseq文件还有剩余的数据,则直接append到结果文件后面
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
...
并且因为要同时读多个sensor以及本次选出的顺序乱序文件对应chunk的所有数据,内存占用会变高。
改进的流程
- 从MManager中的取出改storageGroup所有的device
- 遍历device列表
- 遍历顺序文件列表,对于每一个顺序文件
- 新建一个Map<TsFileSequenceReader, ITimeseriesMetadata> timeseriesMetadataCacheForMerge专门缓存reader及其对应的TimeseriesMetadata
- 根据device读取各个文件对应的TimeseriesMetadataListIterator(TimeseriesMetadataListIterator每次按字典序吐出1000个sensor及其对应的TimeseriesMetadata)
- 循环遍历所有TimeseriesMetadataListIterator直到没有任何一个TimeseriesMetadataListIterator还有数据
- 新建一个Set<IMeasuementSchema> iMeasuementSchemaCache临时存储本次所有完整的IMeasuementSchema(如果存在vector,可能不完整,这类schema不放入此次合并,等待下一次取完再合并)
- 对于每次吐出的TimeseriesMetadata及其对应的sensor做归并排序,然后依次判断每一个TimeseriesMetadata是不是一个VectorTimeseriesMetadata(根据timeSeriesMetadataType判断)
- 如果接下来开始是一个VectorTimeseriesMetadata
- 不进行下一步合并,直到这个VectorTimeseriesMetadata被取完(还要考虑VectorTimeseriesMetadata的sensor字典序与普通TimeseriesMetadata字典序的关系)
- 将取完的VectorTimeseriesMetadata手动封装为完整的VectorTimeseriesMetadata并放入timeseriesMetadataCacheForMerge
- 将取完的VectorTimeseriesMetadata手动封装出VectorMeasurementSchema并放入iMeasuementSchemaCache
- 如果接下来是一个普通TimeseriesMetadata
- 将TimeseriesMetadata放入timeseriesMetadataCacheForMerge
- 将TimeseriesMetadata封装出MeasurementSchema并放入iMeasuementSchemaCache
- 如果接下来开始是一个VectorTimeseriesMetadata
- 对于每一个iMeasuementSchemaCache列表进行按合并子任务并行配置进行分组,
- 对于每个子任务的iMeasuementSchemaCache列表
- 从timeseriesMetadataCacheForMerge取出对应的reader和ITimeseriesMetadata
- 新建VectorChunkWriter
- 对于每一个TimeseriesMetadata,遍历对应的sensor
- 按文件顺序遍历每一个TimeChunk,如果有与unseq文件overlapped数据,则插入VectorChunkWriter
- 如果遍历完该unseq文件还有剩余的数据,则直接append到VectorChunkWriter后面
- 对于每个子任务的iMeasuementSchemaCache列表
- 遍历顺序文件列表,对于每一个顺序文件