Unable to render Jira issues macro, execution error.

UDTF管理

装载

UDTF通过反射技术装载

CREATE FUNCTION function_name AS class_name;

卸载

DROP FUNCTION function_name;

展示

SHOW FUNCTIONS;

管理器

装载卸载管理:代码见org.apache.iotdb.db.query.udf.service.UDFRegistrationService

临时文件管理:代码见org.apache.iotdb.db.query.udf.service.TemporaryQueryDataFileService


用户接口设计

所有提供给用户编程的接口/类都在包 org.apache.iotdb.db.query.udf.api 中。所有的接口/类都添加了详细的Javadoc,可以选择从UDTF类开始阅读了解。

所有用户实现的UDTF都要通过继承UDTF类实现。UDTF类包括4个可以Override的方法,分别是beforeStart、beforeDestroy和两个签名不同的transform方法。一个完整的UDTF类至少需要Override beforeStart方法和其中一种transform方法。

beforeStart方法在transform方法调用前调用。主要作用有三类。

  • 第一类作用是利用UDFParameters解析用户输入的UDTF参数,即UDTF括号中的部分。UDTF允许用户定义两类参数,一种是时间序列参数,一种是KV对形式(KV都必须是使用引号引用的字符串)的属性参数。KV对形式的参数必须在输入完全部的时间序列参数后才能输入。两类参数都可以为空。时间序列参数决定了一个UDTF的原始数据由哪些序列生成,用户将在transform方法中拿到这些原始数据。如果时间序列参数多于1个,那么用户将得到的是这几列时间序列按照时间对齐后的数据行。用户可以在UDFParameters中得到输入的时间序列Path,可以通过UDFParameters内置的解析工具将KV对形式的属性参数进行类型转换。
  • 第二类作用是利用UDTFConfigurations配置UDTF的必要属性。目前有两类必要属性,一类是UDTF的输出数据类型,还有一种是原始数据的访问策略。目前允许用户指定三种类型的原始数据访问策略。(1)逐行读取原始数据;(2)滑动时间窗口读取原始数据(类似GroupBy的滑动时间窗口,需要指定DisplayWindowBegin、DisplayWindowEnd、TimeInterval和SlidingStep,该时间窗口允许窗口重叠);(3)固定窗口数据行数目的方式读取原始数据。其中后面两种需要指定参数,这些参数可以通过UDFParameters从用户输入的SQL中获得。
  • 第三类作用是为查询分配一些自定义的资源:打开文件、建立外部连接等。

beforeDestroy方法在一个UDTF查询结束后被调用。主要作用是释放用户自定义的一些资源。

transform方法是UDTF的核心方法,它的作用是对原始数据进行变换。它有两种签名,一种是UDTF#transform(Row, PointCollector),一种是UDTF#transform(RowWindow, PointCollector),每一种签名的第一个参数为原始数据的输入参数,第二个参数为变换后数据的输出参数。当用户在UDTFConfigurations中将访问原始数据的策略配置为逐行读取时,第一种签名的方法就会被调用。当用户将原始数据的访问策略配置为按滑动时间窗口读取或者固定数据行数的窗口读取时,第二种签名的方法就会被调用。根据原始数据划分的行数或者窗口数量,transform方法将被多次调用,每次调用都允许用户生成任意数量的数据点。


查询计划生成

有三点是需要关注的。

其一,由于允许原始数据查询与UDTF查询混合进行,所以需要修改原先逻辑计划和物理计划的生成方式。基本的做法是引入了UDFContext这一数据结构。相关文件可见SelectOperator、UDFPlan和LogicalGenerator、PhysicalGenerator。

其二,支持UDTF输入多个时间序列的场景,还必须考虑输入序列为*的情况。如下面的场景:现在有时间序列root.sg.d.s0, root.sg.d.s1, root.sg.d.s2, root.sg.d.s3,执行查询select udf(*, *, *) from root.sg.d 时,实际是执行了4*4*4个UDTF查询。在查询计划生成的时候,需要消除*符号并计算查询候选序列的笛卡尔积。代码逻辑主要在ConcatPathOptimizer中。

其三,UDTF查询列去重的逻辑。注意udf(a, b)和udf(b, a)不同,udf(a, b)和udf(a, b, 'k'='v')不同。代码逻辑主要在PhysicalGenerator#deduplicate中。


求值架构与过程

UDTF的求值采用了如上图所示的3层架构:OutputLayer、TransformerLayer和InputLayer。

