0.目标

用户设置存储组往往根据业务语义设置,有时只有一个存储组,这意味着写入和查询的并发程度为1,为了增加写入和查询的并行度,对用户的设备进行虚拟存储组划分


1.用户视角

1.1 案例描述

    用户写入root.sg1.d1.s1, root.sg1.d2.s1, root.sg1.d3.s1, root.sg2.d1.s1, root.sg2.d2.s1五条序列,假设在给定的hash函数下root.sg1.d1映射为0, root.sg1.d2映射为1, root.sg1.d3映射为0,root.sg2.d1映射为0,root.sg2.d2映射为1,给定逻辑存储组数量为2,虚拟存储组数量也为2,假设写入数据均为顺序写入。

1.2 方案一目录结构(简洁起见,不考虑时间分区)

1.3 方案二目录结构(简洁起见,不考虑时间分区)

注意:由于刷盘并行度由物理存储组层控制,对于sg1来说,其刷盘并行度为2,这导致了两个虚拟存储组的数据刷在两个不同的文件中;对于sg2来说,其刷盘并行度为1,这导致两个虚拟存储组的数据刷在同一个文件中。


升级方案:由两个方案的目录结构可知,升级时将方案一“0, 1”文件夹下的内容合并放入上层文件夹即可


2.开发者视角

2.1 整体架构

2.2 方案一架构 

整体架构中N = 逻辑存储组数量,M = 虚拟存储组数量,L = 1,在用户例子下,其具体结构如下:(节点中的序号是为了之后的说明使用,无实际意义)

2.3 方案一写入流程

2.3.1 写入root.sg1.d1.s1

(1) → (2) → (4) → (8)

2.3.2 写入root.sg1.d2.s1

(1) → (2) → (5) → (9)

2.3.3 写入root.sg1.d3.s1

(1) → (2) → (4) → (8)

2.3.4 写入root.sg2.d1.s1

(1) → (3) → (6) → (10)

2.3.5 写入root.sg2.d2.s1

(1) → (3) → (7) → (11)


2.4 方案二架构

整体架构中N = 1, M = 虚拟存储组数量,L = 逻辑存储组数量,在用户例子下,其具体结构如下:(节点中的序号是为了之后的说明使用,无实际意义)

2.5 方案二写入流程

2.5.1 写入root.sg1.d1.s1

(1) → (2) → (3) → (5)

2.5.2 写入root.sg1.d2.s1

(1) → (2) → (4) → (5)

2.5.3 写入root.sg1.d3.s1

(1) → (2) → (3) → (5)

2.5.4 写入root.sg2.d1.s1

(1) → (2) → (3) → (6)

2.5.5 写入root.sg2.d2.s1

(1) → (2) → (4) → (6)


3.方案一详细设计

3.1 Storage Engine

ConcurrentHashMap<PartialPath, StorageGroupProcessor> processorMap -> ConcurrentHashMap<PartialPath, StorageGroupManager> processorMap


从时间序列设备ID找到存储组的过程需要做相应修改,主要影响的函数为:
insert(InsertRowPlan)
insertTablet(InsertTabletPlan)
delete(PartialPath, long, long, long)
deleteTimeseries(PartialPath, long)
query(SingleSeriesExpression, QueryContext, QueryFileManager)
mergeLock(List<PartialPath>)
loadNewTsFileForSync(TsFileResource newTsFileResource)
loadNewTsFile(TsFileResource newTsFileResource)
deleteTsfileForSync(File deletedTsfile)
moveTsfile(File tsfileToBeMoved, File targetDir)
setPartitionVersionToMax(PartialPath storageGroup, long partitionId,long newMaxVersion)
removePartitions(PartialPath storageGroupPath, TimePartitionFilter filter)
setTTL(PartialPath storageGroup, long dataTTL)


3.2 Storage Group Manager

字段:
VirtualPartitioner partitioner; // 用于决定划分虚拟存储组的具体逻辑
StorageGroupProcessor[] virtualStorageGroupProcessor; // 虚拟存储组列表
方法:
/**
* get processor from device id
* @param partialPath device path
* @return virtual storage group processor
*/
public StorageGroupProcessor getProcessor(PartialPath partialPath, StorageGroupMNode storageGroupMNode)
throws StorageGroupProcessorException, StorageEngineException;

/**
* recover
* @param storageGroupMNode logical sg mnode
*/
public void recover(StorageGroupMNode storageGroupMNode) throws StorageGroupProcessorException;

/**
* get all virtual storage group Processor
* @return all virtual storage group Processor
*/
public StorageGroupProcessor[] getAllVirutalStorageGroupProcessor();


3.3 VirtualPartitioner

public interface VirtualPartitioner {

/**
* use device id to determine storage group id
*
* @param deviceId device id
* @return virtual storage group id
*/
public int deviceToStorageGroup(PartialPath deviceId);

/**
* release resource
*/
public void clear();

/**
* get total number of virtual storage group
*
* @return total number of virtual storage group
*/
public int getPartitionCount();

}


3.4 执行逻辑

3.4.1 设备ID层操作(例如insert, query等)

(1)Storage Engine拿到device ID,根据其逻辑存储组名找到对应的StorageGroupManager
(2)使用deviceID根据StorageGroupManager.getProcessor方法拿到虚拟存储组(方案二中就是StorageGroupProcessor)
(3)将请求发给找到的虚拟存储组

3.4.2 逻辑存储组层操作(例如removePartitions等)

(1)Storage Engine拿到device ID,根据其逻辑存储组名找到对应的StorageGroupManager
(2)使用StorageGroupManager.getAllVirutalStorageGroupProcessor方法拿到所属全部虚拟存储组(方案二中就是StorageGroupProcessor)
(3)将请求转发至所有虚拟存储组


3.5主要开发任务
(1)建立VirtualPartitioner接口和相关实现类
(2)修改存储组分配逻辑
(3)修改merge相关代码
(4)修改恢复相关代码
(5)功能相关配置和开关
(6)协同sync和分布式进行代码修改
(7)全面的测试检查和修复,包括分布式相关测试
(8)正确性测试
(9)性能测试






  • No labels