You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

UDF 框架设计

(Author: Steve Yurong Su)

(Original link: https://shimo.im/docs/vqhkTcDH6QXvQkyw)

UDF调研

时序数据库UDF

InfluxDB (自定Script,与kapacitor交互): https://docs.influxdata.com/kapacitor/v1.5/nodes/u_d_f_node/

TimescaleDB (类似Postgre): https://docs.timescale.com/latest/using-timescaledb/reading-data#advanced-analytics

FaunaDB (GraphQL): https://docs.fauna.com/fauna/current/api/graphql/functions

DolphinDB: https://www.dolphindb.com/help/


其他框架UDF

MySQL:

https://www.cnblogs.com/raker/p/4377343.html

https://blog.csdn.net/shaoyiwenet/article/details/53256103

Hive (HQL,定义方式,加载方式jar包动态加载):

https://blog.csdn.net/weixin_42181917/article/details/82865140

https://www.cnblogs.com/hd-zg/p/5947468.html

https://blog.csdn.net/ZZQHELLO2018/article/details/105700649

Flink:

https://blog.csdn.net/wangpei1949/article/details/103444412


几种查询UDF的使用情境

  1. 需要获取全量的查询数据才能进一步处理的情境(需要向用户提供随机、顺序访问序列的能力)
  2. 可以获取一定量数据后按批处理的情境(需要向用户提供随机、顺序访问序列的能力)
  3. 可以逐数据点(行)处理的情境 (需要向用户提供顺序访问序列的能力)


(English)

Several query UDF usage scenarios

  1. Situations that need to obtain the full amount of query data for further processing (need to provide users with the ability to access sequences randomly and sequentially)
  2. A situation where a certain amount of data can be obtained and processed in batches (the user needs to be provided with the ability to access the sequence randomly and sequentially)
  3. Scenarios that can be processed by data points (rows) (need to provide users with the ability to access sequences sequentially)


一个UDF查询的情景可能包括单序列查询、多序列查询。

单序列:

select udf(s1) from root.sg1.d1;
select udf(s1), s2 from root.sg1.d1;
select udf(s1), udf(s2) from root.sg1.d1; 

多序列:

select udf(s1, s2, s3) from root.sg1.d1; 
select udf(s1, s2, "string", "number"), udf1(s1), s3 from root.sg1.d1; 


在多序列查询的场景下,提供用户如下迭代方式进行transfrom:

分别按点迭代每条序列

直接迭代多条序列按时间对齐后的行


以上场景都需要考虑内存受限的情况。


UDF管理SQL语法

装载

CREATE [temporary] FUNCTION function_name AS class_name;


卸载

DROP [temporary] FUNCTION function_name;


展示

SHOW [temporary] FUNCTIONS;


编程接口设计

区分

用户定义聚合函数UDAF(源数据-生成数据 多对一

用户定义表格函数UDTF(源数据-生成数据 多对多


UDF传入参数的获取接口

/**
 * select语句中,udf接收的参数
 * 接收带单引号或双引号 STRING 类型的参数或者suffix path类型的参数
 * 参数可变长,允许有多个measurements
 * 
 * 例:
 * select 
 *   udf1(measurement0, measurement1, parameter0, parameter1), 
 *   udf2(measurement2, parameter2),
 *   measurement3, 
 *   measurement4
 * from root.sg1. ..;
 * 
 * 对udf1,measurement0, measurement1, parameter0, parameter1就是
 * parameters
 */
public class UDFParameters {


  int length();


  boolean getBoolean(int index);
  int getInteger(int index);
  long getLong(int index);
  float getFloat(int index);
  double getDouble(int index);
  String getString(int index);
  Path getPath(int index);


UDF用户个性化配置接口

最后可能区分UDAF UDTF的配置接口,以提供不同的配置选项。

/**
 * 给用户个性化配置UDF的接口
 * UDFConfigurations = new UDFConfigurations();
 * configurations
 *   .setMeasurementIndexes(Arrays.asList(0, 1, 2))
 *
 *   .setDataPointAccessStrategies(
 *      2, FETCH_BY_TIME_WINDOW))
 *   .setBatchTimeWindow(2, 100_000)
 *
 *   .mergeOriginalSeries("unique-name", Arrays.asList(0, 1))
 *   .setRowRecordAccessStrategies(
 *      "unique-name", FETCH_BY_SIZE_LIMITED_WINDOW)
 *   .setBatchSizeLimit("unique-name", 1000)
 *
 *   .setOutputDataType(TSDataType.FLOAT)
 *   .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
 *   .setTimeout(10000);
 *
 * 用户自定义配置完成后会有验证、检查过程
 */
public class UDFConfigurations {


  enum DataPointAccessStrategy {
    FETCH_BY_POINT, // 提供DataPointIterator
    FETCH_BY_TIME_WINDOW, // 提供DataPointBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供DataPointBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallDataPointIterator
  }
  
  enum RowRecordAccessStrategy {
    FETCH_BY_ROW, // 提供RowRecordIterator
    FETCH_BY_TIME_WINDOW, // 提供RowRecordBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供RowRecordBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallRowRecordIterator
  }
  
  // 说明 measurement 在参数中的位置,以0为起点
  public UDFConfigurations setMeasurementIndexes(int[] indexes);
  
  // 说明各个序列的读取策略,以此参数提供不同的iterators供用户使用
  public UDFConfigurations setDataPointAccessStrategies(
    int measurementIndex, 
    DataPointAccessStrategy pointAccessStrategies);  
  public UDFConfigurations setBatchSizeLimit(
    int measurementIndex, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
  
  public UDFConfigurations mergeOriginalSeries(
    String mergedSeriesName, int[] indexes);
  public UDFConfigurations setBatchSizeLimit(
    String mergedSeriesName, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
 
  // transform 后结果的类型
  public UDFConfigurations setOutputDataType(TSDataType dataType);
  
  // 展现在客户端的头信息
  public UDFConfigurations setColumnHeader(String header);
  
  // 执行时间限制
  public UDFConfigurations setTimeout(long timeout);


  // 更多setter以及getter ...
}


UDF接口

List可以变更为[]。

public abstract class UDF {


  protected List<Path> paths;
  protected List<TSDataType> originalSeriesDataTypes;
  
  // iterators,用户从此拿数据
  // OverallDataPointIterator / 需要获取全量的查询数据才能进一步处理的情境
  // DataPointBatchIterator / 可以获取一定量数据后按批处理的情境
  // DataPointIterator / 可以逐数据点处理的情
  // 针对序列可以分开处理的情形
  protected List<DataPointIterator> DataPointIterators; 
  // 多序列归并后,按时间对齐的结果,需要在UDFConfigurations.mergeOriginalSeries
  protected HashMap<String, RowRocordIterator> RowRecordIterators; 
  
  // 用于 show 管理语句信息的加载
  public abstract String name();
  public abstract String description();
  
  // initialize 调用后,实例内所有的属性才可用
  // 用于初始化,加载资源等
  public abstract void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  );
  // 释放资源等
  public abstract void terminate();
  
  // 保留给系统进行设置
  public void setPaths(List<Path> paths);
  public void setOriginalSeriesDataTypes(List<TSDataType> dataTypes);
  public void setQueryDataIterators(List<QueryDataIterator> QueryDataIterators);
  // ...
}


UDTF接口

public abstract class UDTF extends UDF {


  protected ElasticSerializableTVList collector; // 数据收集器,获取transform后的序列,ElasticSerializableTVList是含内存管理的TVList
  
  protected int rowLimit;
  protected int rowOffset;
  
  // 用户需要实现
  public abstract boolean hasRemainingDataToTransform();
  public abstract void transform(int fetchSize); // 用户自行保存上下文


  // 保留给系统进行设置
  public final void setCollector(ElasticSerializableTVList collector) { ... }
  public final void setRowLimit(int rowLimit) { ... }
  public final void setRowOffset(int rowOffset) { ... }


UDAF接口

// 暂时不实现此类型的UDAF

public abstract class UDAF extends UDF {
  
  // 用户需要实现
  public abstract Object aggregate();
}


直接利用Statistics数据情境

// QueryDataIterator属性无效
public abstract class UDAF extends UDF {
  // 实现用于构造 AggregateResult 的接口...
  public abstract void updateResultFromPageData(BatchData dataInThisPage);
  public abstract void updateResultUsingTimestamps(long[] timestamps, int length, IReaderByTimestamp dataReader);
  // ...
}


示例

public class HiFiUDTF extends PointDataUDTF {


  private int hifiSampleSize;
  private String hifiWeightOperator;
  private String hifiSampleOperator;  
  
  private int totalReturnedRowsNumber;


  public String name() {
    return "hifi";
  }
  public String description() {
    return "This is a HiFi query example.";
  }
  
  public void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  ) {
    totalReturnedRowsNumber = 0;
    hifiSampleSize = parameters.getInteger(1);
    hifiWeightOperator = parameters.getString(2);
    hifiSampleOperator = parameters.getString(3);
    configurations
      .setMeasurementIndexes(Arrays.asList(0))
      .setDataPointAccessStrategies(0,FETCH_BY_POINT)
      .setOutputDataType(TSDataType.FLOAT)
      .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
      .setTimeout(10000);
  }
  public abstract void terminate() {
    // ignored 
  }
  
  public boolean hasRemainingDataToTransform() {
    return totalReturnedRowsNumber < rowLimit 
      && (IDataPointIterator iterators.get(0)).hasNext();
  }
  public void transform() {
    int alreadyReturnedRowsNumber = 0;
    IDataPointIterator iterator = 
      (IDataPointIterator) iterators.get(0);
    while (alreadyReturnedRowsNumber < fetchSize
      && totalReturnedRowsNumber < rowLimit 
      && iterator.hasNext()) {
      iterator.next();
      if (iterator.currentIndex() < rowOffset) {
        continue;
      }
      collector.putFloat(
        iterator.currentTime(), 
        doSomething(iterator.currentFloat())
      );
      ++totalReturnedRowsNumber;
    }
  }


临时查询文件管理器

工具类,提供临时查询文件管理、读写等的静态方法。

主要功能描述:

  1. SerializableTVList和SerializableRowRecordList的序列化与反序列化

文件格式:

    1. 目标:根据index可以快速定位元组的offset,可同时面向定长和不定长数据进行序列化
    2. 定长数据:
      1. FLOAT / DOUBLE / INTEGER / LONG / BOOLEAN
      2. 格式: time value time value time value ...
    3. 不定长数据:
      1. 格式:time offset time offset time offset ...
      2. 由于单个文件可存储的time value对是固定的,由capacity()决定。因此第capacity()个time offset位置后可存储不定长数据。
      3. 设置一个阈值s,在按批次从磁盘反序列化时,若某个(行)数据长度低于s,则在该批次反序列化时和其他数据一起反序列化;反之,在访问时才反序列化。
      4. STRING存储格式:length + data
      5. RowRecord存储格式: length + (BOOLEAN data) + (BOOLEAN data) + ... BOOLEAN记录是否为null
  1. 管理文件
    1. 例如:data/query/{query-id}/{path-name / merged-series-name}/{SerializableTVList-index}.temp。index指的是SerializableTVList在ElasticSerializableTVList中的index。
    1. 可能可以使用到TSFileFactory?
    2. 生成文件路径
    3. 删除临时文件
    4. ...


核心数据结构

TODO:确定BatchData和TVList关系。

interface SerializableList {


  ByteBuffer serialize(); 
  void deserialize(ByteBuffer buffer); 
 
  int capacity();
  boolean isFull();
  
  void release(); // 清空内存
  // ...
}


SerializableTVList

核心功能描述:

  1. SerializableTVList接收一个int型构造参数,该参数为values和timestamps数组的size()限制(values和timestamps的长度最多为该参数)。SerializableTVList通过控制该int型参数控制一个SerializableTVList的内存占用。提供capacity()和isFull()等方法。
  2. static方法:针对不同序列类型,给定内存限制,返回一个TVList中values和timestamps的size限制。
  3. 基于TVList拓展,增加序列化与反序列化能力。可以将values和timestamps整体序列化进ByteBuffer,并从ByteBuffer整体反序列化。序列化后,调用release()清空内存占用。
  4. 由外部(ElasticSerializableTVList)控制序列化与反序列化的过程。
public class SerializableLongTVList 
  extends LongTVList   
  implements SerializableList {
  
  public static int getInternalTVListSizeLimit(int memoryLimitInMB) {
    // ...
  }
  
  public SerializableLongTVList(int internalTVListSizeLimit) {
    super();
    // ...
  } 
}


ElasticSerializableTVList

核心需求分析:

  1. 顺序读取、写入能力
  2. 随机读取能力
  3. 读写不存在同时进行的情况


核心功能描述:

  1. 核心数据结构为List<SerializableTVList>,以一个SerializableTVList为基本的管理单元,通过类似TVList的计算方法增加和定位TV元素
  2. 辅助数据结构LRU Cache,用于管理缓存SerializableTVList。缓存替换时对整个SerializableTVList进行替换。
  3. 可以限定一个ElasticSerializableTVList的总容量和缓存个数。可以根据总容量和缓存个数决定一个SerializableTVList的容量。总允许容量低于某个阈值,缓存个数定为1.
  4. 对外提供put()接口,get(int index)接口,TV的迭代器,LRU策略进行缓存替换


优点:

  1. 以SerializableTVList为一块内存管理单元,容易实现
  2. 具备TVList的优点
  3. 缓存方便直接


public class ElasticSerializableTVList {
  
  protected int size;
  protected List<SerializableTVList> tvLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableTVList(
    TSDataType dataType, 
    int memoryLimitInMB, 
    int cacheSize) { // ... }
    
  public long getTime(int index);
  public long getLong(int index);
  public void putLong(long timestamp, long value);
  // ...
  
  public IDataPointIterator getDataPointIterator();
  public IDataPointBatchIterator 
    getSizeLimitedDataPointBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowDataPointBatchIterator(long timeInterval);
}


SerializableRowRecordList

类似SerializableTVList.

public class SerializableRowRecordList 
  implements SerializableList {


  protected List<RowRecord> rowRecordList;


  public static int getInternalRowRecordListLimit(int memoryLimitInMB) 
  {
    // ...
  }
  
  public SerializableRowRecordList(int internalRowRecordListSizeLimit) 
  {
    // ...
  }


ElasticSerializableRowRecordList

最终实现类似ElasticSerializableTVList的功能。

public class ElasticSerializableRowRecordList {
  
  protected int size;
  protected List<SerializableRowRecordList> rowRecordLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableRowRecordList(
    List<TSDataType> dataTypes, 
    int memoryLimitInMB, 
    int cacheSize) { // ... 
    }
    
  public long getTime(int index);
  public RowRecord getRowRecord(int index);
  public void putRowRecord(RowRecord row);
  // ...
  
  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator 
    getSizeLimitedRowRecordBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowRowRecordBatchIterator(long timeInterval);
}


迭代接口

QueryDataIterator提供接口帮助用户控制数据点、数据批或者整个序列数据的获取,它提供顺序访问与随机访问的能力。

interface QueryDataIterator {}


数据点的迭代

现有实现中有功能类似的接口

public interface IPointReader {


  boolean hasNextTimeValuePair() throws IOException;


  TimeValuePair nextTimeValuePair() throws IOException;


  TimeValuePair currentTimeValuePair() throws IOException;


  void close() throws IOException

不使用IPointReader一方面出于效率考虑。IPointReader要求包装TimeValuePair,TimeValuePair又会要求TsPrimitiveType。IPointReader直接把time和value分离,直接暴露给用户 ,不必对数据进行包装。

另一方面,IDataPointIterator提供了currentPointIndex的接口用于定位数据,而IPointReader没有。


IDataPointIterator

interface IDataPointIterator extends QueryDataIterator {
  
  // 比IPointReader少一层包装
  // 不使用TimeValuePair


  boolean hasNextPoint();
  void next();
  
  int currentPointIndex();
  long currentTime();
  long currentLong();
  int currentInt();
  // ...
} 


IDataPointBatchIterator

interface IDataPointBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IDataPointIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  int getIntInCurrentBatch(int index);
  long getLongInCurrentBatch(int index);
  bool getBooleanInCurrentBatch(int index);
  float getFloatInCurrentBatch(int index);
  double getDoubleInCurrentBatch(int index);
}


数据行的迭代

IRowRecordIterator

interface IRowRecordIterator extends QueryDataIterator {


  boolean hasNextRowRecord();
  void next();
  
  int currentRowRecordIndex();
  RowRecord currentRowRecord();
  long currentTime();
} 


IRowRecordBatchIterator

interface IRowRecordBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IRowRecordIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  RowRecord getRowRecordInCurrentBatch(int index);
}


全体数据的迭代

IOverallDataPointIterator

interface IOverallDataPointIterator extends QueryDataIterator {


  public IDataPointIterator getPointDataIterator();
  public IDataPointBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IDataPointBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  int getInt(int index);
  long getLong(int index);
  bool getBoolean(int index);
  float getFloat(int index);
  double getDouble(int index);
}


IOverallRowRecordIterator

interface IOverallRowRecordIterator extends QueryDataIterator {


  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IRowRecordBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  RowRecord getRowRecord(int index);
}


数据迭代实现QueryDataIterator

利用核心数据结构实现迭代接口。

数据点

DataPointIterator

public class DataPointIterator 
  implements IPointIterator { // IPointIterator实现为按需获取


  protected ElasticSerializableTVList tvList;


  public DataPointIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public DataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }


DataPointBatchIterator

public abstract class DataPointBatchIterator 
  implements IBatchIterator { // IBatchIterator实现为按需获取


  protected ElasticSerializableTVList tvList;
    
  public DataPointBatchIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤... 
  }
  
  public DataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
}


class SizeLimitedDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public SizeLimitedDataPointBatchIterator(List<ManagedSeriesReader> readers, int batchSize) {
    // 不带值过滤... 
  }
  
  public SizeLimitedDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, int batchSize) {
    // 带值过滤...
  }
  
  // ...
} 


class TimeWindowDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public TimeWindowDataPointBatchIterator(List<ManagedSeriesReader> readers, long timeWindowInNanoSeconds) {
    // 不带值过滤... 
  }
  
  public TimeWindowDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, long timeWindowInNanoSeconds) {
    // 带值过滤...
  }
  
  // ...


OverallDataPointIterator

public class OverallDataPointIterator 
  implements IOverallDataPointIterator {


  protected ElasticSerializableTVList tvList;
  // ...
  
  public OverallDataPointIterator(List<ManagedSeriesReader> readers) 
  {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public OverallDataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
} 


数据行

RowRecordIterator

与DataPointIterator类似,提供3种类型的Iterator。

RowRecordIterator:迭代

RowRecordBatchIterator:迭代、随机

OverallRowRecordIterator:迭代、随机

构造方式类同,实现时会增加一个时间的最小堆用于归并排序。


...


实现方式

不改变IoTDB整体的查询框架,查询方式按照生成QueryPlan -> QueryExecutor -> QueryDataSet进行。


UDTF

按照构造UDTFQueryPlan -> 构造UDTFQueryExecutor -> 构造UDTFQueryDataSet的顺序进行查询。


UDTFQueryPlan通过extends RawQueryPlan进行改造,拓展解析出若干UDF的属性与方法。在此阶段构造UDTF实例,调用UDTF实例的初始化方法(解析并初始化设置等)。


UDTFQueryExecutor通过extends RawQueryExecutor进行改造,可以复用大部分逻辑(生成reader和TimeGenerator等的逻辑)。


UDTFQueryDataSet需要包含三种类型的实现:AlignByTime / AlignByDevice / NonAlign


对于RawQuery,其AlignByTime的QueryDataSet按照带值过滤与否被实现为了RawQueryDataSetWithoutValueFilter和RawQueryDataSetWithValueFilter;

NonAlign的QueryDataSet在NonAlignEngineDataSet中实现(仅包含不带值过滤的逻辑);

AlignByDevice的QueryDataSet在AlignByDeviceDataSet中借助RawQueryDataSet生成。


对于UDTF,AlignByDevice的数据生成比较简单,可以通过简单改写AlignByDeviceDataSet实现,借助下面的UDTFQueryDataSet即可直接生成。


对于UDTFQueryDataSet的实现(包括AlignByTime / NonAlign的情况),一种自然的想法是通过extends RawQueryDataSet/NonAlignEngineDataSet或者直接在RawQueryDataSet/NonAlignEngineDataSet上改写的方式进行,以此复用部分原始数据获取(生产)和结果数据填充(消费)的逻辑。但这有两个问题:

  1. 在AlignByTime的实现中,UDTF虽然获取原始数据的方式是不同的,但是这些数据面向UDF编程接口时(进行transform时)没有不同,在UDF编程接口中只能看到点、批、全量的迭代器。通过extends RawQueryDataSet或者直接在RawQueryDataSet上改写的方式进行实现,会导致出现两套一模一样的transform逻辑。而且,RawQueryDataSetWithoutValueFilter 和 RawQueryDataSetWithValueFilter 的数据生产逻辑难以直接被UDF直接利用,需要较大的改动;数据结果填充逻辑更是完全无法利用。
  2. UDF生成NonAlignDataSet时无法extends RawQuery的NonAlignDataSet,同时UDF在数据完成transfrom前和AlignByTime的处理逻辑没有任何的不同,仅在结果集填充(fillBuffer)上有区别。


针对问题1,UDTFQueryDataSet实现时可以不以类区分带值过滤和不带值过滤的情况,实现一个UDTFQueryDataSet提供两个构造函数,分别处理带值过滤和不带值过滤的原始数据生成,然后走同一套transform流程;

针对问题2,可以通过extends UDTFQueryDataSet实现UDTFNonAlignDataSet和UDTFAlignByTimeDataSet,在两个类中分别实现一个fillBuffer用于打包NonAlign和AlignByTime的数据。


实现方式可通过改造RawQueryDataSet的方式进行,使得UDTFQueryDataSet和RawQueryDataSet可以复用相同的逻辑进行生成。下列陈述描述了UDTFQueryDataSet的必要构造过程。


最终的类关系如下:


UDAF

按照构造UDAFQueryPlan -> 构造UDAFQueryExecutor -> 构造UDAFQueryDataSet的顺序进行查询。


把StatisticsUDAF视为Aggregation的一种特例,UDAFQueryPlan的实现可通过直接在AggregationPlan中增加若干属性进行。在UDAFQueryPlan构造阶段构造UDAF实例,调用UDAF实例的初始化方法(解析并初始化设置等)。


无需特别构造UDAFQueryExecutor/UDAFQueryDataSet,利用AggregationExecutor/AggregationDataSet相同的逻辑与步骤,即可。

  • No labels