Authors: Xingbo Huang, Dian Fu

Discussion thread
Vote thread
JIRA

Release: 1.12

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Pandas UDF has been supported since Flink 1.11 (FLIP-97). It solves the high serialization/deserialization overhead of Python UDFs and makes it convenient to leverage popular Python libraries such as Pandas, NumPy, etc. Since Pandas UDF has so many advantages, we want to support Pandas UDAF to extend the usage of Pandas UDF.

Goals

  1. Support Pandas UDAF in Batch Group Aggregation
  2. Support Pandas UDAF in Batch Group Window Aggregation
  3. Support Pandas UDAF in Batch Over Window Aggregation
  4. Support Pandas UDAF in Stream Group Window Aggregation
  5. Support Pandas UDAF in Stream Bounded Over Window Aggregation

Non-Goals

  1. Pandas UDAF doesn’t support partial calculation and retraction, so all the data under a key has to be buffered, which would result in OOM in unbounded Stream Group Aggregation and Stream Unbounded Over Window Aggregation. So we won’t support Pandas UDAF in Stream Group Aggregation and Stream Unbounded Over Window Aggregation for now.

Proposed Changes

In FLIP-58 and FLIP-97, we use the `udf` decorator to declare general Python UDFs and Pandas UDFs, and in the PyFlink UDTF design doc, we use the `udtf` decorator to declare Python UDTFs. So we prefer to use a `udaf` decorator to declare Pandas UDAFs. Besides, similar to Pandas UDF, we intend to use udaf_type="pandas" to declare a Pandas UDAF.

@udaf(result_type=DataTypes.FLOAT(), udaf_type="pandas")
def mean_udaf(v: pd.Series) -> float:
    return v.mean()

A Pandas UDAF takes one or more pandas.Series as input arguments and returns one scalar value as output.

There are several ways to define a Pandas aggregate function besides extending the base class `AggregateFunction` introduced in FLIP-139:

# option 1: extending the base class `AggregateFunction`
class Model(AggregateFunction):

    def open(self, function_context):
        # load the model back from file (Pkl_Filename is assumed to be defined elsewhere)
        with open(Pkl_Filename, 'rb') as file:
            self.Pickled_LR_Model = pickle.load(file)

    def create_accumulator(self):
        return []

    def get_value(self, accumulator):
        return accumulator[0]

    def accumulate(self, accumulator, *args):
        df = DataFrame({'c1': args[0], 'c2': args[1]})
        result = self.Pickled_LR_Model.predict(df)
        accumulator.append(result.mean())

    def close(self):
        self.Pickled_LR_Model = None


func = udaf(Model(), result_type=DataTypes.FLOAT(), udaf_type="pandas")


# option 2: Python function
@udaf(result_type=DataTypes.FLOAT(), udaf_type="pandas")
def mean_udaf(v: pd.Series) -> float:
    return v.mean()

@udaf(result_type=DataTypes.FLOAT(), udaf_type="pandas")
def custom_sort_udaf(c1: pd.Series, c2: pd.Series) -> float:
    df = DataFrame({'c1': c1, 'c2': c2})
    return df.sort_values(by='c2', ascending=False)['c1'].iloc[0]


# option 3: lambda function
mean_udaf = udaf(lambda v: v.mean(), result_type=DataTypes.FLOAT(), udaf_type="pandas")
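
Once defined, a Pandas UDAF can be registered and used like a built-in aggregate function. A minimal usage sketch (the environment setup and the sample data are illustrative, not part of this proposal):

# t_env: a TableEnvironment created elsewhere
t_env.register_function("mean_udaf", mean_udaf)

t = t_env.from_elements([(1, 2.0), (1, 4.0), (2, 3.0)], ['a', 'v'])
result = t.group_by("a").select("a, mean_udaf(v)")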

The execution of Pandas UDAF differs among group aggregation, group window aggregation and over window aggregation, so we will describe each of them in detail separately.

Batch Group Aggregation

Similar to Java Batch Sort Aggregation, we need a pre-sorting step to put the data of the same key together so that we only need to buffer the data under one key at a time.

The Batch Pandas Group Aggregation is executed as follows:

  1. When data with a new key arrives, it is known that all the data belonging to the old key has been collected (see the sketch after this list).
  2. ArrowSerializer will be used to serialize the collection of data belonging to the old key.
  3. ArrowCoder will be used to deserialize the data, and the Pandas UDAF will then be called with the deserialized data.
  4. PandasAggregationOperation will wrap the result of the Pandas UDAF as a Pandas Series.
  5. The Pandas Series will be sent back to the Java operator.
  6. The PandasAggregateFunction operator will join the key with the result of the Pandas UDAF.
  7. The joined result will be sent to the downstream operator.
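
The key-change trigger of step 1 can be illustrated with the following simplified sketch (pure Python, ignoring the Arrow serialization between the JVM and the PVM; the names are illustrative):

import pandas as pd

def process_sorted_rows(rows, pandas_udaf):
    # rows: (key, value) pairs, already sorted by key by the pre-sorting step
    current_key, buffer = None, []
    for key, value in rows:
        if buffer and key != current_key:
            # data with a new key arrives: the old key is complete, trigger the UDAF
            yield current_key, pandas_udaf(pd.Series(buffer))
            buffer = []
        current_key = key
        buffer.append(value)
    if buffer:
        # flush the last key at the end of the input
        yield current_key, pandas_udaf(pd.Series(buffer))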

Batch Group Window Aggregation

Similar to Batch Pandas Group Aggregation, we need a pre-sorting step to put the data of the same key together so that we only need to store the data under one key at a time. In addition, once data with a new key arrives, all the windows of the previous key can be triggered for calculation.

The Batch Pandas Group Window Aggregation is executed as follows:

  1. When data with a new key arrives, the PandasGroupWindowAggregateFunction operator will trigger all the windows of the old key (see the sketch after this list).
  2. ArrowSerializer will serialize the collection of data in every triggered window, which will be sent to the PVM.
  3. ArrowCoder will deserialize the data, which will be the input arguments of the Pandas UDAF.
  4. PandasAggregationOperation will wrap the result of the Pandas UDAF as a Pandas Series.
  5. The Pandas Series will be sent back to the JVM operator.
  6. The PandasGroupWindowAggregateFunction operator will join the key with the window property and the result of the Pandas UDAF.
  7. The joined result will be sent to the downstream operator.
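
Taking tumbling windows as an example, triggering all the windows of the previous key in step 1 boils down to grouping the buffered rows by window start and invoking the UDAF once per window. A simplified sketch (the names are illustrative):

import pandas as pd
from collections import defaultdict

def trigger_windows(buffered_rows, window_size, pandas_udaf):
    # buffered_rows: (timestamp, value) pairs belonging to the previous key
    windows = defaultdict(list)
    for ts, value in buffered_rows:
        # align the timestamp to the start of its tumbling window
        windows[ts - (ts % window_size)].append(value)
    for window_start, values in sorted(windows.items()):
        yield window_start, pandas_udaf(pd.Series(values))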

Stream Group Window Aggregation

Similar to Java Stream Group Window Aggregation, we will use ListState to store all the data under the current key. Tumbling Window, Sliding Window and Session Window can all be used in Stream Group Window Aggregation. Next, we will discuss the implementation details for these three Window Assigners.

Tumbling Window

The implementation is similar to Java Stream Group Window Aggregation; the only difference is that we need to send all the data in every Tumbling Window to the Python worker.

Sliding Window

Unlike Tumbling Window, different Sliding Windows have overlapping data, so there are two possible solutions to process the data in the window:

  1. Same as Tumbling Window, we send all the data in every Sliding Window to the Python worker.
  2. For the first triggered window, we send all the data in the window. For the windows triggered later, we only send the data of the non-overlapping part of the window.



           | Pros                                    | Cons
Solution 1 | Stateless                               | Overlapping data will be sent multiple times
Solution 2 | Overlapping data will be sent only once | The Python worker needs to store the overlapping data, which means it has to store all the data of a window


For Solution 2, if the window size is 5 minutes, the amount of data that needs to be buffered at the Python side is all the data within 5 minutes. So we prefer Solution 1 for now, as Solution 2 may cause the Python worker to OOM in many cases. In the future, we could make the strategy configurable. The sketch below illustrates why overlapping data is sent multiple times in Solution 1.
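
The overlap comes from the fact that each element is assigned to size / slide sliding windows. A simplified sketch of the window assignment (following the usual Flink sliding window semantics):

def sliding_window_starts(timestamp, size, slide):
    # every element belongs to size / slide overlapping windows
    start = timestamp - (timestamp % slide)
    starts = []
    while start > timestamp - size:
        starts.append(start)
        start -= slide
    return starts

With Solution 1, an element is therefore serialized and sent once per assigned window; with Solution 2 it is sent only once, at the cost of buffering a whole window on the Python side.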

Session Window

The implementation is similar to Java Stream Group Window Aggregation; the only difference is that we need to merge the data of two windows together rather than calling the merge method of the aggregate function to merge the two windows.

The Stream Pandas Group Window Aggregation is executed as follows:

  1. When a new element arrives, the PandasGroupWindowAggregateFunction operator will put the element into the ListState of every assigned window.
  2. ArrowSerializer will serialize the collection of data in every triggered window, which will be sent to the PVM.
  3. ArrowCoder will deserialize the data, which will be the input arguments of the Pandas UDAF.
  4. PandasAggregationOperation will wrap the result of the Pandas UDAF as a Pandas Series.
  5. The Pandas Series will be sent back to the JVM operator.
  6. The PandasGroupWindowAggregateFunction operator will join the key with the window property and the result of the Pandas UDAF.
  7. The joined result will be sent to the downstream operator.

Over Window Aggregation

Over windows can be defined on event time or processing time and on ranges specified as a time interval or a row count. The handling of the window boundary differs for each type of over window:

  • For an unbounded over window, no window boundary needs to be sent from the JVM to the PVM.
  • For a bounded row-count over window, the calculation of the window boundary is trivial, so we can encode the window boundary into the UDF proto, which is transferred only once from the JVM to the PVM.
  • For a bounded event-time or processing-time over window, the calculation of the window boundary depends on the time attribute of the row data. If we put this calculation on the Python side, the overhead will be very large. So we will calculate the window boundary on the Java side and send it along with the data (see the sketch after this list).
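
For the last case, the Java-side calculation amounts to, per row, finding the first row that falls inside the time interval. A Python sketch of the equivalent logic for a RANGE BETWEEN <preceding> PRECEDING AND CURRENT ROW window (an illustration, not the actual Java implementation):

def lower_boundaries(timestamps, preceding):
    # timestamps: the sorted time attribute of the rows under one key;
    # returns, for each row i, the index of the first row inside
    # [timestamps[i] - preceding, timestamps[i]]
    boundaries, lo = [], 0
    for ts in timestamps:
        while timestamps[lo] < ts - preceding:
            lo += 1
        boundaries.append(lo)
    return boundaries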

The mapping between the over window type and how the window boundary is transferred is summarized as follows:


Over Window Type                 | UDF Proto                                        | Data
Row Unbounded Window             | N/A                                              | N/A
Row Unbounded Preceding Window   | upper window boundary                            | N/A
Row Unbounded Following Window   | lower window boundary                            | N/A
Row Sliding Window               | lower window boundary and upper window boundary  | N/A
Range Unbounded Window           | N/A                                              | N/A
Range Unbounded Preceding Window | N/A                                              | upper window boundary
Range Unbounded Following Window | N/A                                              | lower window boundary
Range Sliding Window             | N/A                                              | lower window boundary and upper window boundary


Next, we will talk about how to declare window boundaries in UDF proto.

message Window {
    enum WindowType {
        RANGE_UNBOUNDED = 0;
        RANGE_UNBOUNDED_PRECEDING = 1;
        RANGE_UNBOUNDED_FOLLOWING = 2;
        RANGE_SLIDING = 3;
        ROW_UNBOUNDED = 4;
        ROW_UNBOUNDED_PRECEDING = 5;
        ROW_UNBOUNDED_FOLLOWING = 6;
        ROW_SLIDING = 7;
    }

    int64 lowerBoundary = 1;
    int64 upperBoundary = 2;
    WindowType windowType = 3;
}

message UserDefinedFunction {
    // ...
    int32 windowIndex = 3;
}

message UserDefinedFunctions {
    // ...
    repeated Window windows = 3;
}
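
For example, a Row Sliding Window such as ROWS BETWEEN 2 PRECEDING AND CURRENT ROW would carry its boundaries in the UDF proto. A hypothetical sketch of what the generated protobuf usage could look like (the module name and the sign convention of the boundaries are assumptions of this sketch, not part of the proposal):

# the generated module name is illustrative
from pyflink.fn_execution import flink_fn_execution_pb2

window = flink_fn_execution_pb2.Window(
    windowType=flink_fn_execution_pb2.Window.ROW_SLIDING,
    lowerBoundary=-2,  # 2 rows preceding
    upperBoundary=0)   # current row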

Next, we will talk about how to encode window boundaries into the data channel. As we know from FLIP-97, a batch of input data will be converted into Arrow format data. So we intend to put the window boundaries of the input data in front of the Arrow format data.

For example, if we have:

avg(v) over (PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
avg(v) over (PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
avg(v) over (PARTITION BY a ORDER BY rowtime RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)


The input will be encoded as:

window-num(int),
w1-array-num(int), [w1-row1-upper-boundary, w1-row2-upper-boundary, ...],
w3-array-num(int), [w3-row1-lower-boundary, w3-row1-upper-boundary, ...],
[arrow format input data]

Note that w1 is a range unbounded preceding window, so its lower boundary doesn’t need to be encoded. Besides, w2 is a range unbounded window, so no window boundary needs to be encoded for it at all.
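
The layout can be assembled as in the following sketch (an illustration of the layout only; the actual wire format is up to the implementation):

def encode_boundaries(per_window_boundaries):
    # per_window_boundaries: one flat list of boundary values per window that
    # needs per-row boundaries, e.g. [w1_boundaries, w3_boundaries] for the
    # example above (w2 contributes nothing)
    encoded = [len(per_window_boundaries)]   # window-num
    for boundaries in per_window_boundaries:
        encoded.append(len(boundaries))      # per-window array-num
        encoded.extend(boundaries)           # the boundary values
    return encoded                           # the Arrow format data follows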

Batch Over Window Aggregation

The Batch Pandas Over Window Aggregation is executed as follows:

  1. The PandasOverWindowAggregateFunction operator will store all the data with the same key.
  2. When an element with a new key arrives, ArrowSerializer will serialize the collection of data of the previous key.
  3. The window boundary data and the Arrow format data of the previous step will be sent to the PVM.
  4. ArrowCoder will deserialize the data, which will be the input arguments of the Pandas UDAF.
  5. PandasOverWindowAggregationOperation will slice the input of the Pandas UDAF for the different over windows according to the window boundaries (see the sketch after this list).
  6. PandasOverWindowAggregationOperation will wrap the result of the Pandas UDAF as a Pandas Series.
  7. The Pandas Series will be sent back to the JVM operator.
  8. The PandasOverWindowAggregateFunction operator will join the key with the result of the Pandas UDAF.
  9. The joined result will be sent to the downstream operator.
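
The slicing in step 5 can be illustrated as follows: for each row, the input series is sliced with that row’s boundaries before the UDAF is invoked (a simplified sketch assuming the boundaries have already been decoded into per-row index lists):

import pandas as pd

def evaluate_over_window(series, lowers, uppers, pandas_udaf):
    # lowers/uppers: per-row boundary indices for one over window
    results = []
    for i in range(len(series)):
        # slice the rows belonging to row i's over window and apply the UDAF
        window_data = series.iloc[lowers[i]:uppers[i] + 1]
        results.append(pandas_udaf(window_data))
    return pd.Series(results)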

Stream Bounded Over Window Aggregation

Similar to Stream Group Window Aggregation with Sliding Windows, to avoid OOM we choose to send all the data belonging to every over window to the Python worker.

The Stream Pandas Over Window Aggregation is executed as follows:

  1. When a new element arrives, the PandasOverWindowAggregateFunction operator will use a MapState to store the element into a list keyed by its event time or processing time.
  2. The PandasOverWindowAggregateFunction operator will register a timer to trigger the computation.
  3. When the timer fires, ArrowSerializer will serialize the collection of data in every triggered over window, which will be sent to the PVM.
  4. ArrowCoder will deserialize the data, which will be the input arguments of the Pandas UDAF.
  5. PandasOverWindowAggregationOperation will wrap the result of the Pandas UDAF as a Pandas Series.
  6. The Pandas Series will be sent back to the JVM operator.
  7. The PandasOverWindowAggregateFunction operator will join the key with the result of the Pandas UDAF.
  8. The joined result will be sent to the downstream operator.

Compatibility, Deprecation, and Migration Plan

This FLIP won’t break backwards compatibility.

Implementation Plan

  1. Support Pandas UDAF in Batch Group Aggregation
  2. Support Pandas UDAF in Batch Group Window Aggregation
  3. Support Pandas UDAF in Stream Group Window Aggregation
  4. Support Pandas UDAF in Batch Over Window Aggregation
  5. Support Pandas UDAF in Stream Bounded Over Window Aggregation
  6. Add documentation for Pandas UDAF