求值的基本思想是按需惰性求值:当输出层需要输出新的RowRecord时,Transformer层会首先消耗缓存的先前一次tranform执行生成的数据点,当缓存数据点全部被消耗,才执行新的一批数据的transform,这时才可能要求原始查询层输出下一个RowRecord。

InputLayer

InputLayer由原始查询的输出层RawQueryDataSetWithValueFilter或者RawQueryDataSetWithoutValueFilter,以及一个缓存队列ElasticSerializableRowRecordList组成。

RawQueryDataSet的输入时间序列为查询中UDTF查询和原始查询所有输入时间序列的并集;查询条件输入则与实际查询输入的一致。

InputLayer允许Transformer层按需调用RawQueryDataSet的nextWithoutConstraint()和hasNextWithoutConstraint()获取下一行原始数据的结果,结果会直接缓存到缓存队列中,Transformer层可以通过index访问原始数据;InputLayer允许Transformer层设置原始数据输出层某一列最小未消费元素的索引,以便在必要的时候减少内存管理时序列化的次数(见内存管理)。

代码见包org.apache.iotdb.db.query.udf.core.input。

TransformerLayer

TransformerLayer是架设在InputLayer和OutputLayer之间的中间层,它的作用是:直接传递或者变换InputLayer中的一列或者多列原始数据,产生一列新的时间序列,并对外提供一个统一的列式读取接口,使得OutputLayer能够按列读取新的时间序列,方便生成新的DataSet。

设计中,每一个输出结果列都会对应一个Transformer。根据输出列类型的不同,又可以分为三种Transformer:

  • RawQueryPointTransformer:用于转发原始数据查询的列。Transformer中保存了指定列中最后一个被消费的非空数据的索引,该索引在每次转发时就会更新一次。
  • UDFQueryRowTransformer:用于生成变换函数为UDTF#transform(Row, PointCollector)的列(用户指定UDTF访问原始数据方式为逐行访问的列)。Transformer中保存了所有用于变换的列的索引,同时保存了最后一个被消费的原始数据行的索引。transform方法的执行会在Transformer中进行,行索引在每次执行时就会更新一次。transform的执行结果会在Transformer中缓存,直到被OutputLayer完全消费。
  • UDFQueryRowWindowTransformer:用于生成变换函数为UDTF#transform(RowWindow, PointCollector)的列(用户指定UDTF访问原始数据方式为窗口访问的列)。Transformer中保存了所有用于变换的列的索引,同时以数组的形式保存了一个窗口对应的所有原始数据行的索引。transform方法的执行会在Transformer中进行,一个窗口中所有原始数据的索引在每次执行时就会更新一次。transform的执行结果会在Transformer中缓存,直到被OutputLayer完全消费。

上述三种Transformer都会实现LayerPointReader接口,OutputLayer利用该接口操纵数据。

下图是Transformer的类继承关系。

代码见包org.apache.iotdb.db.query.udf.core.transfomrer。

OutputLayer

作用是列式读取TransformerLayer的数据,并拼接UDTF查询的结果集。

现在支持生成NonAlign结果集和AlignByTime结果集。两个结果集的生成逻辑与原始查询对应的两个结果集生成逻辑相似,区别在于UDTF的结果集生成是串行执行的。

代码见org.apache.iotdb.db.query.dataset.UDTFDataSet及其子类。

One More Thing

可以基于这种求值模式进行扩展,支持UDTF复合函数求值:每一个UDTF单元都对应一个transformer,然后对transform的执行时机进行拓扑排序,构建tranformer的执行pipeline……然后下推求值。


内存管理

在整个UDTF的求值过程中,有3处地方需要进行内存管理:

  1. InputLayer: TransformerLayer中不同Transformer的消费进度不同,可能导致在内存中驻留大量的RowRecord
  2. TransformerLayer: 窗口中数据行的数目过大,可能导致可用内存无法装下所有的索引信息
  3. OutputLayer: 某一个Transfomer在一个窗口中生成的数据点数过多,导致占用大量的内存

