Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
  • 遍历device集合
    • 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
    • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)Iterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
    • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
      • 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个IMeasurementSchema及其对应的ChunkMetadata列表
        • 如果是乱序空间的文件合并,采取deserialize合并
          • 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
            • 如果是MeasurementSchema,使用ChunkReader
            • 如果是VectorMeasurementSchema,使用VectorChunkReader
          • 遍历上述的time-value列表,将数据写入新的IChunkWriter
            • 如果是MeasurementSchema,使用ChunkWriterImpl
            • 如果是VectorMeasurementSchema如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
          • 判断限流
          • 将IChunkWriter写入新文件
        •  如果是顺序空间的文件合并
          • 如果page足够大,采取append合并
            •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新ChunkChunkMetadata
              • 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
              • 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
            • 判断限流
            • 将合并完的ChunkChunkMetadata写入新文件
          • 如果page不足够大,采取deserialize合并
            • 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
              • 如果是MeasurementSchema,使用ChunkReader
              • 如果是VectorMeasurementSchema,使用VectorChunkReader
            • 遍历上述的time-value列表,将数据写入新的IChunkWriter
              • 如果是MeasurementSchema,使用ChunkWriterImpl
              • 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
            • 判断限流
            • 将IChunkWriter写入新文件
    • 关闭reader cache中所有的reader
    • 序列化新文件
    • 关闭新文件writer
  • 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
  • 遍历device集合
    • 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
    • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
    • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
    • 新建一个Set<IMeasuementSchema> iMeasuementSchemaCache临时存储本次所有完整的IMeasuementSchema(如果存在vector,可能不完整,这类schema不放入此次合并,等待下一次取完再合并)
    •  对于每次吐出的TimeseriesMetadata及其对应的sensor做归并排序,然后依次判断每一个TimeseriesMetadata是不是一个VectorTimeseriesMetadata(根据timeSeriesMetadataType判断)
      • 如果接下来开始是一个VectorTimeseriesMetadata
        • 不进行下一步合并,直到这个VectorTimeseriesMetadata被取完(还要考虑VectorTimeseriesMetadata的sensor字典序与普通TimeseriesMetadata字典序的关系)
        • 将取完的VectorTimeseriesMetadata手动封装为完整的VectorTimeseriesMetadata并放入timeseriesMetadataCacheForMerge
        • 将取完的VectorTimeseriesMetadata手动封装出VectorMeasurementSchema并放入iMeasuementSchemaCache
      • 如果接下来是一个普通TimeseriesMetadata
        • 将TimeseriesMetadata放入timeseriesMetadataCacheForMerge
        • 将TimeseriesMetadata封装出MeasurementSchema并放入iMeasuementSchemaCache
    • iMeasuementSchemaCache的每一个IMeasuementSchema,如果该measurementId字典序小于归并排序后的最后一个measurementId
    •  如果是 MeasurementSchema(单sensor)
      • 走原来流程
    • 如果是 VectorMeasurementSchema (多sensor)
    • 如果是乱序空间的文件合并,采取deserialize合并
      • 根据 timeseriesMetadataCacheForMerge中各自文件的reader有序读出对应VectorMeasurementSchema的TimeChunk并做合并
      • 新建VectorChunkWriter
      • 根据文件顺序遍历reader
        • 同时遍历TimeChunk中的时间戳和这个文件该VectorTimeseriesMetadata真实数据的时间戳
        • 如果有,则将value写入VectorChunkWriter,如果没有这个时间戳,则传isNull
      • 判断限流
      • 将VectorChunkWriter写入新文件(这里VectorChunkWriter可能过大,需要配置调节大小,如何在vector环境下调节合并大小配置需要讨论)
    • 如果是顺序空间的文件合并
      • 如果page足够大,采取append合并
        •  通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk点而有序合并出对应的新ChunkChunkMetadata
        • 判断限流
        • 将合并完的ChunkChunkMetadata写入新文件
      • 如果page不足够大,采取deserialize合并
        • 根据 timeseriesMetadataCacheForMerge中各自文件的reader有序读出对应VectorMeasurementSchema的TimeChunk并做合并
        • 新建VectorChunkWriter
        • 根据文件顺序遍历reader
          • 同时遍历TimeChunk中的时间戳和这个文件该VectorTimeseriesMetadata真实数据的时间戳
          • 如果有,则将value写入VectorChunkWriter,如果没有这个时间戳,则传isNull
        • 判断限流
        • 将VectorChunkWriter写入新文件(这里VectorChunkWriter可能过大,需要配置调节大小,如何在vector环境下调节合并大小配置需要讨论)
    • 关闭reader cache中所有的reader
    • 序列化新文件
    • 关闭新文件writer

