UDF 框架设计
(Author: Steve Yurong Su)
(Origin link: https://shimo.im/docs/vqhkTcDH6QXvQkyw)
https://docs.influxdata.com/kapacitor/v1.5/nodes/u_d_f_node/
https://docs.timescale.com/latest/using-timescaledb/reading-data#advanced-analytics
https://docs.fauna.com/fauna/current/api/graphql/functions
https://www.dolphindb.com/help/
https://www.cnblogs.com/raker/p/4377343.html
https://blog.csdn.net/shaoyiwenet/article/details/53256103
https://blog.csdn.net/weixin_42181917/article/details/82865140
https://www.cnblogs.com/hd-zg/p/5947468.html
https://blog.csdn.net/ZZQHELLO2018/article/details/105700649
https://blog.csdn.net/wangpei1949/article/details/103444412
一个UDF查询的情景可能包括单序列查询、多序列查询。
select udf(s1) from root.sg1.d1; select udf(s1), s2 from root.sg1.d1; select udf(s1), udf(s2) from root.sg1.d1;
select udf(s1, s2, s3) from root.sg1.d1; select udf(s1, s2, "string", "number"), udf1(s1), s3 from root.sg1.d1;
在多序列查询的场景下,提供用户如下迭代方式进行transform:
以上场景都需要考虑内存受限的情况。
CREATE [temporary] FUNCTION function_name AS class_name;
DROP [temporary] FUNCTION function_name;
SHOW [temporary] FUNCTIONS;
UDAF 多对一
UDTF 多对多
/** * select语句中,udf接收的参数 * 接收带单引号或双引号 STRING 类型的参数或者suffix path类型的参数 * 参数可变长,允许有多个measurements * * 例: * select * udf1(measurement0, measurement1, parameter0, parameter1), * udf2(measurement2, parameter2), * measurement3, * measurement4 * from root.sg1. ..; * * 对udf1,measurement0, measurement1, parameter0, parameter1就是 * parameters */ public class UDFParameters { int length(); boolean getBoolean(int index); int getInteger(int index); long getLong(int index); float getFloat(int index); double getDouble(int index); String getString(int index); Path getPath(int index);
最后可能区分UDAF UDTF的配置接口,以提供不同的配置选项。
/** * 给用户个性化配置UDF的接口 * UDFConfigurations = new UDFConfigurations(); * configurations * .setMeasurementIndexes(Arrays.asList(0, 1, 2)) * * .setDataPointAccessStrategies( * 2, FETCH_BY_TIME_WINDOW)) * .setBatchTimeWindow(2, 100_000) * * .mergeOriginalSeries("unique-name", Arrays.asList(0, 1)) * .setRowRecordAccessStrategies( * "unique-name", FETCH_BY_SIZE_LIMITED_WINDOW) * .setBatchSizeLimit("unique-name", 1000) * * .setOutputDataType(TSDataType.FLOAT) * .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")") * .setTimeout(10000); * * 用户自定义配置完成后会有验证、检查过程 */ public class UDFConfigurations { enum DataPointAccessStrategy { FETCH_BY_POINT, // 提供DataPointIterator FETCH_BY_TIME_WINDOW, // 提供DataPointBatchIterator FETCH_BY_SIZE_LIMITED_WINDOW, // 提供DataPointBatchIterator RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallDataPointIterator } enum RowRecordAccessStrategy { FETCH_BY_ROW, // 提供RowRecordIterator FETCH_BY_TIME_WINDOW, // 提供RowRecordBatchIterator FETCH_BY_SIZE_LIMITED_WINDOW, // 提供RowRecordBatchIterator RANDOM_ACCESS_TO_OVERALL_DATA, // 提供OverallRowRecordIterator } // 说明 measurement 在参数中的位置,以0为起点 public UDFConfigurations setMeasurementIndexes(int[] indexes); // 说明各个序列的读取策略,以此参数提供不同的iterators供用户使用 public UDFConfigurations setDataPointAccessStrategies( int measurementIndex, DataPointAccessStrategy pointAccessStrategies); public UDFConfigurations setBatchSizeLimit( int measurementIndex, int sizeLimit); public UDFConfigurations setBatchTimeWindow( int measurementIndex, long timeWindow); public UDFConfigurations setBatchTimeWindow( int measurementIndex, long timeWindow, long startTime); // startTime 提供 MIN_TIME_IN_SERIES public UDFConfigurations mergeOriginalSeries( String mergedSeriesName, int[] indexes); public UDFConfigurations setBatchSizeLimit( String mergedSeriesName, int sizeLimit); public UDFConfigurations setBatchTimeWindow( String mergedSeriesName, long timeWindow); public UDFConfigurations setBatchTimeWindow( String mergedSeriesName, long timeWindow, long 
startTime); // startTime 提供 MIN_TIME_IN_SERIES // transform 后结果的类型 public UDFConfigurations setOutputDataType(TSDataType dataType); // 展现在客户端的头信息 public UDFConfigurations setColumnHeader(String header); // 执行时间限制 public UDFConfigurations setTimeout(long timeout); // 更多setter以及getter ... }
public abstract class UDF { protected List<Path> paths; protected List<TSDataType> originalSeriesDataTypes; // iterators,用户从此拿数据 // OverallDataPointIterator / 需要获取全量的查询数据才能进一步处理的情境 // DataPointBatchIterator / 可以获取一定量数据后按批处理的情境 // DataPointIterator / 可以逐数据点处理的情 // 针对序列可以分开处理的情形 protected List<DataPointIterator> DataPointIterators; // 多序列归并后,按时间对齐的结果,需要在UDFConfigurations.mergeOriginalSeries protected HashMap<String, RowRocordIterator> RowRecordIterators; // 用于 show 管理语句信息的加载 public abstract String name(); public abstract String description(); // initialize 调用后,实例内所有的属性才可用 // 用于初始化,加载资源等 public abstract void initialize( UDFParameters parameters, UDFConfigurations configurations ); // 释放资源等 public abstract void terminate(); // 保留给系统进行设置 public void setPaths(List<Path> paths); public void setOriginalSeriesDataTypes(List<TSDataType> dataTypes); public void setQueryDataIterators(List<QueryDataIterator> QueryDataIterators); // ... }
public abstract class UDTF extends UDF { protected ElasticSerializableTVList collector; // 数据收集器,获取transform后的序列,ElasticSerializableTVList是含内存管理的TVList protected int rowLimit; protected int rowOffset; // 用户需要实现 public abstract boolean hasRemainingDataToTransform(); public abstract void transform(int fetchSize); // 用户自行保存上下文 // 保留给系统进行设置 public final void setCollector(ElasticSerializableTVList collector) { ... } public final void setRowLimit(int rowLimit) { ... } public final void setRowOffset(int rowOffset) { ... }
public abstract class UDAF extends UDF { // 用户需要实现 public abstract Object aggregate(); }
直接利用Statistics数据情境
// QueryDataIterator属性无效 public abstract class UDAF extends UDF { // 实现用于构造 AggregateResult 的接口... public abstract void updateResultFromPageData(BatchData dataInThisPage); public abstract void updateResultUsingTimestamps(long[] timestamps, int length, IReaderByTimestamp dataReader); // ... }
public class HiFiUDTF extends PointDataUDTF { private int hifiSampleSize; private String hifiWeightOperator; private String hifiSampleOperator; private int totalReturnedRowsNumber; public String name() { return "hifi"; } public String description() { return "This is a HiFi query example."; } public void initialize( UDFParameters parameters, UDFConfigurations configurations ) { totalReturnedRowsNumber = 0; hifiSampleSize = parameters.getInteger(1); hifiWeightOperator = parameters.getString(2); hifiSampleOperator = parameters.getString(3); configurations .setMeasurementIndexes(Arrays.asList(0)) .setDataPointAccessStrategies(0,FETCH_BY_POINT) .setOutputDataType(TSDataType.FLOAT) .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")") .setTimeout(10000); } public abstract void terminate() { // ignored } public boolean hasRemainingDataToTransform() { return totalReturnedRowsNumber < rowLimit && (IDataPointIterator iterators.get(0)).hasNext(); } public void transform() { int alreadyReturnedRowsNumber = 0; IDataPointIterator iterator = (IDataPointIterator) iterators.get(0); while (alreadyReturnedRowsNumber < fetchSize && totalReturnedRowsNumber < rowLimit && iterator.hasNext()) { iterator.next(); if (iterator.currentIndex() < rowOffset) { continue; } collector.putFloat( iterator.currentTime(), doSomething(iterator.currentFloat()) ); ++totalReturnedRowsNumber; } }
工具类,提供临时查询文件管理、读写等的静态方法。
主要功能描述:
- 序列化文件内按 time、offset 交替的形式存储(time, offset, time, offset, …)
- capacity()
- length + data
- data/query/{query-id}/{path-name / merged-series-name}/{SerializableTVList-index}.temp。
/**
 * A list that can be serialized to / deserialized from a temporary file,
 * so its memory can be released under memory pressure.
 */
interface SerializableList {

  ByteBuffer serialize();

  void deserialize(ByteBuffer buffer);

  int capacity();

  boolean isFull();

  // Releases the in-memory data.
  void release();

  // ...
}
public class SerializableLongTVList extends LongTVList implements SerializableList { public static int getInternalTVListSizeLimit(int memoryLimitInMB) { // ... } public SerializableLongTVList(int internalTVListSizeLimit) { super(); // ... } }
public class ElasticSerializableTVList { protected int size; protected List<SerializableTVList> tvLists; protected LRULinkedHashMap cache; public ElasticSerializableTVList( TSDataType dataType, int memoryLimitInMB, int cacheSize) { // ... } public long getTime(int index); public long getLong(int index); public void putLong(long timestamp, long value); // ... public IDataPointIterator getDataPointIterator(); public IDataPointBatchIterator getSizeLimitedDataPointBatchIterator(int batchSize); public IDataPointBatchIterator getTimeWindowDataPointBatchIterator(long timeInterval); }
public class SerializableRowRecordList implements SerializableList { protected List<RowRecord> rowRecordList; public static int getInternalRowRecordListLimit(int memoryLimitInMB) { // ... } public SerializableRowRecordList(int internalRowRecordListSizeLimit) { // ... }
public class ElasticSerializableRowRecordList { protected int size; protected List<SerializableRowRecordList> rowRecordLists; protected LRULinkedHashMap cache; public ElasticSerializableRowRecordList( List<TSDataType> dataTypes, int memoryLimitInMB, int cacheSize) { // ... } public long getTime(int index); public RowRecord getRowRecord(int index); public void putRowRecord(RowRecord row); // ... public IRowRecordIterator getRowRecordIterator(); public IRowRecordBatchIterator getSizeLimitedRowRecordBatchIterator(int batchSize); public IDataPointBatchIterator getTimeWindowRowRecordBatchIterator(long timeInterval); }
提供顺序访问与随机访问的能力。
// Marker interface of all query-data iterators handed to UDFs.
interface QueryDataIterator {}
public interface IPointReader { boolean hasNextTimeValuePair() throws IOException; TimeValuePair nextTimeValuePair() throws IOException; TimeValuePair currentTimeValuePair() throws IOException; void close() throws IOException
不使用IPointReader的原因是:IPointReader要求将数据包装为TimeValuePair,而TimeValuePair又会引入额外的对象包装开销。
interface IDataPointIterator extends QueryDataIterator { // 比IPointReader少一层包装 // 不使用TimeValuePair boolean hasNextPoint(); void next(); int currentPointIndex(); long currentTime(); long currentLong(); int currentInt(); // ... }
interface IDataPointBatchIterator extends QueryDataIterator { boolean hasNextBatch(); void next(); // 顺序访问能力 int currentBatchIndex(); IDataPointIterator currentBatch(); // 随机访问能力 int currentBatchSize(); long getTimeInCurrentBatch(int index); int getIntInCurrentBatch(int index); long getLongInCurrentBatch(int index); bool getBooleanInCurrentBatch(int index); float getFloatInCurrentBatch(int index); double getDoubleInCurrentBatch(int index); }
/** Row-by-row iterator over time-aligned row records. */
interface IRowRecordIterator extends QueryDataIterator {

  boolean hasNextRowRecord();

  void next();

  int currentRowRecordIndex();

  RowRecord currentRowRecord();

  long currentTime();
}
interface IRowRecordBatchIterator extends QueryDataIterator { boolean hasNextBatch(); void next(); // 顺序访问能力 int currentBatchIndex(); IRowRecordIterator currentBatch(); // 随机访问能力 int currentBatchSize(); long getTimeInCurrentBatch(int index); RowRecord getRowRecordInCurrentBatch(int index); }
interface IOverallDataPointIterator extends QueryDataIterator { public IDataPointIterator getPointDataIterator(); public IDataPointBatchIterator getSizeLimitedBatchDataIterator(int batchSize); public IDataPointBatchIterator getTimeWindowBatchDataIterator(long timeInterval) int size(); long getTime(int index); int getInt(int index); long getLong(int index); bool getBoolean(int index); float getFloat(int index); double getDouble(int index); }
interface IOverallRowRecordIterator extends QueryDataIterator { public IRowRecordIterator getRowRecordIterator(); public IRowRecordBatchIterator getSizeLimitedBatchDataIterator(int batchSize); public IRowRecordBatchIterator getTimeWindowBatchDataIterator(long timeInterval) int size(); long getTime(int index); RowRecord getRowRecord(int index); }
public class DataPointIterator implements IPointIterator { // IPointIterator实现为按需获取 protected ElasticSerializableTVList tvList; public DataPointIterator(List<ManagedSeriesReader> readers) { // 不带值过滤... SeriesRowDataBatchReader } public DataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) { // 带值过滤... }
public abstract class DataPointBatchIterator implements IBatchIterator { // IBatchIterator实现为按需获取 protected ElasticSerializableTVList tvList; public DataPointBatchIterator(List<ManagedSeriesReader> readers) { // 不带值过滤... } public DataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) { // 带值过滤... } } class SizeLimitedDataPointBatchIterator extends DataPointBatchIterator { public SizeLimitedDataPointBatchIterator(List<ManagedSeriesReader> readers, int batchSize) { // 不带值过滤... } public SizeLimitedDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, int batchSize) { // 带值过滤... } // ... } class TimeWindowDataPointBatchIterator extends DataPointBatchIterator { public TimeWindowDataPointBatchIterator(List<ManagedSeriesReader> readers, long timeWindowInNanoSeconds) { // 不带值过滤... } public TimeWindowDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, long timeWindowInNanoSeconds) { // 带值过滤... } // ...
public class OverallDataPointIterator implements IOverallDataPointIterator { protected ElasticSerializableTVList tvList; // ... public OverallDataPointIterator(List<ManagedSeriesReader> readers) { // 不带值过滤... SeriesRowDataBatchReader } public OverallDataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) { // 带值过滤... } }
不改变IoTDB整体的查询框架,
UDTFQueryPlan
UDTFQueryExecutor通过
UDTFQueryDataSet
对于RawQuery,其AlignByTime的QueryDataSet按照带值过滤与否被实现为了RawQueryDataSetWithoutValueFilter和RawQueryDataSetWithValueFilter;
NonAlign的QueryDataSet在
UDTFQueryDataSet
UDAFQueryPlan 构造
UDAFQueryExecutor/UDAFQueryDataSet