基于上述考虑,设计了目前的内存管理的方案:

  1. 用户可以配置一个总最大允许的内存用量(记为T),用于含有UDTF查询的过程(例如:select a, b, c, udf(a, b), udf(b, c)就是一个含有UDTF的查询过程,这个过程包含两个UDTF查询)。T默认为300MB,最大允许用户将T设置为读内存的20%。// 这个总允许花费内存的配置方法可以再讨论
  2. T分为3个部分使用,用户可以按比例把T划分给InputLayer、TransformerLayer和OutputLayer。默认分配为InputLayer:TransformerLayer:OutputLayer = 1:1:1。
  3. 分配给InputLayer的内存全部用于缓存未完全被消费的原始数据RowRecord。
  4. 分配给Transformer的内存利用如下:记Transformer中需要进行窗口计算的Transformer数量为n,那么分配给Transformer的总内存会被平均划分为n份,每一份内存供一个Transform用于缓存索引。对于不需要进行窗口计算的Transformer(原始数据Transformer和逐行处理数据的Transformer),内存占用为O(1),无需特别考虑。
  5. 分配给OutputLayer的内存利用如下:记去重后的输出列个数为n,那么分配给OutputLayer的总内存会被平均划分为n份,每一份内存供一个PointCollector用于缓存收集到的结果数据点。

上述1、2相关代码IoTDBDescriptor#loadUDFProps()。上述3、4、5相关代码见UDTFDataSet。

从上述的内存管理方案中我们可以看到,分配给InputLayer/TransformerLayer/OutputLayer的内存是一个有限的定值。我们需要估计上述三个Layer的内存用量并在Layer内存超出最大允许值时采取行动。为此,我们设计了3种数据结构,分别是ElasticSerializableRowRecordList、ElasticSerializableIntList和ElasticSerializableTVList。他们都基于同一种思想进行设计,下面将介绍这一思想。

场景特征:

  1. 内存用量受限,设最大允许用量为M。
  2. 内存受限的数据结构类似于列表,需要对该种列表进行append操作,进行顺序读和随机读操作。
  3. 在某一时刻后,某个index前的数据可以全部丢弃。

设计:

  • 将列表按照固定长度划分为若干块,每次只允许最多N块驻留在内存中。当总块数超过N块时,使用LRU策略替换块,被替换的块将被序列化到临时文件中。查询结束后清除该查询所有的临时文件。
  • 可以根据场景指定驻留在内存中的块数N,那么每个块能够分配到的内存为M/N。
  • 对于固定长度的数据而言,假定每条数据长度为E,那么每个块能够容纳的数据条数为M/N/E。
  • 对于不固定长度的数据而言,使用探测的方法定出一个数据长度上限E用于块中数据条数的计算,因而每个块能够容纳的数据条数依旧计算为M/N/E。
    • 用户可以在配置文件中指定数据长度上限的初始值E,默认为48bytes
    • 维护一个实际总长度值Ma。每次列表有append操作,就将append数据的长度累加到实际总长度值上。
    • 每当append操作到达1000次(达到一次检查阈值),就执行一次内存占用检查。如果当前列表的长度l * E > Ma,即开始一次内存调整:
      • 首先倍增法调整E,将当前E翻倍,检查是否满足l * E <= Ma;若不满足,则不断翻倍、检查,直到找到这个E
      • 检查M/N/E是否小于阈值(为了保证性能,序列化的块也不能太小),如果此时的E导致块大小过小,则更换下面方法调整E
        • E <- E + 2 * ((Ma - l * E) / l)
        • 如果利用新方法调整E仍然会导致块大小过小,则直接抛异常,要求用户分配更多内存
      • 创建新的列表,使用新E计算一个块可以容纳的元素数量,将旧列表的元素拷贝到新列表中
    • 对于一个现实的序列,其中负载的数据长度应该是比较稳定的。在append次数足够多之后,基本可以计算得到一个稳定的数据长度平均值。内存调整在数据刚开始append的时候可能会比较频繁,当然这时候序列基本都在内存当中,全序列拷贝基本不会触发内外存的交换(此时扩增操作基本等同于List的扩增),也就不会有效率上的问题。而当数据append到需要内外存交换的程度时,基本也无需触发内存调整了。
  • 给定元素索引index,可立即计算得到元素位于索引为index/(M/N/E)的块中,在块中的索引为index%(M/N/E)。
  • 利用上面提到的场景特征3,还可以进一步减少块序列化的次数(仅使用在ElasticSerializableRowRecordList和ElasticSerializableTVList中):记录列表中最小一个有用的index,当触发LRU替换时,若被替换块的最大index小于最小有用index,则直接抛弃该块,不再进行序列化(因为它永远不会被反序列化)。


  • No labels