You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

UDF 框架设计

(Author: Steve Yurong Su)

(Origin link: https://shimo.im/docs/vqhkTcDH6QXvQkyw)

UDF调研

时序数据库UDF

InfluxDB (自定Script,与kapacitor交互): https://docs.influxdata.com/kapacitor/v1.5/nodes/u_d_f_node/

TimescaleDB (类似Postgre): https://docs.timescale.com/latest/using-timescaledb/reading-data#advanced-analytics

FaunaDB (GraphQL): https://docs.fauna.com/fauna/current/api/graphql/functions

DolphinDB: https://www.dolphindb.com/help/


其他框架UDF

MySQL:

https://www.cnblogs.com/raker/p/4377343.html

https://blog.csdn.net/shaoyiwenet/article/details/53256103

Hive (HQL,定义方式,加载方式jar包动态加载):

https://blog.csdn.net/weixin_42181917/article/details/82865140

https://www.cnblogs.com/hd-zg/p/5947468.html

https://blog.csdn.net/ZZQHELLO2018/article/details/105700649

Flink:

https://blog.csdn.net/wangpei1949/article/details/103444412


几种查询UDF的使用情境

  1. 需要获取全量的查询数据才能进一步处理的情境(需要向用户提供随机、顺序访问序列的能力)
  2. 可以获取一定量数据后按批处理的情境(需要向用户提供随机、顺序访问序列的能力)
  3. 可以逐数据点(行)处理的情境 (需要向用户提供顺序访问序列的能力)


一个UDF查询的情景可能包括单序列查询、多序列查询。

单序列:

select udf(s1) from root.sg1.d1;
select udf(s1), s2 from root.sg1.d1;
select udf(s1), udf(s2) from root.sg1.d1; 

多序列:

select udf(s1, s2, s3) from root.sg1.d1; 
select udf(s1, s2, "string", "number"), udf1(s1), s3 from root.sg1.d1; 


在多序列查询的场景下,提供用户如下迭代方式进行transfrom:

分别按点迭代每条序列

直接迭代多条序列按时间对齐后的行


以上场景都需要考虑内存受限的情况。


UDF管理SQL语法

装载

CREATE [temporary] FUNCTION function_name AS class_name;


卸载

DROP [temporary] FUNCTION function_name;


展示

SHOW [temporary] FUNCTIONS;


编程接口设计

区分

用户定义聚合函数UDAF(源数据-生成数据 多对一

用户定义表格函数UDTF(源数据-生成数据 多对多


UDF传入参数的获取接口

/**
 * select语句中,udf接收的参数
 * 接收带单引号或双引号 STRING 类型的参数或者suffix path类型的参数
 * 参数可变长,允许有多个measurements
 * 
 * 例:
 * select 
 *   udf1(measurement0, measurement1, parameter0, parameter1), 
 *   udf2(measurement2, parameter2),
 *   measurement3, 
 *   measurement4
 * from root.sg1. ..;
 * 
 * 对udf1,measurement0, measurement1, parameter0, parameter1就是
 * parameters
 */
public class UDFParameters {


  int length();


  boolean getBoolean(int index);
  int getInteger(int index);
  long getLong(int index);
  float getFloat(int index);
  double getDouble(int index);
  String getString(int index);
  Path getPath(int index);


UDF用户个性化配置接口

最后可能区分UDAF UDTF的配置接口,以提供不同的配置选项。

/**
 * 给用户个性化配置UDF的接口
 * UDFConfigurations = new UDFConfigurations();
 * configurations
 *   .setMeasurementIndexes(Arrays.asList(0, 1, 2))
 *
 *   .setDataPointAccessStrategies(
 *      2, FETCH_BY_TIME_WINDOW))
 *   .setBatchTimeWindow(2, 100_000)
 *
 *   .mergeOriginalSeries("unique-name", Arrays.asList(0, 1))
 *   .setRowRecordAccessStrategies(
 *      "unique-name", FETCH_BY_SIZE_LIMITED_WINDOW)
 *   .setBatchSizeLimit("unique-name", 1000)
 *
 *   .setOutputDataType(TSDataType.FLOAT)
 *   .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
 *   .setTimeout(10000);
 *
 * 用户自定义配置完成后会有验证、检查过程
 */
public class UDFConfigurations {


  enum DataPointAccessStrategy {
    FETCH_BY_POINT, // 提供DataPointIterator
    FETCH_BY_TIME_WINDOW, // 提供DataPointBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供DataPointBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallDataPointIterator
  }
  
  enum RowRecordAccessStrategy {
    FETCH_BY_ROW, // 提供RowRecordIterator
    FETCH_BY_TIME_WINDOW, // 提供RowRecordBatchIterator
    FETCH_BY_SIZE_LIMITED_WINDOW, // 提供RowRecordBatchIterator
    RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallRowRecordIterator
  }
  
  // 说明 measurement 在参数中的位置,以0为起点
  public UDFConfigurations setMeasurementIndexes(int[] indexes);
  
  // 说明各个序列的读取策略,以此参数提供不同的iterators供用户使用
  public UDFConfigurations setDataPointAccessStrategies(
    int measurementIndex, 
    DataPointAccessStrategy pointAccessStrategies);  
  public UDFConfigurations setBatchSizeLimit(
    int measurementIndex, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    int measurementIndex, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
  
  public UDFConfigurations mergeOriginalSeries(
    String mergedSeriesName, int[] indexes);
  public UDFConfigurations setBatchSizeLimit(
    String mergedSeriesName, int sizeLimit);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow);
  public UDFConfigurations setBatchTimeWindow(
    String mergedSeriesName, long timeWindow, long startTime); 
    // startTime 提供 MIN_TIME_IN_SERIES
 
  // transform 后结果的类型
  public UDFConfigurations setOutputDataType(TSDataType dataType);
  
  // 展现在客户端的头信息
  public UDFConfigurations setColumnHeader(String header);
  
  // 执行时间限制
  public UDFConfigurations setTimeout(long timeout);


  // 更多setter以及getter ...
}


UDF接口

List可以变更为[]。

public abstract class UDF {


  protected List<Path> paths;
  protected List<TSDataType> originalSeriesDataTypes;
  
  // iterators,用户从此拿数据
  // OverallDataPointIterator / 需要获取全量的查询数据才能进一步处理的情境
  // DataPointBatchIterator / 可以获取一定量数据后按批处理的情境
  // DataPointIterator / 可以逐数据点处理的情
  // 针对序列可以分开处理的情形
  protected List<DataPointIterator> DataPointIterators; 
  // 多序列归并后,按时间对齐的结果,需要在UDFConfigurations.mergeOriginalSeries
  protected HashMap<String, RowRocordIterator> RowRecordIterators; 
  
  // 用于 show 管理语句信息的加载
  public abstract String name();
  public abstract String description();
  
  // initialize 调用后,实例内所有的属性才可用
  // 用于初始化,加载资源等
  public abstract void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  );
  // 释放资源等
  public abstract void terminate();
  
  // 保留给系统进行设置
  public void setPaths(List<Path> paths);
  public void setOriginalSeriesDataTypes(List<TSDataType> dataTypes);
  public void setQueryDataIterators(List<QueryDataIterator> QueryDataIterators);
  // ...
}


UDTF接口

public abstract class UDTF extends UDF {


  protected ElasticSerializableTVList collector; // 数据收集器,获取transform后的序列,ElasticSerializableTVList是含内存管理的TVList
  
  protected int rowLimit;
  protected int rowOffset;
  
  // 用户需要实现
  public abstract boolean hasRemainingDataToTransform();
  public abstract void transform(int fetchSize); // 用户自行保存上下文


  // 保留给系统进行设置
  public final void setCollector(ElasticSerializableTVList collector) { ... }
  public final void setRowLimit(int rowLimit) { ... }
  public final void setRowOffset(int rowOffset) { ... }


UDAF接口

// 暂时不实现此类型的UDAF

public abstract class UDAF extends UDF {
  
  // 用户需要实现
  public abstract Object aggregate();
}


直接利用Statistics数据情境

// QueryDataIterator属性无效
public abstract class UDAF extends UDF {
  // 实现用于构造 AggregateResult 的接口...
  public abstract void updateResultFromPageData(BatchData dataInThisPage);
  public abstract void updateResultUsingTimestamps(long[] timestamps, int length, IReaderByTimestamp dataReader);
  // ...
}


示例

public class HiFiUDTF extends PointDataUDTF {


  private int hifiSampleSize;
  private String hifiWeightOperator;
  private String hifiSampleOperator;  
  
  private int totalReturnedRowsNumber;


  public String name() {
    return "hifi";
  }
  public String description() {
    return "This is a HiFi query example.";
  }
  
  public void initialize(
    UDFParameters parameters,
    UDFConfigurations configurations
  ) {
    totalReturnedRowsNumber = 0;
    hifiSampleSize = parameters.getInteger(1);
    hifiWeightOperator = parameters.getString(2);
    hifiSampleOperator = parameters.getString(3);
    configurations
      .setMeasurementIndexes(Arrays.asList(0))
      .setDataPointAccessStrategies(0,FETCH_BY_POINT)
      .setOutputDataType(TSDataType.FLOAT)
      .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
      .setTimeout(10000);
  }
  public abstract void terminate() {
    // ignored 
  }
  
  public boolean hasRemainingDataToTransform() {
    return totalReturnedRowsNumber < rowLimit 
      && (IDataPointIterator iterators.get(0)).hasNext();
  }
  public void transform() {
    int alreadyReturnedRowsNumber = 0;
    IDataPointIterator iterator = 
      (IDataPointIterator) iterators.get(0);
    while (alreadyReturnedRowsNumber < fetchSize
      && totalReturnedRowsNumber < rowLimit 
      && iterator.hasNext()) {
      iterator.next();
      if (iterator.currentIndex() < rowOffset) {
        continue;
      }
      collector.putFloat(
        iterator.currentTime(), 
        doSomething(iterator.currentFloat())
      );
      ++totalReturnedRowsNumber;
    }
  }


临时查询文件管理器

工具类,提供临时查询文件管理、读写等的静态方法。

主要功能描述:

  1. SerializableTVList和SerializableRowRecordList的序列化与反序列化

文件格式:

    1. 目标:根据index可以快速定位元组的offset,可同时面向定长和不定长数据进行序列化
    2. 定长数据:
      1. FLOAT / DOUBLE / INTEGER / LONG / BOOLEAN
      2. 格式: time value time value time value ...
    3. 不定长数据:
      1. 格式:time offset time offset time offset ...
      2. 由于单个文件可存储的time value对是固定的,由capacity()决定。因此第capacity()个time offset位置后可存储不定长数据。
      3. 设置一个阈值s,在按批次从磁盘反序列化时,若某个(行)数据长度低于s,则在该批次反序列化时和其他数据一起反序列化;反之,在访问时才反序列化。
      4. STRING存储格式:length + data
      5. RowRecord存储格式: length + (BOOLEAN data) + (BOOLEAN data) + ... BOOLEAN记录是否为null
  1. 管理文件
    1. 可能可以使用到TSFileFactory?
    2. 生成文件路径
      1. 例如:data/query/{query-id}/{path-name / merged-series-name}/{SerializableTVList-index}.temp。index指的是SerializableTVList在ElasticSerializableTVList中的index。
    3. 删除临时文件
    4. ...


核心数据结构

TODO:确定BatchData和TVList关系。

interface SerializableList {


  ByteBuffer serialize(); 
  void deserialize(ByteBuffer buffer); 
 
  int capacity();
  boolean isFull();
  
  void release(); // 清空内存
  // ...
}


SerializableTVList

核心功能描述:

  1. SerializableTVList接收一个int型构造参数,该参数为values和timestamps数组的size()限制(values和timestamps的长度最多为该参数)。SerializableTVList通过控制该int型参数控制一个SerializableTVList的内存占用。提供capacity()和isFull()等方法。
  2. static方法:针对不同序列类型,给定内存限制,返回一个TVList中values和timestamps的size限制。
  3. 基于TVList拓展,增加序列化与反序列化能力。可以将values和timestamps整体序列化进ByteBuffer,并从ByteBuffer整体反序列化。序列化后,调用release()清空内存占用。
  4. 由外部(ElasticSerializableTVList)控制序列化与反序列化的过程。
public class SerializableLongTVList 
  extends LongTVList   
  implements SerializableList {
  
  public static int getInternalTVListSizeLimit(int memoryLimitInMB) {
    // ...
  }
  
  public SerializableLongTVList(int internalTVListSizeLimit) {
    super();
    // ...
  } 
}


ElasticSerializableTVList

核心需求分析:

  1. 顺序读取、写入能力
  2. 随机读取能力
  3. 读写不存在同时进行的情况


核心功能描述:

  1. 核心数据结构为List<SerializableTVList>,以一个SerializableTVList为基本的管理单元,通过类似TVList的计算方法增加和定位TV元素
  2. 辅助数据结构LRU Cache,用于管理缓存SerializableTVList。缓存替换时对整个SerializableTVList进行替换。
  3. 可以限定一个ElasticSerializableTVList的总容量和缓存个数。可以根据总容量和缓存个数决定一个SerializableTVList的容量。总允许容量低于某个阈值,缓存个数定为1.
  4. 对外提供put()接口,get(int index)接口,TV的迭代器,LRU策略进行缓存替换


优点:

  1. 以SerializableTVList为一块内存管理单元,容易实现
  2. 具备TVList的优点
  3. 缓存方便直接


public class ElasticSerializableTVList {
  
  protected int size;
  protected List<SerializableTVList> tvLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableTVList(
    TSDataType dataType, 
    int memoryLimitInMB, 
    int cacheSize) { // ... }
    
  public long getTime(int index);
  public long getLong(int index);
  public void putLong(long timestamp, long value);
  // ...
  
  public IDataPointIterator getDataPointIterator();
  public IDataPointBatchIterator 
    getSizeLimitedDataPointBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowDataPointBatchIterator(long timeInterval);
}


SerializableRowRecordList

类似SerializableTVList.

public class SerializableRowRecordList 
  implements SerializableList {


  protected List<RowRecord> rowRecordList;


  public static int getInternalRowRecordListLimit(int memoryLimitInMB) 
  {
    // ...
  }
  
  public SerializableRowRecordList(int internalRowRecordListSizeLimit) 
  {
    // ...
  }


ElasticSerializableRowRecordList

最终实现类似ElasticSerializableTVList的功能。

public class ElasticSerializableRowRecordList {
  
  protected int size;
  protected List<SerializableRowRecordList> rowRecordLists;
  protected LRULinkedHashMap cache;
  
  public ElasticSerializableRowRecordList(
    List<TSDataType> dataTypes, 
    int memoryLimitInMB, 
    int cacheSize) { // ... 
    }
    
  public long getTime(int index);
  public RowRecord getRowRecord(int index);
  public void putRowRecord(RowRecord row);
  // ...
  
  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator 
    getSizeLimitedRowRecordBatchIterator(int batchSize);
  public IDataPointBatchIterator 
    getTimeWindowRowRecordBatchIterator(long timeInterval);
}


迭代接口

QueryDataIterator提供接口帮助用户控制数据点、数据批或者整个序列数据的获取,它提供顺序访问与随机访问的能力。

interface QueryDataIterator {}


数据点的迭代

现有实现中有功能类似的接口

public interface IPointReader {


  boolean hasNextTimeValuePair() throws IOException;


  TimeValuePair nextTimeValuePair() throws IOException;


  TimeValuePair currentTimeValuePair() throws IOException;


  void close() throws IOException

不使用IPointReader一方面出于效率考虑。IPointReader要求包装TimeValuePair,TimeValuePair又会要求TsPrimitiveType。IPointReader直接把time和value分离,直接暴露给用户 ,不必对数据进行包装。

另一方面,IDataPointIterator提供了currentPointIndex的接口用于定位数据,而IPointReader没有。


IDataPointIterator

interface IDataPointIterator extends QueryDataIterator {
  
  // 比IPointReader少一层包装
  // 不使用TimeValuePair


  boolean hasNextPoint();
  void next();
  
  int currentPointIndex();
  long currentTime();
  long currentLong();
  int currentInt();
  // ...
} 


IDataPointBatchIterator

interface IDataPointBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IDataPointIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  int getIntInCurrentBatch(int index);
  long getLongInCurrentBatch(int index);
  bool getBooleanInCurrentBatch(int index);
  float getFloatInCurrentBatch(int index);
  double getDoubleInCurrentBatch(int index);
}


数据行的迭代

IRowRecordIterator

interface IRowRecordIterator extends QueryDataIterator {


  boolean hasNextRowRecord();
  void next();
  
  int currentRowRecordIndex();
  RowRecord currentRowRecord();
  long currentTime();
} 


IRowRecordBatchIterator

interface IRowRecordBatchIterator extends QueryDataIterator {
  
  boolean hasNextBatch();
  void next();
  
  // 顺序访问能力
  int currentBatchIndex();
  IRowRecordIterator currentBatch();
  
  // 随机访问能力
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  RowRecord getRowRecordInCurrentBatch(int index);
}


全体数据的迭代

IOverallDataPointIterator

interface IOverallDataPointIterator extends QueryDataIterator {


  public IDataPointIterator getPointDataIterator();
  public IDataPointBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IDataPointBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  int getInt(int index);
  long getLong(int index);
  bool getBoolean(int index);
  float getFloat(int index);
  double getDouble(int index);
}


IOverallRowRecordIterator

interface IOverallRowRecordIterator extends QueryDataIterator {


  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
  public IRowRecordBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
  
  int size();
  long getTime(int index);
  RowRecord getRowRecord(int index);
}


数据迭代实现QueryDataIterator

利用核心数据结构实现迭代接口。

数据点

DataPointIterator

public class DataPointIterator 
  implements IPointIterator { // IPointIterator实现为按需获取


  protected ElasticSerializableTVList tvList;


  public DataPointIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public DataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }


DataPointBatchIterator

public abstract class DataPointBatchIterator 
  implements IBatchIterator { // IBatchIterator实现为按需获取


  protected ElasticSerializableTVList tvList;
    
  public DataPointBatchIterator(List<ManagedSeriesReader> readers) {
    // 不带值过滤... 
  }
  
  public DataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
}


class SizeLimitedDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public SizeLimitedDataPointBatchIterator(List<ManagedSeriesReader> readers, int batchSize) {
    // 不带值过滤... 
  }
  
  public SizeLimitedDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, int batchSize) {
    // 带值过滤...
  }
  
  // ...
} 


class TimeWindowDataPointBatchIterator 
  extends DataPointBatchIterator {
  
  public TimeWindowDataPointBatchIterator(List<ManagedSeriesReader> readers, long timeWindowInNanoSeconds) {
    // 不带值过滤... 
  }
  
  public TimeWindowDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, long timeWindowInNanoSeconds) {
    // 带值过滤...
  }
  
  // ...


OverallDataPointIterator

public class OverallDataPointIterator 
  implements IOverallDataPointIterator {


  protected ElasticSerializableTVList tvList;
  // ...
  
  public OverallDataPointIterator(List<ManagedSeriesReader> readers) 
  {
    // 不带值过滤...  SeriesRowDataBatchReader 
  }
  
  public OverallDataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // 带值过滤...
  }
} 


数据行

RowRecordIterator

与DataPointIterator类似,提供3种类型的Iterator。

RowRecordIterator:迭代

RowRecordBatchIterator:迭代、随机

OverallRowRecordIterator:迭代、随机

构造方式类同,实现时会增加一个时间的最小堆用于归并排序。


...


实现方式

不改变IoTDB整体的查询框架,查询方式按照生成QueryPlan -> QueryExecutor -> QueryDataSet进行。


UDTF

按照构造UDTFQueryPlan -> 构造UDTFQueryExecutor -> 构造UDTFQueryDataSet的顺序进行查询。


UDTFQueryPlan通过extends RawQueryPlan进行改造,拓展解析出若干UDF的属性与方法。在此阶段构造UDTF实例,调用UDTF实例的初始化方法(解析并初始化设置等)。


UDTFQueryExecutor通过extends RawQueryExecutor进行改造,可以复用大部分逻辑(生成reader和TimeGenerator等的逻辑)。


UDTFQueryDataSet需要包含三种类型的实现:AlignByTime / AlignByDevice / NonAlign


对于RawQuery,其AlignByTime的QueryDataSet按照带值过滤与否被实现为了RawQueryDataSetWithoutValueFilter和RawQueryDataSetWithValueFilter;

NonAlign的QueryDataSet在NonAlignEngineDataSet中实现(仅包含不带值过滤的逻辑);

AlignByDevice的QueryDataSet在AlignByDeviceDataSet中借助RawQueryDataSet生成。


对于UDTF,AlignByDevice的数据生成比较简单,可以通过简单改写AlignByDeviceDataSet实现,借助下面的UDTFQueryDataSet即可直接生成。


对于UDTFQueryDataSet的实现(包括AlignByTime / NonAlign的情况),一种自然的想法是通过extends RawQueryDataSet/NonAlignEngineDataSet或者直接在RawQueryDataSet/NonAlignEngineDataSet上改写的方式进行,以此复用部分原始数据获取(生产)和结果数据填充(消费)的逻辑。但这有两个问题:

  1. 在AlignByTime的实现中,UDTF虽然获取原始数据的方式是不同的,但是这些数据面向UDF编程接口时(进行transform时)没有不同,在UDF编程接口中只能看到点、批、全量的迭代器。通过extends RawQueryDataSet或者直接在RawQueryDataSet上改写的方式进行实现,会导致出现两套一模一样的transform逻辑。而且,RawQueryDataSetWithoutValueFilter 和 RawQueryDataSetWithValueFilter 的数据生产逻辑难以直接被UDF直接利用,需要较大的改动;数据结果填充逻辑更是完全无法利用。
  2. UDF生成NonAlignDataSet时无法extends RawQuery的NonAlignDataSet,同时UDF在数据完成transfrom前和AlignByTime的处理逻辑没有任何的不同,仅在结果集填充(fillBuffer)上有区别。


针对问题1,UDTFQueryDataSet实现时可以不以类区分带值过滤和不带值过滤的情况,实现一个UDTFQueryDataSet提供两个构造函数,分别处理带值过滤和不带值过滤的原始数据生成,然后走同一套transform流程;

针对问题2,可以通过extends UDTFQueryDataSet实现UDTFNonAlignDataSet和UDTFAlignByTimeDataSet,在两个类中分别实现一个fillBuffer用于打包NonAlign和AlignByTime的数据。


实现方式可通过改造RawQueryDataSet的方式进行,使得UDTFQueryDataSet和RawQueryDataSet可以复用相同的逻辑进行生成。下列陈述描述了UDTFQueryDataSet的必要构造过程。


最终的类关系如下:


UDAF

按照构造UDAFQueryPlan -> 构造UDAFQueryExecutor -> 构造UDAFQueryDataSet的顺序进行查询。


把StatisticsUDAF视为Aggregation的一种特例,UDAFQueryPlan的实现可通过直接在AggregationPlan中增加若干属性进行。在UDAFQueryPlan构造阶段构造UDAF实例,调用UDAF实例的初始化方法(解析并初始化设置等)。


无需特别构造UDAFQueryExecutor/UDAFQueryDataSet,利用AggregationExecutor/AggregationDataSet相同的逻辑与步骤,即可。

  • No labels