Table of Contents |
---|
空间内合并执行流程(针对一元时间序列)
输入:待合并文件列表 TsFileList
- deviceSet=[]
- writer = 目标文件 RestorableTsFileIOWriter
- for tsfile in TsFileList
- 从 FileReaderManager 中获取此 tsfile 的 TsFileSequenceReader,并读出该文件的设备列表,添加至 deviceSet
- for device in deviceSet
- metadataIteratorList = []
- for tsfile in TsFileList
- 读取 tsfile 中 device 对应的 ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出 max_degree_of_index_node 个 sensor 及其对应的 ChunkMetadata 列表)
- 将 ChunkMetadataListIterator 添加到 metadataIteratorList
- 遍历 算法1 输出的每批待合并的 sensor 列表
- 对于待合并列表中的每一个 sensor
- 如果是乱序空间的文件合并,采取 反序列化Page合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk 的数据点数小于 merge_page_point_number,采取 反序列化Page合并算法
- 否则,采取 追加Page合并算法
- 对于待合并列表中的每一个 sensor
- for tsfile in TsFileList
- 将 FileReaderManager 中此文件 decreaseReaderReference
- 序列化新文件的 TsFileResource
- 关闭新文件 writer
算法1:
输入:多个文件的 ChunkMetadataListIterator(简称iterator),Iterator 每次输出的 List 内的 sensor 个数为 max_index_degree(假设有3个文件:file1_Iterator 返回两批 sensor_list:(s1,s2),(s3) , file2_iterator:(s1, s3) file3_iterator:(s1,s2),(s3,s5))
输出:List<sensor>
描述:每个 iterator 取1个 List,找到每个 List 的最大字典序的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor_lex_min,直到所有都合并完 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst
优势:每次至少会消耗完一个 file 的一个 List
第一轮:file1(s1,s2)+file2(s1,s3)+file3(s1,s2) → S=(s2, s3),最小字典序 sensor_lex_min 是 s2,输出 (s1,s2)
第二轮:file1(s3)+file2(s3)+file3(s3,s5) → S=(s3, s5, s5),最小字典序 sensor_lex_min 是 s3, 输出(s3)
第三轮:file3(s5) → S=(s5),最小字典序 sensor_lex_min 是 s5,输出(s5)
反序列化Page合并算法
文件空间内合并
本文档描述“在合并策略已经选定了待合并的文件列表后”的流程。
原来的流程
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个sensor及其对应的ChunkMetadata列表
- 如果是乱序空间的文件合并,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
- 如果是顺序空间的文件合并
- 如果page足够大,采取append合并
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件
- 如果page足够大,采取append合并
- 如果是乱序空间的文件合并,采取deserialize合并
- 对ChunkMetadataListIterator吐出的数据做归并排序,对于合并出来的每一个sensor及其对应的ChunkMetadata列表
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
Vector引入的问题
1、现在的合并流程是按sensor合并的,如果这个sensor其实是某个vector的一部分,那么使用deserialize合并读不出vector中留空占位的数据,写入时就会破坏原来排列整齐的vector,而append合并没有影响。
2、ChunkMetadataListIterator是根据sensor吐出ChunkMetadataList的,这些sensor与MeasurementSchema是一一对应的。
但是现在有了VectorMeasurementSchema,故一个IMeasurementSchema可能对应多个sensor,而现在的MManager又不支持查询一个sensor是不是在某个vector内,所以没有办法把按sensor合并改成按 VectorMeasurementSchema合并。
修改方案
将流程改为
...
- 走原来流程
...
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
...
- 通过各自文件的reader有序把数据读出并整理出对应的time-value列表
- 遍历上述的time-value列表,将数据写入新的ChunkWriter
- 判断限流
- 将ChunkWriter写入新文件如果是顺序空间的文件合并
如果page足够大,采取append合并
追加Page合并算法
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk点而有序合并出对应的新chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
问题
如此修改去掉了如下过程:
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
CPU占用率会提高。
并且因为要同时读多个sensor对应chunk的所有数据,内存占用会变高。
改进的流程
存在对齐时间序列时的层级合并流程
- 根据 seq_file_num_in_each_level / unseq_file_num_in_each_level 获取此次待合并层级中的 TsFileResource,作为待合并文件列表
- 对待合并文件列表中的每个文件,创建一个 TsFIleSequenceReader,并存在 ReaderCache 中,并获取该文件的设备列表,生成所有的 device 集合
- 遍历device集合
- 根据 ReaderCache 得到相应的TsFileSequenceReader
- 根据 device 读取各个文件对应的ChunkMetadataListIterator
- 循环算法2输出的待合并的 IMeasurementSchema 列表
- 对于待合并的 IMeasurementSchema
- 如果是乱序的文件合并,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果是顺序空间的文件合并
- 如果某个 Chunk(如果是对齐时间序列,判断 TimeChunk) 的数据点数小于 merge_page_point_number,采取 存在对齐时间序列的反序列化 Page 合并算法
- 如果page足够大,采取 存在对齐时间序列的追加 Page 合并算法
- 关闭 ReaderCache 中所有的reader
- 序列化新文件的 TsFileResource
- 关闭新文件writer
算法2
输入:多个文件的 ChunkMetadataListIterator,Iterator 每次输出的 List 内的 sensor 个数(包含对齐时间序列内的序列数,不包含 time)为 max_degree_of_index_node(由于对齐时间序列是个整体,因此可能会超过部分)
输出:每轮待合并的 sensor 列表
描述:每个迭代器取1个 List,找到每个 List 的最大字典序(对齐时间序列按 time 的名字 $#$id 来比较)的 sensor,组成集合 S,本次合并从头开始合并到 S 中最小字典序的 sensor 。并从 List 中清除已合并的 sensor,如果此 List 消耗完,获取下一个 Lst
优势:每次至少会消耗完一个 file 的一个 List
Iterator 每次输出的 List 的例子:文件内的序列为:(time, s4,s5,s6), (time, s9, s10), s1,s2,s3,s7,s8
如果 max_degree_of_index_node 为 2, 每次取出的为:(time, s4,s5,s6); (time, s9, s10); s1,s2; s3,s7; s8
如果 max_degree_of_index_node 为 3,每次取出的为:(time, s4,s5,s6); (time, s9, s10), s1; s2, s3,s7; s8
如果 max_degree_of_index_node 为 4,每次取出的为:(time, s4,s5,s6), (time, s9, s10); s1 s2, s3,s7; s8
存在对齐时间序列的反序列化 Page 合并算法
- 根据待合并文件列表,依次使用每个文件的TsFileSequenceReader,获取该文件的设备列表,生成所有的device集合
- 遍历device集合
- 根据文件从文件读取器cache中得到相应的TsFileSequenceReader
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为256个普通MeasurementSchema或超过256个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能) 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据对ChunkMetadataListIterator吐出的数据做归并排序
- 对于合并出来的每一个IMeasurementSchema及其对应的ChunkMetadata列表 如果是乱序空间的文件合并,采取deserialize合并通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是MeasurementSchema,使用ChunkReader
- 如果是VectorMeasurementSchema,使用VectorChunkReader
- 遍历上述的time-value列表,将数据写入新的IChunkWriter
- 如果是MeasurementSchema,使用ChunkWriterImpl
- 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
- 判断限流
- 将IChunkWriter写入新文件 如果是顺序空间的文件合并如果page足够大,采取append合并
存在对齐时间序列的追加 Page 合并算法
- 通过各自文件的reader有序把chunk读出来,并通过合并ByteBuffer和统计信息的方式不解析chunk数据点、而有序合并出对应的新Chunk和ChunkMetadata
- 如果是MeasurementSchema,仅需要前一个chunk和后一个chunk进行合并
- 如果是VectorMeasurementSchema,需要前一个VectorChunkMetadata对应的所有timeChunk和valueChunk与后一个所有的timeChunk和valueChunk合并
- 判断限流
- 将合并完的Chunk和ChunkMetadata写入新文件
- 如果page不足够大,采取deserialize合并
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 如果是MeasurementSchema,使用ChunkReader
- 如果是VectorMeasurementSchema,使用VectorChunkReader
- 遍历上述的time-value列表,将数据写入新的IChunkWriter
- 如果是MeasurementSchema,使用ChunkWriterImpl
- 如果是VectorMeasurementSchema,使用VectorChunkWriterImpl
- 判断限流
- 将IChunkWriter写入新文件
- 通过各自文件的IChunkReader有序把数据读出并整理出对应的time-value列表
- 关闭reader cache中所有的reader
- 序列化新文件
- 关闭新文件writer
跨文件空间合并
原来的流程
- 从MManager中的取出改storageGroup所有的device→sensor
- 遍历device列表
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个sensor建立sensor→chunkMetadataList的列表
- 对于每一个sensor→chunkMetadataList列表进行按合并子任务并行配置进行分组
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历每一个chunkMetadata对应的Chunk,如果有与unseq文件overlapped数据,则插入
- 如果遍历完该unseq文件还有剩余的数据,则直接append到结果文件后面
- 对于每个子任务的sensor→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
- 按组遍历device对应的sensor列表(这里的组是最大同时合并的时间序列数量n)
Vector引入的问题
1、如果合并的某个sensor是vector的一部分,又在有些时间戳上是空值,乱序合并后会把空值全部去除,导致vector不再整齐。
修改方案
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表
- 按组遍历device对应的IMeasurementSchema列表(这里的组是最大同时合并的时间序列数量n)
- 遍历顺序文件列表,对于每一个顺序文件
- 建立IMeasurementSchema->sensor->chunkMetaList的列表
- 对于每一个IMeasurementSchema列表进行按合并子任务并行配置进行分组
- 对于每个子任务的IMeasurementSchema→sensor→chunkMetadataList列表
- 通过各自文件的reader有序把数据读出并整理出对应的sensor-time-value列表
- 对这些sensor的time取并集,记为allTime
- 根据sensor遍历 sensor-time-value 列表
- 遍历allTime
- 遍历上述的sensor-time-value列表,将数据写入新的VectorChunkWriter,如果没有这个时间戳,则传isNull
- 判断限流
- 将VectorChunkWriter写入新文件
- 遍历allTime
- 对于每个子任务的IMeasurementSchema→sensor→chunkMetadataList列表
- 遍历顺序文件列表,对于每一个顺序文件
- 按组遍历device对应的IMeasurementSchema列表(这里的组是最大同时合并的时间序列数量n)
问题
如此修改去掉了如下过程:
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出1000个sensor及其对应的ChunkMetadata列表)
CPU占用率会提高。
并且因为要同时读多个sensor以及本次选出的顺序乱序文件对应chunk的所有数据,内存占用会变高。
改进的流程
例子:
VectorChunk1: timeChunk(page1,page2) s1Chunk(page3, page4) s2Chunk(page5)
VectorChunk2: timeChunk(page6) s1Chunk(page7) s2Chunk(page8)
合并后: timeChunk(page1, page2, page6) s1Chunk(page3, page4, page7), s2Chunk(page5, page8)
直接追加chunk合并
原流程存在的问题
对于层级合并来说(以顺序空间的层级合并为例),是通过seq_level_num和seq_file_num_in_each_level这两个参数来控制合并到最后的chunk大小的,即把原来的chunk扩大seq_file_num_in_each_levelseq_level_num-1倍,这种配置方案存在如下三个问题:
1、在配置时需要知道用户原先写出的一个chunk的大小,并根据实际经验来配置。这样的配置方式对用户来说难以自己上手配置,对于每一个用户来说,也都需要我们去观察文件并辅助配置,非常不方便
2、一旦用户使用了默认参数没有修改或者配置错误,而用户本身场景又不需要合并太多次文件,那么就会占用很多无效的磁盘IO,甚至降低查询效率
3、在一些场景中,不同时间序列的写入速度是不一样的,那么按照这种合并方式,最后会合并出很多大小不一的chunk,写入速度慢的时间序列对应的chunk小,写入速度快的时间序列对应的chunk大,造成chunk大小不均匀,难以控制
例子:
假设用户需要的目标chunk是4倍大小的原始chunk,有如下文件
0层:file1(s1,s2,s2)+file2(s2,s2,s2) 即file1有1个s1的chunk,2个s2的chunk,file2有3个s2的chunk
合并至1层:file3(s1(1),s2(5)) 合并出来的文件有1个s1的chunk,1个5倍大小的s2的chunk,假设他与另一个file4(s1(1),s2(5))进行合并
合并至2层:file5(s1(2),s2(9)) 合并出来的文件有1个2倍大小的s1的chunk,1个9倍大小的s2的chunk
可以看到s2的chunk越合并越大,已经远远超过用户需要的chunk大小,s1对应的chunk却仍没有达到用户的要求
解决方案
使用配置merge_chunk_point_number_threshold在合并每一个chunk的时候控制,
如果在待合并列表中这个sensor对应的所有chunk都已经达到了这个阈值,则不再合并chunk,直接将读出来的chunk写入新文件
跨文件空间合并(将乱序文件合并至顺序空间)(针对一元时间序列)
FullMerge(完全合并)
出于IO和合并性能考虑,乱序合并不会重写整个顺序文件,获取一个整理完全的顺序文件,而是会将与乱序文件重叠的 chunk 合并重写后,追加到原顺序文件后面,不删除原chunk,称这个过程为 原地合并
但是过多的 原地合并 会造成无用的数据越来越多,甚至在某些情况下导致一个文件大小无限制地增大(见线上问题分析)
而 完全合并 则是在乱序合并时重写整个顺序文件,IO和合并性能较低,但能获取一个整理完全的顺序文件
重新设计整个合并流程,舍弃 原地合并 只保留 完全合并
外部并行
执行拆分流程:device→seqFile→sensor
输入:待合并文件列表 SeqTsFileList, UnseqTsFileList
- 获取 SeqTsFileList和UnseqTsFileList 中的所有设备和传感器 deviceSensorsMap(Map<Device, List<Sensor>>)
- 新建一个 Map<TsFileResource, RestorableFileWriter> newWriterCache;
- 新建一个 Map<TsFileResource, TsFileResource> newTsFileResourceCache;
- for device, sensors in deviceSensorsMap
- 为 sensors 构建一个 bitMap,记录是否被合并,默认全部为 false
- for seqFile in SeqTsFileList
- 从 newTsFileResourceCache 中获取,若不存在则新建一个 TsFileResource writer
- 从 newWriterCache 中获取,若不存在则新建一个 RestorableFileWriter writer
- writer.startChunkGroup(device)
- 对于 seqFile 新建 ChunkMetadataListIterator
- 从 ChunkMetadataListIterator 中迭代获取该设备的 sensorList 及其对应的 List<ChunkMetadata> sensorChunkMetadataList(每次返回 max_degree_of_index_node 个 sensor),并获取 ModificationList,对于每一个迭代
- for sensor, sensorChunkMetadataList:
- 如果 sensorChunkMetadataList 不为空
- 根据 算法1 将该 sensor 的 Chunk 与对应的 unseqReader 中的数据进行合并
- 将该 sensor 在 bitMap 上的位置为 true
- 如果 sensorChunkMetadataList 不为空
- for sensor, sensorChunkMetadataList:
- 如果当前的 seqFile 是最后一个 seqFile:
- 将该 device 下在 bitMap 中记录未合并的 sensor 对应的 unseqReader 的数据写入这个 seqFile 的临时文件中
- for unseqReader in unseqReaderList
- if unseqReader 未读完
- 新建一个 ChunkWriter
- 根据 算法3 将未读完的 unseqReader 的剩余数据写入 ChunkWriter 中
- 将 ChunkWriter 写入到writer
- if unseqReader 未读完
- writer.endChunkGroup()
- for tsFileResource in newTsFileResourceCache
- tsFileResource.serialize()
- tsFileResource.close()
- for writer in newWriterCache
- writer.endFile()
算法1
输入:sensor, sensorChunkMetadataList, unseqReader, tsFileResource, modification
- for chunkMetadata in sensorChunkMetadataList
- chunk = readMemChunk(chunkMetadata)
- unclosedChunkPoint = 0L
- tsFileResource.updateStartTime(chunkMetadata.startTime)
- tsFileResource.updateEndTime(chunkMetadata.endTime)
- 判断当前 chunk 是否被修改 modified
- if isOverlap(chunkMetadata)
- 将其与乱序数据重叠的部分合并后写入 ChunkWriter(见 算法2)
- unclosedChunkPoint+=写入的点数
- else if isChunkTooSmall(chunkMetadata)
- 将这个 Chunk 解压缩后写入 ChunkWriter
- unclosedChunkPoint+=写入的点数
- else
- if unclosedChunkPoint > 0 || modified
- 将这个 Chunk 解压缩后写入 ChunkWriter
- else
- 将这个 Chunk 不解压缩写入 writer
- if unclosedChunkPoint > 0 || modified
- if unclosedChunkPoint > merge_chunk_point_num_threshold
- 将 ChunkWriter 写入 writer
- unclosedChunkPoint = 0
算法2
输入:chunk, unseqReader, deviceEndTime, tsFileResource, modification
- 对 Chunk 构建一个 ChunkReader
- 使用 ChunkReader 获取 Chunk 中的每一个 Page
- while pageData.hasNext()
- 获取 pageData 当前的时间戳 seqTime
- overwriteSeqPoint = false
- while 该 sensor 还有乱序数据并且乱序数据的下一个时间戳小于 seqTime
- 将乱序数据写入 ChunkWrtier 中
- tsFileResource.updateStartTime(乱序数据)
- tsFileResource.updateEndTime(乱序数据)
- 如果乱序数据的时间戳 == seqTime
- overwriteSeqPoint = true
- 根据 算法3 将该 unseqReader 小于等于 deviceEndTime 的剩余数据写入 ChunkWriter 中
- if !overwriteSeqPoint && !pageData 当前时间戳在 modification 中被删除
- 将 pageData 当前的数据点写入 ChunkWriter 中
- tsFileResource.updateStartTime(乱序数据)
- tsFileResource.updateEndTime(乱序数据)
- while pageData.hasNext()
算法3
输入:chunkWriter, unseqReader, timeLimit, tsFileResource
- while unseqReader 还有数据 && 数据时间戳<=timeLimit
- 将 unseqReader 当前数据写入 chunkWriter
- tsFileResource.updateStartTime(乱序数据)
- tsFileResource.updateEndTime(乱序数据)
- 从MManager中的取出改storageGroup所有的device→IMeasurementSchema
- 遍历device列表 (去掉了sensor组的概念)
- 遍历顺序文件列表,对于每一个顺序文件
- 根据device读取各个文件对应的ChunkMetadataListIterator(ChunkMetadataListIterator每次按字典序吐出对应的IMeasurementSchema及其对应的ChunkMetadata列表,每批个数为1000个普通MeasurementSchema或超过1000个sensor的最小的完整的VectorMeasurementSchema列表)(这里需要改变底层结构提高性能)
- 循环遍历所有ChunkMetadataListIterator直到没有任何一个ChunkMetadataListIterator还有数据
- 对于每一个IMeasurementSchema建立IMeasurementSchema→chunkMetadataList的列表
- 读取每一个chunkMetadataList,建立List<List<Chunk>> chunks的结构
- 如果是ChunkMetadata,将当前chunk读出包裹一个List放入chunks
- 如果是VectorChunkMetadata,将当前chunk按timeChunk, valueChunk1,...valueChunkN 的顺序包裹一个List放入chunks
对于每一个IMeasurementSchema→chunkMetadataList列表进行按合并子任务并行配置进行分组对于每个子任务的IMeasurementSchema→chunkMetadataList列表 - 按文件顺序遍历chunks列表的chunkList,对于chunkList的第一个chunk, 如果有与unseq文件overlapped数据,则插入IChunkWriter
- 如果IMeasurementSchema是MeasurementSchema,此时chunk列表的第一个chunk也是完整的chunk,直接写第一个chunk的数据写入ChunkWriterImpl
- 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有本行数据写入 VectorChunkWriterImpl
如果遍历完该unseq文件还有剩余的数据,则直接append到IChunkWriter后面 - 如果IMeasurementSchema是MeasurementSchema,直接append第一个chunk的剩余数据 如果IMeasurementSchema是VectorMeasurementSchema,此时chunk列表的第一个chunk是timeChunk,需要将timeChunk, valueChunk1,...valueChunkN 所有剩余数据按行写入 VectorChunkWriterImpl