CREATE
UDF 框架设计
(Author: Steve Yurong Su)
(Original link: https://shimo.im/docs/vqhkTcDH6QXvQkyw)
https://docs.influxdata.com/kapacitor/v1.5/nodes/u_d_f_node/
https://docs.timescale.com/latest/using-timescaledb/reading-data#advanced-analytics
https://docs.fauna.com/fauna/current/api/graphql/functions
...
https://www.cnblogs.com/raker/p/4377343.html
https://blog.csdn.net/shaoyiwenet/article/details/53256103
https://blog.csdn.net/weixin_42181917/article/details/82865140
https://www.cnblogs.com/hd-zg/p/5947468.html
https://blog.csdn.net/ZZQHELLO2018/article/details/105700649
https://blog.csdn.net/wangpei1949/article/details/103444412
一个UDF查询的情景可能包括单序列查询、多序列查询。
select udf(s1) from root.sg1.d1;
select udf(s1), s2 from root.sg1.d1;
select udf(s1), udf(s2) from root.sg1.d1;
select udf(s1, s2, s3) from root.sg1.d1;
select udf(s1, s2, "string", "number"), udf1(s1), s3 from root.sg1.d1;
在多序列查询的场景下,为用户提供如下迭代方式进行transform:
以上场景都需要考虑内存受限的情况。
CREATE [temporary] FUNCTION function_name AS class_name;
DROP [temporary] FUNCTION function_name;
SHOW [temporary] FUNCTIONS;
UDAF多对一
UDTF多对多
/**
* select语句中,udf接收的参数
* 接收带单引号或双引号 STRING 类型的参数或者suffix path类型的参数
* 参数可变长,允许有多个measurements
*
* 例:
* select
* udf1(measurement0, measurement1, parameter0, parameter1),
* udf2(measurement2, parameter2),
* measurement3,
* measurement4
* from root.sg1. ..;
*
* 对udf1,measurement0, measurement1, parameter0, parameter1就是
* parameters
*/
public class UDFParameters {
int length();
boolean getBoolean(int index);
int getInteger(int index);
long getLong(int index);
float getFloat(int index);
double getDouble(int index);
String getString(int index);
Path getPath(int index);
最后可能区分UDAF UDTF的配置接口,以提供不同的配置选项。
/**
 * Interface for per-UDF user configuration.
 *
 * Usage example:
 * UDFConfigurations = new UDFConfigurations();
 * configurations
 * .setMeasurementIndexes(Arrays.asList(0, 1, 2))
 *
 * .setDataPointAccessStrategies(
 * 2, FETCH_BY_TIME_WINDOW))
 * .setBatchTimeWindow(2, 100_000)
 *
 * .mergeOriginalSeries("unique-name", Arrays.asList(0, 1))
 * .setRowRecordAccessStrategies(
 * "unique-name", FETCH_BY_SIZE_LIMITED_WINDOW)
 * .setBatchSizeLimit("unique-name", 1000)
 *
 * .setOutputDataType(TSDataType.FLOAT)
 * .setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
 * .setTimeout(10000);
 *
 * After the user finishes the configuration, a validation/check pass
 * is performed.
 */
public class UDFConfigurations {
enum DataPointAccessStrategy {
FETCH_BY_POINT, // served by a DataPointIterator
FETCH_BY_TIME_WINDOW, // served by a DataPointBatchIterator
FETCH_BY_SIZE_LIMITED_WINDOW, // served by a DataPointBatchIterator
RANDOM_ACCESS_TO_OVERALL_DATA, // served by an OverallDataPointIterator
}
enum RowRecordAccessStrategy {
FETCH_BY_ROW, // served by a RowRecordIterator
FETCH_BY_TIME_WINDOW, // served by a RowRecordBatchIterator
FETCH_BY_SIZE_LIMITED_WINDOW, // served by a RowRecordBatchIterator
RANDOM_ACCESS_TO_OVERALL_DATA, // served by an OverallRowRecordIterator
}
// Declares which argument positions are measurements, 0-based.
public UDFConfigurations setMeasurementIndexes(int[] indexes);
// Declares the access strategy of each series; this determines which
// iterator kind is provided to the user.
public UDFConfigurations setDataPointAccessStrategies(
int measurementIndex,
DataPointAccessStrategy pointAccessStrategies);
public UDFConfigurations setBatchSizeLimit(
int measurementIndex, int sizeLimit);
public UDFConfigurations setBatchTimeWindow(
int measurementIndex, long timeWindow);
public UDFConfigurations setBatchTimeWindow(
int measurementIndex, long timeWindow, long startTime);
// startTime defaults to MIN_TIME_IN_SERIES
public UDFConfigurations mergeOriginalSeries(
String mergedSeriesName, int[] indexes);
public UDFConfigurations setBatchSizeLimit(
String mergedSeriesName, int sizeLimit);
public UDFConfigurations setBatchTimeWindow(
String mergedSeriesName, long timeWindow);
public UDFConfigurations setBatchTimeWindow(
String mergedSeriesName, long timeWindow, long startTime);
// startTime defaults to MIN_TIME_IN_SERIES
// The data type of the transformed result.
public UDFConfigurations setOutputDataType(TSDataType dataType);
// The column header shown to the client.
public UDFConfigurations setColumnHeader(String header);
// Execution time limit.
public UDFConfigurations setTimeout(long timeout);
// more setters and getters ...
}
public abstract class UDF {
  protected List<Path> paths;
  protected List<TSDataType> originalSeriesDataTypes;
  // Iterators from which the user fetches data:
  // - OverallDataPointIterator: scenarios that need the entire query result
  //   before any processing can start
  // - DataPointBatchIterator: scenarios that can process data batch by batch
  // - DataPointIterator: scenarios that can process data point by point
  // Per-series iterators, for series that can be processed independently.
  // fix: renamed from 'DataPointIterators' to follow lowerCamelCase.
  protected List<DataPointIterator> dataPointIterators;
  // Time-aligned results after merging multiple series; requires
  // UDFConfigurations.mergeOriginalSeries to have been configured.
  // fix: 'RowRocordIterator' typo corrected to RowRecordIterator.
  protected HashMap<String, RowRecordIterator> rowRecordIterators;
  // Used when loading information for SHOW management statements.
  public abstract String name();
  public abstract String description();
  // All instance attributes become usable only after initialize() is called.
  // Used for initialization, resource loading, etc.
  public abstract void initialize(
      UDFParameters parameters,
      UDFConfigurations configurations
  );
  // Releases resources, etc.
  public abstract void terminate();
  // Reserved for the system to set.
  public void setPaths(List<Path> paths);
  public void setOriginalSeriesDataTypes(List<TSDataType> dataTypes);
  public void setQueryDataIterators(List<QueryDataIterator> queryDataIterators);
  // ...
}
public abstract class UDTF extends UDF {
  // Data collector: receives the transformed series. ElasticSerializableTVList
  // is a TVList with built-in memory management.
  protected ElasticSerializableTVList collector;
  protected int rowLimit;
  protected int rowOffset;
  // To be implemented by the user.
  public abstract boolean hasRemainingDataToTransform();
  public abstract void transform(int fetchSize); // the user keeps its own context
  // Reserved for the system to set.
  public final void setCollector(ElasticSerializableTVList collector) { ... }
  public final void setRowLimit(int rowLimit) { ... }
  public final void setRowOffset(int rowOffset) { ... }
} // fix: UDTF was never closed, which nested UDAF inside it
public abstract class UDAF extends UDF {
  // To be implemented by the user.
  public abstract Object aggregate();
}
直接利用Statistics数据情境
// In this mode the QueryDataIterator attributes are not usable.
public abstract class UDAF extends UDF {
// Implements the interfaces used to build an AggregateResult...
public abstract void updateResultFromPageData(BatchData dataInThisPage);
public abstract void updateResultUsingTimestamps(long[] timestamps, int length, IReaderByTimestamp dataReader);
// ...
}
public class HiFiUDTF extends PointDataUDTF {
private int hifiSampleSize;
private String hifiWeightOperator;
private String hifiSampleOperator;
private int totalReturnedRowsNumber;
public String name() {
return "hifi";
}
public String description() {
return "This is a HiFi query example.";
}
public void initialize(
UDFParameters parameters,
UDFConfigurations configurations
) {
totalReturnedRowsNumber = 0;
hifiSampleSize = parameters.getInteger(1);
hifiWeightOperator = parameters.getString(2);
hifiSampleOperator = parameters.getString(3);
configurations
.setMeasurementIndexes(Arrays.asList(0))
.setDataPointAccessStrategies(0,FETCH_BY_POINT)
.setOutputDataType(TSDataType.FLOAT)
.setColumnHeader("hifi(" + parameters.getStringValue(0) + ")")
.setTimeout(10000);
}
public abstract void terminate() {
// ignored
}
public boolean hasRemainingDataToTransform() {
return totalReturnedRowsNumber < rowLimit
&& (IDataPointIterator iterators.get(0)).hasNext();
}
public void transform() {
int alreadyReturnedRowsNumber = 0;
IDataPointIterator iterator =
(IDataPointIterator) iterators.get(0);
while (alreadyReturnedRowsNumber < fetchSize
&& totalReturnedRowsNumber < rowLimit
&& iterator.hasNext()) {
iterator.next();
if (iterator.currentIndex() < rowOffset) {
continue;
}
collector.putFloat(
iterator.currentTime(),
doSomething(iterator.currentFloat())
);
++totalReturnedRowsNumber;
}
}
工具类,提供临时查询文件管理、读写等的静态方法。
主要功能描述:
...
...
transform方法是UDTF的核心方法。它有两种签名,一种是UDTF#transform(Row, PointCollector),一种是
其一,由于允许原始数据查询与UDTF查询混合进行,所以需要修改原先逻辑计划和物理计划的生成方式。
- 所有行
- 用户可以配置。T默认为300MB,最大允许用户将T设置为读内存的20%。
见
...
- data/query/{query-id}/{path-name / merged-series-name}/{SerializableTVList-index}.temp。
// A list that can be serialized to / deserialized from temporary query files
// so its memory footprint can be controlled.
interface SerializableList {
ByteBuffer serialize();
void deserialize(ByteBuffer buffer);
int capacity();
boolean isFull();
void release(); // frees the in-memory data
// ...
}
public class SerializableLongTVList
extends LongTVList
implements SerializableList {
// Computes the maximum number of elements an internal TVList may hold
// under the given memory limit.
public static int getInternalTVListSizeLimit(int memoryLimitInMB) {
// ...
}
public SerializableLongTVList(int internalTVListSizeLimit) {
super();
// ...
}
}
// A TVList with elastic memory management: data lives in serializable
// sub-lists, with an LRU cache deciding which sub-lists stay in memory.
public class ElasticSerializableTVList {
protected int size;
protected List<SerializableTVList> tvLists;
protected LRULinkedHashMap cache;
public ElasticSerializableTVList(
TSDataType dataType,
int memoryLimitInMB,
int cacheSize) { // ... }
public long getTime(int index);
public long getLong(int index);
public void putLong(long timestamp, long value);
// ...
// Iterator factories matching the configured access strategies.
public IDataPointIterator getDataPointIterator();
public IDataPointBatchIterator
getSizeLimitedDataPointBatchIterator(int batchSize);
public IDataPointBatchIterator
getTimeWindowDataPointBatchIterator(long timeInterval);
}
public class SerializableRowRecordList
implements SerializableList {
protected List<RowRecord> rowRecordList;
public static int getInternalRowRecordListLimit(int memoryLimitInMB)
{
// ...
}
public SerializableRowRecordList(int internalRowRecordListSizeLimit)
{
// ...
}
// RowRecord counterpart of ElasticSerializableTVList: serializable sub-lists
// plus an LRU cache for memory control.
public class ElasticSerializableRowRecordList {
  protected int size;
  protected List<SerializableRowRecordList> rowRecordLists;
  protected LRULinkedHashMap cache;

  public ElasticSerializableRowRecordList(
      List<TSDataType> dataTypes,
      int memoryLimitInMB,
      int cacheSize) { // ...
  }

  public long getTime(int index);
  public RowRecord getRowRecord(int index);
  public void putRowRecord(RowRecord row);
  // ...
  public IRowRecordIterator getRowRecordIterator();
  public IRowRecordBatchIterator
      getSizeLimitedRowRecordBatchIterator(int batchSize);
  // fix: must return a row-record batch iterator (was IDataPointBatchIterator),
  // mirroring getSizeLimitedRowRecordBatchIterator above.
  public IRowRecordBatchIterator
      getTimeWindowRowRecordBatchIterator(long timeInterval);
}
提供顺序访问与随机访问的能力。
// Marker interface for all query-data iterators.
interface QueryDataIterator {}
public interface IPointReader {
boolean hasNextTimeValuePair() throws IOException;
TimeValuePair nextTimeValuePair() throws IOException;
TimeValuePair currentTimeValuePair() throws IOException;
void close() throws IOException
不使用IPointReader的原因是:IPointReader要求包装TimeValuePair,而TimeValuePair又会要求额外的对象包装开销。
interface IDataPointIterator extends QueryDataIterator {
// One fewer wrapper layer than IPointReader
// (does not use TimeValuePair)
boolean hasNextPoint();
void next();
int currentPointIndex();
long currentTime();
long currentLong();
int currentInt();
// ...
}
interface IDataPointBatchIterator extends QueryDataIterator {
  boolean hasNextBatch();
  void next();
  // Sequential access.
  int currentBatchIndex();
  IDataPointIterator currentBatch();
  // Random access within the current batch.
  int currentBatchSize();
  long getTimeInCurrentBatch(int index);
  int getIntInCurrentBatch(int index);
  long getLongInCurrentBatch(int index);
  boolean getBooleanInCurrentBatch(int index); // fix: 'bool' is not a Java type
  float getFloatInCurrentBatch(int index);
  double getDoubleInCurrentBatch(int index);
}
// Row-by-row sequential access over time-aligned, merged series.
interface IRowRecordIterator extends QueryDataIterator {
boolean hasNextRowRecord();
void next();
int currentRowRecordIndex();
RowRecord currentRowRecord();
long currentTime();
}
interface IRowRecordBatchIterator extends QueryDataIterator {
boolean hasNextBatch();
void next();
// Sequential access.
int currentBatchIndex();
IRowRecordIterator currentBatch();
// Random access within the current batch.
int currentBatchSize();
long getTimeInCurrentBatch(int index);
RowRecord getRowRecordInCurrentBatch(int index);
}
interface IOverallDataPointIterator extends QueryDataIterator {
public IDataPointIterator getPointDataIterator();
public IDataPointBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
public IDataPointBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
int size();
long getTime(int index);
int getInt(int index);
long getLong(int index);
bool getBoolean(int index);
float getFloat(int index);
double getDouble(int index);
}
interface IOverallRowRecordIterator extends QueryDataIterator {
public IRowRecordIterator getRowRecordIterator();
public IRowRecordBatchIterator getSizeLimitedBatchDataIterator(int batchSize);
public IRowRecordBatchIterator getTimeWindowBatchDataIterator(long timeInterval)
int size();
long getTime(int index);
RowRecord getRowRecord(int index);
}
public class DataPointIterator
implements IPointIterator { // IPointIterator实现为按需获取
protected ElasticSerializableTVList tvList;
public DataPointIterator(List<ManagedSeriesReader> readers) {
// 不带值过滤... SeriesRowDataBatchReader
}
public DataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
// 带值过滤...
}
public abstract class DataPointBatchIterator
    // fix: 'IBatchIterator' is undefined; the declared interface is
    // IDataPointBatchIterator. Implemented with on-demand fetching.
    implements IDataPointBatchIterator {

  protected ElasticSerializableTVList tvList;

  // Without value filter...
  public DataPointBatchIterator(List<ManagedSeriesReader> readers) {
    // ...
  }

  // With value filter...
  public DataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
    // ...
  }
}
class SizeLimitedDataPointBatchIterator
extends DataPointBatchIterator {
// Without value filter...
public SizeLimitedDataPointBatchIterator(List<ManagedSeriesReader> readers, int batchSize) {
// ...
}
// With value filter...
public SizeLimitedDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, int batchSize) {
// ...
}
// ...
}
class TimeWindowDataPointBatchIterator
extends DataPointBatchIterator {
public TimeWindowDataPointBatchIterator(List<ManagedSeriesReader> readers, long timeWindowInNanoSeconds) {
// 不带值过滤...
}
public TimeWindowDataPointBatchIterator(List<IReaderByTimestamp> readers, TimeGenerator generator, long timeWindowInNanoSeconds) {
// 带值过滤...
}
// ...
public class OverallDataPointIterator
implements IOverallDataPointIterator {
protected ElasticSerializableTVList tvList;
// ...
// Without value filter... SeriesRowDataBatchReader
public OverallDataPointIterator(List<ManagedSeriesReader> readers)
{
// ...
}
// With value filter...
public OverallDataPointIterator(List<IReaderByTimestamp> readers, TimeGenerator generator) {
// ...
}
}
不改变IoTDB整体的查询框架,
UDTFQueryPlan
UDTFQueryExecutor通过
UDTFQueryDataSet
对于RawQuery,其AlignByTime的QueryDataSet按照带值过滤与否被实现为了RawQueryDataSetWithoutValueFilter和RawQueryDataSetWithValueFilter;
NonAlign的QueryDataSet在
UDTFQueryDataSet
UDAFQueryPlan构造
...