Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyIOTDB-944

UDTF管理

装载

UDTF通过反射技术装载

CREATE

UDF 框架设计

(Author: Steve Yurong Su)

(Origin link: https://shimo.im/docs/vqhkTcDH6QXvQkyw)

UDF调研

时序数据库UDF

InfluxDB (自定Script,与kapacitor交互): https://docs.influxdata.com/kapacitor/v1.5/nodes/u_d_f_node/

TimescaleDB (类似Postgre): https://docs.timescale.com/latest/using-timescaledb/reading-data#advanced-analytics

FaunaDB (GraphQL): https://docs.fauna.com/fauna/current/api/graphql/functions

...

其他框架UDF

MySQL:

https://www.cnblogs.com/raker/p/4377343.html

https://blog.csdn.net/shaoyiwenet/article/details/53256103

Hive (HQL,定义方式,加载方式jar包动态加载):

https://blog.csdn.net/weixin_42181917/article/details/82865140

https://www.cnblogs.com/hd-zg/p/5947468.html

https://blog.csdn.net/ZZQHELLO2018/article/details/105700649

Flink:

https://blog.csdn.net/wangpei1949/article/details/103444412

几种查询UDF的使用情境

  1. 需要获取全量的查询数据才能进一步处理的情境(需要向用户提供随机、顺序访问序列的能力)
  2. 可以获取一定量数据后按批处理的情境(需要向用户提供随机、顺序访问序列的能力)
  3. 可以逐数据点(行)处理的情境 (需要向用户提供顺序访问序列的能力)

一个UDF查询的情景可能包括单序列查询、多序列查询。

单序列:

select udf(s1) from root.sg1.d1;
select udf(s1), s2 from root.sg1.d1;
select udf(s1), udf(s2) from root.sg1.d1; 

多序列:

select udf(s1, s2, s3) from root.sg1.d1; 
select udf(s1, s2, "string", "number"), udf1(s1), s3 from root.sg1.d1; 

在多序列查询的场景下,提供用户如下迭代方式进行transfrom:

分别按点迭代每条序列

直接迭代多条序列按时间对齐后的行

以上场景都需要考虑内存受限的情况。

UDF管理SQL语法

装载

CREATE [temporary] FUNCTION function_name AS class_name;

卸载

DROP [temporary] FUNCTION function_name;

展示

SHOW [temporary] FUNCTIONS;

编程接口设计

区分

用户定义聚合函数UDAF(源数据-生成数据 多对一

用户定义表格函数UDTF(源数据-生成数据 多对多

UDF传入参数的获取接口

/**
 * select语句中,udf接收的参数
 * 接收带单引号或双引号 STRING 类型的参数或者suffix path类型的参数
 * 参数可变长,允许有多个measurements
 * 
 * 例:
 * select 
 *   udf1(measurement0, measurement1, parameter0, parameter1), 
 *   udf2(measurement2, parameter2),
 *   measurement3, 
 *   measurement4
 * from root.sg1. ..;
 * 
 * 对udf1,measurement0, measurement1, parameter0, parameter1就是
 * parameters
 */
public class UDFParameters {


  int length();


  boolean getBoolean(int index);
  int getInteger(int index);
  long getLong(int index);
  float getFloat(int index);
  double getDouble(int index);
  String getString(int index);
  Path getPath(int index);

UDF用户个性化配置接口

最后可能区分UDAF UDTF的配置接口,以提供不同的配置选项。

/**
 * 给用户个性化配置UDF的接口
 * UDFConfigurations = new UDFConfigurations();
 * configurations
 *   .setMeasurementIndexes(Arrays.asList(0, 1, 2))
 *
 *   .setDataPointAccessStrategies(
 *      2, FETCH_BY_TIME_WINDOW))
 *   .setBatchTimeWindow(2, 100_000)
 *
 *   .mergeOriginalSeries("unique-name", Arrays.asList(0, 1))
 *   .setRowRecordAccessStrategies(
 *      "unique-name", FETCH_BY_SIZE_LIMITED_WINDOW)
 *   .setBatchSizeLimit("unique-name", 1000)
 *
 *   .setOutputDataType(TSDataType.FLOAT)
 *   .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
 *   .setTimeout(10000);
 *
 * 用户自定义配置完成后会有验证、检查过程
 */
public class UDFConfigurations {


  enum DataPointAccessStrategy {
    FETCH_BY_POINT, // 提供DataPointIterator
    FETCH_BY_TIME_WINDOW, // 提供DataPointBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供DataPointBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallDataPointIterator
  }
  
  enum RowRecordAccessStrategy {
    FETCH_BY_ROW, // 提供RowRecordIterator
    FETCH_BY_TIME_WINDOW, // 提供RowRecordBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供RowRecordBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallRowRecordIterator
  }
  
  // 说明 measurement 在参数中的位置,以0为起点
  public UDFConfigurations setMeasurementIndexes(int[] indexes);
  
  // 说明各个序列的读取策略,以此参数提供不同的iterators供用户使用
  public UDFConfigurations setDataPointAccessStrategies(
    int measurementIndex, 
    DataPointAccessStrategy pointAccessStrategies);  
  public UDFConfigurations setBatchSizeLimit(
    int measurementIndex, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
  
  public UDFConfigurations mergeOriginalSeries(
    String mergedSeriesName, int[] indexes);
  public UDFConfigurations setBatchSizeLimit(
    String mergedSeriesName, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
 
  // transform 后结果的类型
  public UDFConfigurations setOutputDataType(TSDataType dataType);
  
  // 展现在客户端的头信息
  public UDFConfigurations setColumnHeader(String header);
  
  // 执行时间限制
  public UDFConfigurations setTimeout(long timeout);


  // 更多setter以及getter ...
}

UDF接口

List可以变更为[]。

public abstract class UDF {


  protected List<Path> paths;
  protected List<TSDataType> originalSeriesDataTypes;
  
  // iterators,用户从此拿数据
  // OverallDataPointIterator / 需要获取全量的查询数据才能进一步处理的情境
  // DataPointBatchIterator / 可以获取一定量数据后按批处理的情境
  // DataPointIterator / 可以逐数据点处理的情
  // 针对序列可以分开处理的情形
  protected List<DataPointIterator> DataPointIterators; 
  // 多序列归并后,按时间对齐的结果,需要在UDFConfigurations.mergeOriginalSeries
  protected HashMap<String, RowRocordIterator> RowRecordIterators; 
  
  // 用于 show 管理语句信息的加载
  public abstract String name();
  public abstract String description();
  
  // initialize 调用后,实例内所有的属性才可用
  // 用于初始化,加载资源等
  public abstract void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  );
  // 释放资源等
  public abstract void terminate();
  
  // 保留给系统进行设置
  public void setPaths(List<Path> paths);
  public void setOriginalSeriesDataTypes(List<TSDataType> dataTypes);
  public void setQueryDataIterators(List<QueryDataIterator> QueryDataIterators);
  // ...
}

UDTF接口

public abstract class UDTF extends UDF {


  protected ElasticSerializableTVList collector; // 数据收集器,获取transform后的序列,ElasticSerializableTVList是含内存管理的TVList
  
  protected int rowLimit;
  protected int rowOffset;
  
  // 用户需要实现
  public abstract boolean hasRemainingDataToTransform();
  public abstract void transform(int fetchSize); // 用户自行保存上下文


  // 保留给系统进行设置
  public final void setCollector(ElasticSerializableTVList collector) { ... }
  public final void setRowLimit(int rowLimit) { ... }
  public final void setRowOffset(int rowOffset) { ... }

UDAF接口

// 暂时不实现此类型的UDAF

public abstract class UDAF extends UDF {
  
  // 用户需要实现
  public abstract Object aggregate();
}

直接利用Statistics数据情境

// QueryDataIterator属性无效
public abstract class UDAF extends UDF {
  // 实现用于构造 AggregateResult 的接口...
  public abstract void updateResultFromPageData(BatchData dataInThisPage);
  public abstract void updateResultUsingTimestamps(long[] timestamps, int length, IReaderByTimestamp dataReader);
  // ...
}

示例

public class HiFiUDTF extends PointDataUDTF {


  private int hifiSampleSize;
  private String hifiWeightOperator;
  private String hifiSampleOperator;  
  
  private int totalReturnedRowsNumber;


  public String name() {
    return "hifi";
  }
  public String description() {
    return "This is a HiFi query example.";
  }
  
  public void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  ) {
    totalReturnedRowsNumber = 0;
    hifiSampleSize = parameters.getInteger(1);
    hifiWeightOperator = parameters.getString(2);
    hifiSampleOperator = parameters.getString(3);
    configurations
      .setMeasurementIndexes(Arrays.asList(0))
      .setDataPointAccessStrategies(0,FETCH_BY_POINT)
      .setOutputDataType(TSDataType.FLOAT)
      .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
      .setTimeout(10000);
  }
  public abstract void terminate() {
    // ignored 
  }
  
  public boolean hasRemainingDataToTransform() {
    return totalReturnedRowsNumber < rowLimit 
      && (IDataPointIterator iterators.get(0)).hasNext();
  }
  public void transform() {
    int alreadyReturnedRowsNumber = 0;
    IDataPointIterator iterator = 
      (IDataPointIterator) iterators.get(0);
    while (alreadyReturnedRowsNumber < fetchSize
      && totalReturnedRowsNumber < rowLimit 
      && iterator.hasNext()) {
      iterator.next();
      if (iterator.currentIndex() < rowOffset) {
        continue;
      }
      collector.putFloat(
        iterator.currentTime(), 
        doSomething(iterator.currentFloat())
      );
      ++totalReturnedRowsNumber;
    }
  }

临时查询文件管理器

工具类,提供临时查询文件管理、读写等的静态方法。

主要功能描述:

  1. SerializableTVList和SerializableRowRecordList的序列化与反序列化

文件格式:

...

  1. FLOAT / DOUBLE / INTEGER / LONG / BOOLEAN
  2. 格式: time value time value time value ...

...

管理器

装载卸载管理:代码见org.apache.iotdb.db.query.udf.service.UDFRegistrationService

临时文件管理:代码见org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService


用户接口设计

所有提供给用户编程的接口/类都在包 org.apache.iotdb.db.query.udf.api 中。所有的接口/类都添加了详细的Javadoc,可以选择从UDTF类开始阅读了解。

所有用户实现的UDTF都要通过继承UDTF类实现。UDTF类包括4个可以Override的方法,分别是beforeStart、beforeDestroy和两个签名不同的transform方法。一个完整的UDTF类至少需要Override beforeStart方法和其中一种transform方法。

beforeStart方法在transform方法调用前调用。主要作用有三类。

  • 第一类作用是利用UDFParameters解析用户输入的UDTF参数,即UDTF括号中的部分。UDTF允许用户定义两类参数,一种是时间序列参数,一种是KV对形式(KV都必须是使用引号引用的字符串)的属性参数。KV对形式的参数必须在输入完全部的时间序列参数后才能输入。两类参数都可以为空。时间序列参数决定了一个UDTF的原始数据由哪些序列生成,用户将在transform方法中拿到这些原始数据。如果时间序列参数多于1个,那么用户将得到的是这几列时间序列按照时间对齐后的数据行。用户可以在UDFParameters中得到输入的时间序列Path,可以通过UDFParameters内置的解析工具将KV对形式的属性参数进行类型转换。
  • 第二类作用是利用UDTFConfigurations配置UDTF的必要属性。目前有两类必要属性,一类是UDTF的输出数据类型,还有一种是原始数据的访问策略。目前允许用户指定三种类型的原始数据访问策略。(1)逐行读取原始数据;(2)滑动时间窗口读取原始数据(类似GroupBy的滑动时间窗口,需要指定DisplayWindowBegin、DisplayWindowEnd、TimeInterval和SlidingStep,该时间窗口允许窗口重叠);(3)固定窗口数据行数目的方式读取原始数据。其中后面两种需要指定参数,这些参数可以通过UDFParameters从用户输入的SQL中获得。
  • 第三类作用是为查询分配一些自定义的资源:打开文件、建立外部连接等。

beforeDestroy方法在一个UDTF查询结束后被调用。主要作用是释放用户自定义的一些资源。

transform方法是UDTF的核心方法,它的作用是对原始数据进行变换。它有两种签名,一种是UDTF#transform(Row, PointCollector),一种是UDTF#transform(RowWindow, PointCollector),每一种签名的第一个参数为原始数据的输入参数,第二个参数为变换后数据的输出参数。当用户在UDTFConfigurations中将访问原始数据的策略配置为逐行读取时,第一种签名的方法就会被调用。当用户将原始数据的访问策略配置为按滑动时间窗口读取或者固定数据行数的窗口读取时,第二种签名的方法就会被调用。根据原始数据划分的行数或者窗口数量,transform方法将被多次调用,每次调用都允许用户生成任意数量的数据点。


查询计划生成

有三点是需要关注的。

其一,由于允许原始数据查询与UDTF查询混合进行,所以需要修改原先逻辑计划和物理计划的生成方式。基本的做法是引入了UDFContext这一数据结构。相关文件可见SelectOperator、UDFPlan和LogicalGenerator、PhysicalGenerator。

其二,支持UDTF输入多个时间序列的场景,还必须考虑输入序列为*的情况。如下面的场景:现在有时间序列root.sg.d.s0, root.sg.d.s1, root.sg.d.s2, root.sg.d.s3,执行查询select udf(*, *, *) from root.sg.d 时,实际是执行了4*4*4个UDTF查询。在查询计划生成的时候,需要消除*符号并计算查询候选序列的笛卡尔积。代码逻辑主要在ConcatPathOptimizer中。

其三,UDTF查询列去重的逻辑。注意udf(a, b)和udf(b, a)不同,udf(a, b)和udf(a, b, 'k'='v')不同。代码逻辑主要在PhysicalGenerator#deduplicate中。


求值架构与过程

Image Added

UDTF的求值采用了如上图所示的3层架构:OutputLayer、TransformerLayer和InputLayer。

求值的基本思想是按需惰性求值:当输出层需要输出新的RowRecord时,Transformer层会首先消耗缓存的先前一次tranform执行生成的数据点,当缓存数据点全部被消耗,才执行新的一批数据的transform,这时才可能要求原始查询层输出下一个RowRecord。

InputLayer

InputLayer由原始查询的输出层RawQueryDataSetWithValueFilter或者RawQueryDataSetWithoutValueFilter,以及一个缓存队列ElasticSerializableRowRecordList组成。

RawQueryDataSet的输入时间序列为查询中UDTF查询和原始查询所有输入时间序列的并集;查询条件输入则与实际查询输入的一致。

InputLayer允许Transformer层按需调用RawQueryDataSet的nextWithoutConstraint()和hasNextWithoutConstraint()获取下一行原始数据的结果,结果会直接缓存到缓存队列中,Transformer层可以通过index访问原始数据;InputLayer允许Transformer层设置原始数据输出层某一列最小未消费元素的索引,以便在必要的时候减少内存管理时序列化的次数(见内存管理)。

代码见包org.apache.iotdb.db.query.udf.core.input。

TransformerLayer

TransformerLayer是架设在InputLayer和OutputLayer之间的中间层,它的作用是:直接传递或者变换InputLayer中的一列或者多列原始数据,产生一列新的时间序列,并对外提供一个统一的列式读取接口,使得OutputLayer能够按列读取新的时间序列,方便生成新的DataSet。

设计中,每一个输出结果列都会对应一个Transformer。根据输出列类型的不同,又可以分为三种Transformer:

  • RawQueryPointTransformer:用于转发原始数据查询的列。Transformer中保存了指定列中最后一个被消费的非空数据的索引,该索引在每次转发时就会更新一次。
  • UDFQueryRowTransformer:用于生成变换函数为UDTF#transform(Row, PointCollector)的列(用户指定UDTF访问原始数据方式为逐行访问的列)。Transformer中保存了所有用于变换的列的索引,同时保存了最后一个被消费的原始数据行的索引。transform方法的执行会在Transformer中进行,行索引在每次执行时就会更新一次。transform的执行结果会在Transformer中缓存,直到被OutputLayer完全消费。
  • UDFQueryRowWindowTransformer:用于生成变换函数为UDTF#transform(RowWindow, PointCollector)的列(用户指定UDTF访问原始数据方式为窗口访问的列)。Transformer中保存了所有用于变换的列的索引,同时以数组的形式保存了一个窗口对应的所有原始数据行的索引。transform方法的执行会在Transformer中进行,一个窗口中所有原始数据的索引在每次执行时就会更新一次。transform的执行结果会在Transformer中缓存,直到被OutputLayer完全消费。

上述三种Transformer都会实现LayerPointReader接口,OutputLayer利用该接口操纵数据。

下图是Transformer的类继承关系。

Image Added

代码见包org.apache.iotdb.db.query.udf.core.transfomrer。

OutputLayer

作用是列式读取TransformerLayer的数据,并拼接UDTF查询的结果集。

现在支持生成NonAlign结果集和AlignByTime结果集。两个结果集的生成逻辑与原始查询对应的两个结果集生成逻辑相似,区别在于UDTF的结果集生成是串行执行的。

代码见org.apache.iotdb.db.query.dataset.UDTFDataSet及其子类。

One More Thing

可以基于这种求值模式进行扩展,支持UDTF复合函数求值:每一个UDTF单元都对应一个transformer,然后对transform的执行时机进行拓扑排序,构建tranformer的执行pipeline……然后下推求值。


内存管理

在整个UDTF的求值过程中,有3处地方需要进行内存管理:

  1. InputLayer: TransformerLayer中不同Transformer的消费进度不同,可能导致在内存中驻留大量的RowRecord
  2. TransformerLayer: 窗口中数据行的数目过大,可能导致可用内存无法装下所有的索引信息
  3. OutputLayer: 某一个Transfomer在一个窗口中生成的数据点数过多,导致占用大量的内存

基于上述考虑,设计了目前的内存管理的方案:

  1. 用户可以配置一个总最大允许的内存用量(记为T),用于含有UDTF查询的过程(例如:select a, b, c, udf(a, b), udf(b, c)就是一个含有UDTF的查询过程,这个过程包含两个UDTF查询)。T默认为300MB,最大允许用户将T设置为读内存的20%。// 这个总允许花费内存的配置方法可以再讨论
  2. T分为3个部分使用,用户可以按比例把T划分给InputLayer、TransformerLayer和OutputLayer。默认分配为InputLayer:TransformerLayer:OutputLayer = 1:1:1。
  3. 分配给InputLayer的内存全部用于缓存未完全被消费的原始数据RowRecord。
  4. 分配给Transformer的内存利用如下:记Transformer中需要进行窗口计算的Transformer数量为n,那么分配给Transformer的总内存会被平均划分为n份,每一份内存供一个Transform用于缓存索引。对于不需要进行窗口计算的Transformer(原始数据Transformer和逐行处理数据的Transformer),内存占用为O(1),无需特别考虑。
  5. 分配给OutputLayer的内存利用如下:记去重后的输出列个数为n,那么分配给OutputLayer的总内存会被平均划分为n份,每一份内存供一个PointCollector用于缓存收集到的结果数据点。

上述1、2相关代码IoTDBDescriptor#loadUDFProps()。上述3、4、5相关代码见UDTFDataSet。

从上述的内存管理方案中我们可以看到,分配给InputLayer/TransformerLayer/OutputLayer的内存是一个有限的定值。我们需要估计上述三个Layer的内存用量并在Layer内存超出最大允许值时采取行动。为此,我们设计了3种数据结构,分别是ElasticSerializableRowRecordList、ElasticSerializableIntList和ElasticSerializableTVList。他们都基于同一种思想进行设计,下面将介绍这一思想。

场景特征:

  1. 内存用量受限,设最大允许用量为M。
  2. 内存受限的数据结构类似于列表,需要对该种列表进行append操作,进行顺序读和随机读操作。
  3. 在某一时刻后,某个index前的数据可以全部丢弃。

设计:

  • 将列表按照固定长度划分为若干块,每次只允许最多N块驻留在内存中。当总块数超过N块时,使用LRU策略替换块,被替换的块将被序列化到临时文件中。查询结束后清除该查询所有的临时文件。
  • 可以根据场景指定驻留在内存中的块数N,那么每个块能够分配到的内存为M/N。
  • 对于固定长度的数据而言,假定每条数据长度为E,那么每个块能够容纳的数据条数为M/N/E。
  • 对于不固定长度的数据而言,使用探测的方法定出一个数据长度上限E用于块中数据条数的计算,因而每个块能够容纳的数据条数依旧计算为M/N/E。
    • 用户可以在配置文件中指定数据长度上限的初始值E,默认为48bytes
    • 维护一个实际总长度值Ma。每次列表有append操作,就将append数据的长度累加到实际总长度值上。
    • 每当append操作到达1000次(达到一次检查阈值),就执行一次内存占用检查。如果当前列表的长度l * E > Ma,即开始一次内存调整:
      • 首先倍增法调整E,将当前E翻倍,检查是否满足l * E <= Ma;若不满足,则不断翻倍、检查,直到找到这个E
      • 检查M/N/E是否小于阈值(为了保证性能,序列化的块也不能太小),如果此时的E导致块大小过小,则更换下面方法调整E
        • E <- E + 2 * ((Ma - l * E) / l)
        • 如果利用新方法调整E仍然会导致块大小过小,则直接抛异常,要求用户分配更多内存
      • 创建新的列表,使用新E计算一个块可以容纳的元素数量,将旧列表的元素拷贝到新列表中
    • 对于一个现实的序列,其中负载的数据长度应该是比较稳定的。在append次数足够多之后,基本可以计算得到一个稳定的数据长度平均值。内存调整在数据刚开始append的时候可能会比较频繁,当然这时候序列基本都在内存当中,全序列拷贝基本不会触发内外存的交换(此时扩增操作基本等同于List的扩增),也就不会有效率上的问题。而当数据append到需要内外存交换的程度时,基本也无需触发内存调整了。
  • 给定元素索引index,可立即计算得到元素位于索引为index/(M/N/E)的块中,在块中的索引为index%(M/N/E)。
  • 利用上面提到的场景特征3,还可以进一步减少块序列化的次数(仅使用在ElasticSerializableRowRecordList和ElasticSerializableTVList中):记录列表中最小一个有用的index,当触发LRU替换时,若被替换块的最大index小于最小有用index,则直接抛弃该块,不再进行序列化(因为它永远不会被反序列化)。

...

  1. 可能可以使用到TSFileFactory?
  2. 生成文件路径
    1. 例如:data/query/{query-id}/{path-name / merged-series-name}/{SerializableTVList-index}.temp。index指的是SerializableTVList在ElasticSerializableTVList中的index。
  3. 删除临时文件
  4. ...

核心数据结构

TODO:确定BatchData和TVList关系。

interface SerializableList {


  ByteBuffer serialize(); 
  void deserialize(ByteBuffer buffer); 
 
  int capacity();
  boolean isFull();
  
  void release(); // 清空内存
  // ...
}

SerializableTVList

核心功能描述:

  1. SerializableTVList接收一个int型构造参数,该参数为values和timestamps数组的size()限制(values和timestamps的长度最多为该参数)。SerializableTVList通过控制该int型参数控制一个SerializableTVList的内存占用。提供capacity()和isFull()等方法。
  2. static方法:针对不同序列类型,给定内存限制,返回一个TVList中values和timestamps的size限制。
  3. 基于TVList拓展,增加序列化与反序列化能力。可以将values和timestamps整体序列化进ByteBuffer,并从ByteBuffer整体反序列化。序列化后,调用release()清空内存占用。
  4. 由外部(ElasticSerializableTVList)控制序列化与反序列化的过程。
public class SerializableLongTVList 
  extends LongTVList   
  implements SerializableList {
  
  public static int getInternalTVListSizeLimit(int memoryLimitInMB) {
    // ...
  }
  
  public SerializableLongTVList(int internalTVListSizeLimit) {
    super();
    // ...
  } 
}

ElasticSerializableTVList

核心需求分析:

  1. 顺序读取、写入能力
  2. 随机读取能力
  3. 读写不存在同时进行的情况

核心功能描述:

  1. 核心数据结构为List<SerializableTVList>,以一个SerializableTVList为基本的管理单元,通过类似TVList的计算方法增加和定位TV元素
  2. 辅助数据结构LRU Cache,用于管理缓存SerializableTVList。缓存替换时对整个SerializableTVList进行替换。
  3. 可以限定一个ElasticSerializableTVList的总容量和缓存个数。可以根据总容量和缓存个数决定一个SerializableTVList的容量。总允许容量低于某个阈值,缓存个数定为1.
  4. 对外提供put()接口,get(int index)接口,TV的迭代器,LRU策略进行缓存替换

优点:

  1. 以SerializableTVList为一块内存管理单元,容易实现
  2. 具备TVList的优点
  3. 缓存方便直接
public class ElasticSerializableTVList {
  
  protected int size;
  protected List<SerializableTVList> tvLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableTVList(
    TSDataType dataType, 
    int memoryLimitInMB, 
    int cacheSize) { // ... }
    
  public long getTime(int index);
  public long getLong(int index);
  public void putLong(long timestamp, long value);
  // ...
  
  public IDataPointIterator getDataPointIterator();
  public IDataPointBatchIterator 
    getSizeLimitedDataPointBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowDataPointBatchIterator(long timeInterval);
}

SerializableRowRecordList

类似SerializableTVList.

public class SerializableRowRecordList 
  implements SerializableList {


  protected List<RowRecord> rowRecordList;


  public static int getInternalRowRecordListLimit(int memoryLimitInMB) 
  {
    // ...
  }
  
  public SerializableRowRecordList(int internalRowRecordListSizeLimit) 
  {
    // ...
  }

ElasticSerializableRowRecordList

最终实现类似ElasticSerializableTVList的功能。

public class ElasticSerializableRowRecordList {
  
  protected int size;
  protected List<SerializableRowRecordList> rowRecordLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableRowRecordList(
    List<TSDataType> dataTypes, 
    int memoryLimitInMB, 
    int cacheSize) { // ... 
    }
    
  public long getTime(int index);
  public RowRecord getRowRecord(int index);
  public void putRowRecord(RowRecord row);
  // ...
  
  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator 
    getSizeLimitedRowRecordBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowRowRecordBatchIterator(long timeInterval);
}

迭代接口

QueryDataIterator提供接口帮助用户控制数据点、数据批或者整个序列数据的获取,它提供顺序访问与随机访问的能力。

interface QueryDataIterator {}

数据点的迭代

现有实现中有功能类似的接口

public interface IPointReader {


  boolean hasNextTimeValuePair() throws IOException;


  TimeValuePair nextTimeValuePair() throws IOException;


  TimeValuePair currentTimeValuePair() throws IOException;


  void close() throws IOException

不使用IPointReader一方面出于效率考虑。IPointReader要求包装TimeValuePair,TimeValuePair又会要求TsPrimitiveType。IPointReader直接把time和value分离,直接暴露给用户 ,不必对数据进行包装。

另一方面,IDataPointIterator提供了currentPointIndex的接口用于定位数据,而IPointReader没有。

IDataPointIterator

interface IDataPointIterator extends QueryDataIterator {
  
  // 比IPointReader少一层包装
  // 不使用TimeValuePair


  boolean hasNextPoint();
  void next();
  
  int currentPointIndex();
  long currentTime();
  long currentLong();
  int currentInt();
  // ...
} 

IDataPointBatchIterator

interface IDataPointBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IDataPointIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  int getIntInCurrentBatch(int index);
  long getLongInCurrentBatch(int index);
  bool getBooleanInCurrentBatch(int index);
  float getFloatInCurrentBatch(int index);
  double getDoubleInCurrentBatch(int index);
}

数据行的迭代

IRowRecordIterator

interface IRowRecordIterator extends QueryDataIterator {


  boolean hasNextRowRecord();
  void next();
  
  int currentRowRecordIndex();
  RowRecord currentRowRecord();
  long currentTime();
} 

IRowRecordBatchIterator

interface IRowRecordBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IRowRecordIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  RowRecord getRowRecordInCurrentBatch(int index);
}

全体数据的迭代

IOverallDataPointIterator

interface IOverallDataPointIterator extends QueryDataIterator {


  public IDataPointIterator getPointDataIterator();
  public IDataPointBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IDataPointBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  int getInt(int index);
  long getLong(int index);
  bool getBoolean(int index);
  float getFloat(int index);
  double getDouble(int index);
}

IOverallRowRecordIterator

interface IOverallRowRecordIterator extends QueryDataIterator {


  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IRowRecordBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  RowRecord getRowRecord(int index);
}

数据迭代实现QueryDataIterator

利用核心数据结构实现迭代接口。

数据点

DataPointIterator

public class DataPointIterator 
  implements IPointIterator { // IPointIterator实现为按需获取


  protected ElasticSerializableTVList tvList;


  public DataPointIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public DataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }

DataPointBatchIterator

public abstract class DataPointBatchIterator 
  implements IBatchIterator { // IBatchIterator实现为按需获取


  protected ElasticSerializableTVList tvList;
    
  public DataPointBatchIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤... 
  }
  
  public DataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
}


class SizeLimitedDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public SizeLimitedDataPointBatchIterator(List<ManagedSeriesReader> readers, int batchSize) {
    // 不带值过滤... 
  }
  
  public SizeLimitedDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, int batchSize) {
    // 带值过滤...
  }
  
  // ...
} 


class TimeWindowDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public TimeWindowDataPointBatchIterator(List<ManagedSeriesReader> readers, long timeWindowInNanoSeconds) {
    // 不带值过滤... 
  }
  
  public TimeWindowDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, long timeWindowInNanoSeconds) {
    // 带值过滤...
  }
  
  // ...

OverallDataPointIterator

public class OverallDataPointIterator 
  implements IOverallDataPointIterator {


  protected ElasticSerializableTVList tvList;
  // ...
  
  public OverallDataPointIterator(List<ManagedSeriesReader> readers) 
  {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public OverallDataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
} 

数据行

RowRecordIterator

与DataPointIterator类似,提供3种类型的Iterator。

RowRecordIterator:迭代

RowRecordBatchIterator:迭代、随机

OverallRowRecordIterator:迭代、随机

构造方式类同,实现时会增加一个时间的最小堆用于归并排序。

...

实现方式

不改变IoTDB整体的查询框架,查询方式按照生成QueryPlan -> QueryExecutor -> QueryDataSet进行。

UDTF

按照构造UDTFQueryPlan -> 构造UDTFQueryExecutor -> 构造UDTFQueryDataSet的顺序进行查询。

UDTFQueryPlan通过extends RawQueryPlan进行改造,拓展解析出若干UDF的属性与方法。在此阶段构造UDTF实例,调用UDTF实例的初始化方法(解析并初始化设置等)。

UDTFQueryExecutor通过extends RawQueryExecutor进行改造,可以复用大部分逻辑(生成reader和TimeGenerator等的逻辑)。

UDTFQueryDataSet需要包含三种类型的实现:AlignByTime / AlignByDevice / NonAlign

对于RawQuery,其AlignByTime的QueryDataSet按照带值过滤与否被实现为了RawQueryDataSetWithoutValueFilter和RawQueryDataSetWithValueFilter;

NonAlign的QueryDataSet在NonAlignEngineDataSet中实现(仅包含不带值过滤的逻辑);

AlignByDevice的QueryDataSet在AlignByDeviceDataSet中借助RawQueryDataSet生成。

对于UDTF,AlignByDevice的数据生成比较简单,可以通过简单改写AlignByDeviceDataSet实现,借助下面的UDTFQueryDataSet即可直接生成。

对于UDTFQueryDataSet的实现(包括AlignByTime / NonAlign的情况),一种自然的想法是通过extends RawQueryDataSet/NonAlignEngineDataSet或者直接在RawQueryDataSet/NonAlignEngineDataSet上改写的方式进行,以此复用部分原始数据获取(生产)和结果数据填充(消费)的逻辑。但这有两个问题:

  1. 在AlignByTime的实现中,UDTF虽然获取原始数据的方式是不同的,但是这些数据面向UDF编程接口时(进行transform时)没有不同,在UDF编程接口中只能看到点、批、全量的迭代器。通过extends RawQueryDataSet或者直接在RawQueryDataSet上改写的方式进行实现,会导致出现两套一模一样的transform逻辑。而且,RawQueryDataSetWithoutValueFilter 和 RawQueryDataSetWithValueFilter 的数据生产逻辑难以直接被UDF直接利用,需要较大的改动;数据结果填充逻辑更是完全无法利用。
  2. UDF生成NonAlignDataSet时无法extends RawQuery的NonAlignDataSet,同时UDF在数据完成transfrom前和AlignByTime的处理逻辑没有任何的不同,仅在结果集填充(fillBuffer)上有区别。

针对问题1,UDTFQueryDataSet实现时可以不以类区分带值过滤和不带值过滤的情况,实现一个UDTFQueryDataSet提供两个构造函数,分别处理带值过滤和不带值过滤的原始数据生成,然后走同一套transform流程;

针对问题2,可以通过extends UDTFQueryDataSet实现UDTFNonAlignDataSet和UDTFAlignByTimeDataSet,在两个类中分别实现一个fillBuffer用于打包NonAlign和AlignByTime的数据。

实现方式可通过改造RawQueryDataSet的方式进行,使得UDTFQueryDataSet和RawQueryDataSet可以复用相同的逻辑进行生成。下列陈述描述了UDTFQueryDataSet的必要构造过程。

最终的类关系如下:

Image Removed

UDAF

按照构造UDAFQueryPlan -> 构造UDAFQueryExecutor -> 构造UDAFQueryDataSet的顺序进行查询。

把StatisticsUDAF视为Aggregation的一种特例,UDAFQueryPlan的实现可通过直接在AggregationPlan中增加若干属性进行。在UDAFQueryPlan构造阶段构造UDAF实例,调用UDAF实例的初始化方法(解析并初始化设置等)。

...