跨文件空间合并

原来的流程

  • 从MManager中的取出改storageGroup所有的device→sensor
  • 遍历device列表 
    • 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
      • 遍历顺序文件列表,对于每一个顺序文件
        • 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
        • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
          • 对于每一个sensor建立sensor→chunkMetadataList的列表
        • 对于每一个sensor→chunkMetadataList列表进行按合并子任务并行配置进行分组
          • 对于每个子任务的sensor→chunkMetadataList列表
            • 遍历每一个chunkMetadata对应的Chunk,如果有与unseq文件overlapped数据,则插入
            • 如果遍历完该unseq文件还有剩余的数据,则直接append到结果文件后面

...

并且因为要同时读多个sensor以及本次选出的顺序乱序文件对应chunk的所有数据,内存占用会变高。

改进的流程

  • 从MManager中的取出改storageGroup所有的device从MManager中的取出改storageGroup所有的device→IMeasurementSchema
  • 遍历device列表 (去掉了sensor组的概念)遍历device列表(去掉了sensor组的概念) 
    • 遍历顺序文件列表,对于每一个顺序文件
      • 新建一个Map<TsFileSequenceReader, ITimeseriesMetadata> timeseriesMetadataCacheForMerge专门缓存reader及其对应的TimeseriesMetadata 
      • 根据device读取各个文件对应的TimeseriesMetadataListIterator(TimeseriesMetadataListIterator每次按字典序吐出1000个sensor及其对应的TimeseriesMetadata)读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
      • 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据 
        • 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
        • 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
          • 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
          • 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
      • 对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组
        • 对于每个子任务的IMeasurementSchema→chunkMetadataList列表
          • 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
            • 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
          • 如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面
            • 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据
            • 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl
        TimeseriesMetadataListIterator直到没有任何一个TimeseriesMetadataListIterator还有数据
        • 新建一个Set<IMeasuementSchema> iMeasuementSchemaCache临时存储本次所有完整的IMeasuementSchema(如果存在vector,可能不完整,这类schema不放入此次合并,等待下一次取完再合并)
        •  对于每次吐出的TimeseriesMetadata及其对应的sensor做归并排序,然后依次判断每一个TimeseriesMetadata是不是一个VectorTimeseriesMetadata(根据timeSeriesMetadataType判断)
          • 如果接下来开始是一个VectorTimeseriesMetadata
            • 不进行下一步合并,直到这个VectorTimeseriesMetadata被取完(还要考虑VectorTimeseriesMetadata的sensor字典序与普通TimeseriesMetadata字典序的关系)
            • 将取完的VectorTimeseriesMetadata手动封装为完整的VectorTimeseriesMetadata并放入timeseriesMetadataCacheForMerge
            • 将取完的VectorTimeseriesMetadata手动封装出VectorMeasurementSchema并放入iMeasuementSchemaCache
          • 如果接下来是一个普通TimeseriesMetadata
            • 将TimeseriesMetadata放入timeseriesMetadataCacheForMerge
            • 将TimeseriesMetadata封装出MeasurementSchema并放入iMeasuementSchemaCache
        • 对于每一个iMeasuementSchemaCache列表进行按合并子任务并行配置进行分组(这里是否还要保留合并子任务的概念)
          • 对于每个子任务的iMeasuementSchemaCache列表
            • timeseriesMetadataCacheForMerge取出对应的reader和ITimeseriesMetadata
            • 新建VectorChunkWriter
            • 对于每一个TimeseriesMetadata遍历对应的sensor
            • 按文件顺序遍历每一个TimeChunk,如果有与unseq文件overlapped数据,则插入VectorChunkWriter
            • 如果遍历完该unseq文件还有剩余的数据,则直接append到VectorChunkWriter后面
            • 判断cha