Page properties

Discussion thread: [DISCUSS] FLIP-380: Support Full Partition Processing On Non-keyed DataStream (Apache Mail Archives)
Vote thread: https://lists.apache.org/thread/ns1my6ydctjfl9w89hm8gvldh00lqtq3
JIRA: TBD
Release: TBD


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In DataStream API, a non-keyed DataStream can be transformed into a KeyedStream through keyBy, and then further transformed into a WindowedStream through window operations. WindowedStream enables window processing on records with the same key. With the improvement of FLIP-331, WindowedStream will support full window processing through the window assigner EndOfStreamWindows, for which the window is only triggered at the end of inputs.

However, full window processing is not supported directly by non-keyed DataStream. This means the non-keyed DataStream cannot collect all records of each subtask (these records have no keys) separately into a full window and process them at the end of inputs. DataSet API already supports processing and sorting all records within each subtask through mapPartition API and sortPartition API.  As DataSet API has been deprecated in Flink 1.18, it is necessary to enhance the non-keyed DataStream to support handling full window processing on individual subtasks.

In this FLIP, we propose two main enhancements. Firstly, we propose enabling non-keyed DataStream to directly transform into a PartitionWindowedStream. The PartitionWindowedStream represents collecting all records of each subtask separately into a full window. Secondly, we propose supporting four APIs on PartitionWindowedStream, including mapPartition, sortPartition, aggregate and reduce.

...

Public Interfaces

1. We introduce the fullWindowPartition method to the DataStream class.

...

3. We add four APIs to PartitionWindowedStream,  including mapPartition, sortPartition, aggregate and reduce.

Proposed Changes

...

Support only full window processing on non-keyed DataStream

We propose to support only full window processing on non-keyed DataStream. There are challenges in supporting arbitrary types of windows, such as count windows, sliding windows, and session windows. These challenges arise from the fact that the DataStream is non-keyed and does not support keyed state backend and keyed raw state. This issue results in two conflicts on the usage of various windows:

1. The storage of window state relies on the InternalKvState provided by KeyedStateBackend. The storage of timers in time service also relies on keyed raw state.

2. The recovery of window state and timers relies on the assumption that every record has a key. The window state and timers will be stored and distinguished by the key. In fault tolerance and rescaling scenarios, each subtask must be assigned a key range to recover the window state and timers correctly.

Furthermore, based on community feedback, we have not observed any user requests for arbitrary window processing on non-keyed DataStream.

Full window processing has unique characteristics and is primarily applicable to batch processing scenarios. Therefore, we propose to support only full window processing on DataStream. This feature is designed to work only in batch mode and does not support checkpointing. When specifying RuntimeExecutionMode=STREAMING, the job will fail on submission. Without the requirement for checkpointing, the underlying implementation can avoid the aforementioned conflicts by not relying on keyed state.

Introduce the PartitionWindowedStream

To enable full window processing on non-keyed DataStream, we need to define the corresponding method in DataStream class. One approach is to directly add a window(WindowAssigner assigner) method to DataStream. This method only supports passing the EndOfStreamWindows introduced in FLIP-331. If users want to apply full window processing, the corresponding job code will be dataStream.window(EndOfStreamWindows.get()).

However, this approach may lead users to mistakenly pass window assigners other than EndOfStreamWindows to the method, which is not supported.

To avoid such issues, we propose defining a parameterless fullWindowPartition() method on DataStream class to express the semantics of full window processing. This method is functionally equivalent to the aforementioned dataStream.window(EndOfStreamWindows.get()). By adopting this approach, users can explicitly apply full window processing without any misunderstandings.

The DataStream class has 6 subclasses, which all have access to the fullWindowPartition() method. However, KeyedStream and IterativeStream should not support non-keyed full window processing. The table below indicates whether these 6 subclasses should support the feature:


DataStream subclass           Support non-keyed full window processing
KeyedStream                   No
IterativeStream               No
DataStreamSource              Yes
CachedDataStream              Yes
SideOutputDataStream          Yes
SingleOutputStreamOperator    Yes


KeyedStream is a keyed DataStream and should only support keyed full window processing. Therefore, we will override the fullWindowPartition() method in the KeyedStream class and apply keyed full window processing in it.

IterativeStream has been marked with the @Deprecated annotation in Flink 1.19. Consequently, we will override the fullWindowPartition() method in the IterativeStream class and throw an UnsupportedOperationException.

The return type of the fullWindowPartition() method is PartitionWindowedStream, which provides multiple APIs for full window processing, including mapPartition, sortPartition, aggregate and reduce.

For non-keyed DataStream, the upstream operators must have a POINTWISE connection pattern with the operators in PartitionWindowedStream. PartitionWindowedStream does not support customizing parallelism. Its parallelism is the same as that of the previous operator.

Here is the fullWindowPartition method and PartitionWindowedStream class.

Code Block
languagejava
public class DataStream<T> {
  
  ...
  
  /**
   * Collect records in each partition of this data stream separately into a full 
   * window. The window emission will be triggered at the end of inputs. A partition
   * contains all records of a subtask for non-keyed DataStream and contains all records
   * of a key for {@link KeyedStream}.
   *
   * @return The data stream full windowed on each partition.
   */
  public PartitionWindowedStream<T> fullWindowPartition() {
    ...
  }
}

...

Code Block
languagejava
/**
 * {@link PartitionWindowedStream} represents a data stream that collects all 
 * records of each partition separately into a full window. Window emission will 
 * be triggered at the end of inputs. A partition contains all records of a subtask 
 * for non-keyed DataStream and contains all records of a key for {@link KeyedStream}.
 *
 * @param <T> The type of the elements in this stream.
 */
public class PartitionWindowedStream<T> extends DataStream<T> {
  ...
}

API Implementation

...

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Process the records of the window by {@link MapPartitionFunction}. The 
     * records will be available in the given {@link Iterator} function 
     * parameter of {@link MapPartitionFunction}.
     *
     * @param mapPartitionFunction The {@link MapPartitionFunction} that is 
     * called for the records in the full window.
     * @return The resulting data stream.
     * @param <R> The type of the elements in the resulting stream, equal to the
     *     MapPartitionFunction's result type.
     */
    public <R> DataStream<R> mapPartition(MapPartitionFunction<T, R> mapPartitionFunction) {
      ...
    }
}

...

Next, we will propose the operator implementation for the mapPartition API.

In our implementation, the operator will execute the MapPartitionFunction while receiving records, rather than waiting for the entire window of records to be collected. To achieve this, we add a separate UDFExecutionThread inside the operator.

The TaskMainThread will cyclically add records to a fixed-size queue. The UDFExecutionThread will invoke the user-defined MapPartitionFunction and cyclically poll records from the queue through the Iterator parameter of MapPartitionFunction. If there are no records in the queue, the UDFExecutionThread blocks and waits in the hasNext() and next() methods of the Iterator. Once the UDFExecutionThread has processed all the data, the operator completes its execution.
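The hand-off described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed operator code: the class QueueHandOffSketch, the BlockingIterator helper, the END sentinel and the use of java.util.function.Consumer in place of MapPartitionFunction are all hypothetical names chosen for this sketch. It also assumes that records are non-null and that the user function drains the iterator completely.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch of the queue hand-off; not the actual Flink operator. */
final class QueueHandOffSketch {
    /** Sentinel put into the queue to signal the end of inputs (an assumption of this sketch). */
    private static final Object END = new Object();

    /** Iterator whose hasNext() blocks until a record or the end marker arrives. */
    static final class BlockingIterator<T> implements Iterator<T> {
        private final BlockingQueue<Object> queue;
        private Object next; // null means nothing has been fetched yet

        BlockingIterator(BlockingQueue<Object> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks while the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for records", e);
                }
            }
            return next != END;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T value = (T) next;
            next = null;
            return value;
        }
    }

    /** The calling thread feeds records while a second thread runs the user function. */
    static <T> void run(Iterable<T> input, int queueCapacity, Consumer<Iterator<T>> udf) {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<>(queueCapacity);
        Thread udfThread =
                new Thread(() -> udf.accept(new BlockingIterator<>(queue)), "udf-execution");
        udfThread.start();
        try {
            for (T record : input) {
                queue.put(record); // blocks while the fixed-size queue is full
            }
            queue.put(END);
            udfThread.join(); // finish only after the user function has returned
        } catch (InterruptedException e) {
            udfThread.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding records", e);
        }
    }
}
```

The bounded queue lets both threads make progress concurrently while capping the number of buffered records. The sketch leaves out concerns a real operator would need to handle, such as propagating exceptions from the user function and handling a user function that returns early.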

The following diagram illustrates the interaction between the TaskMainThread and the UDFExecutionThread:

SortPartition

We introduce the sortPartition API in the PartitionWindowedStream, including three methods to sort records by different key extraction logic.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Sorts the records of the window on the specified field in the 
     * specified order. The type of records must be {@link Tuple}.
     *
     * @param field The field index on which records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(int field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the specified field in the 
     * specified order. The type of records must be {@link Tuple} or POJO 
     * class. The POJO class must be public and have getter and setter methods 
     * for each field. It mustn't implement any interfaces or extend any 
     * classes.
     * 
     * @param field The field expression referring to the field on which 
     * records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(String field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the extracted key in the specified order.
     *
     * @param keySelector The KeySelector function which extracts the key 
     * from records.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public <K> DataStream<T> sortPartition(KeySelector<T, K> keySelector, Order order) {
      ...
    }
  
}

We also show the definition of Order, which defines the rules for sorting the records.

Code Block
languagejava
public enum Order {
    /** Indicates an ascending order. */
    ASCENDING,

    /** Indicates a descending order. */
    DESCENDING
}
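To illustrate how an Order and a key extraction function could combine into a sort of one partition, here is a minimal sketch in plain Java. It is not the proposed implementation: SortPartitionSketch is a hypothetical class, java.util.function.Function stands in for KeySelector, the nested Order enum is a local copy so the sketch compiles on its own, and the key type is assumed to be Comparable.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of sorting one partition by an extracted key. */
final class SortPartitionSketch {
    /** Local copy of the Order enum, so that the sketch is self-contained. */
    enum Order {
        ASCENDING,
        DESCENDING
    }

    /** Returns a sorted copy of the partition; the input list is left unchanged. */
    static <T, K extends Comparable<K>> List<T> sortPartition(
            List<T> partition, Function<T, K> keySelector, Order order) {
        Comparator<T> byKey = Comparator.comparing(keySelector);
        if (order == Order.DESCENDING) {
            byKey = byKey.reversed();
        }
        List<T> sorted = new ArrayList<>(partition);
        sorted.sort(byKey);
        return sorted;
    }
}
```

For example, sorting the words "pear", "fig" and "banana" by String::length in ASCENDING order yields "fig", "pear", "banana".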

Next, we will propose the operator implementation for the sortPartition API.

The TaskMainThread will add records to the ExternalSorter, which is a multi-way merge sorter for sorting large amounts of data that cannot totally fit into memory. The ExternalSorter will sort the records according to the Order and send the sorted records to output at the end of inputs. The following diagram illustrates the interaction between the TaskMainThread and the ExternalSorter:
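As a rough illustration of the multi-way merge idea behind such a sorter, the sketch below cuts the input into bounded sorted runs and then merges them with a priority queue. It makes no claim about how the ExternalSorter is implemented: MultiWayMergeSketch, RunCursor and the runSize parameter are hypothetical, and the runs are kept in memory here, whereas a real external sorter would spill them to disk.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of a multi-way merge sort; runs stay in memory for simplicity. */
final class MultiWayMergeSketch {
    /** Cursor over one sorted run, holding the run's current smallest element. */
    private static final class RunCursor<T> {
        final Iterator<T> rest;
        T head;

        RunCursor(Iterator<T> run) {
            this.rest = run;
            this.head = run.next(); // runs are never empty in this sketch
        }
    }

    static <T> List<T> sort(Iterable<T> input, int runSize, Comparator<T> comparator) {
        if (runSize < 1) {
            throw new IllegalArgumentException("runSize must be at least 1");
        }
        // Phase 1: cut the input into runs of at most runSize records and sort each run.
        List<List<T>> runs = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T record : input) {
            current.add(record);
            if (current.size() == runSize) {
                current.sort(comparator);
                runs.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            current.sort(comparator);
            runs.add(current);
        }
        // Phase 2: repeatedly emit the smallest head among all runs.
        Comparator<RunCursor<T>> byHead = (a, b) -> comparator.compare(a.head, b.head);
        PriorityQueue<RunCursor<T>> heap = new PriorityQueue<>(byHead);
        for (List<T> run : runs) {
            heap.add(new RunCursor<>(run.iterator()));
        }
        List<T> sorted = new ArrayList<>();
        while (!heap.isEmpty()) {
            RunCursor<T> smallest = heap.poll();
            sorted.add(smallest.head);
            if (smallest.rest.hasNext()) {
                smallest.head = smallest.rest.next();
                heap.add(smallest); // re-insert the run with its new head
            }
        }
        return sorted;
    }
}
```

Only one record per run needs to be resident during the merge phase, which is what makes this approach suitable for data that does not fit into memory once the runs are stored externally.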

Aggregate

We introduce the aggregate API in the PartitionWindowedStream.

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {
    /**
     * Applies the given aggregate function to the records of the window. The 
     * aggregate function is called for each element, aggregating values 
     * incrementally in the window.
     *
     * @param aggregateFunction The aggregation function.
     * @return The resulting data stream.
     * @param <ACC> The type of the AggregateFunction's accumulator.
     * @param <R> The type of the elements in the resulting stream, equal to 
     * the AggregateFunction's result type.
     */
    public <ACC, R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> aggregateFunction) {
      ...
    }
  
}
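The incremental aggregation described in the Javadoc above can be sketched as follows. This is an illustration under assumptions rather than the proposed operator: AggregateSketch and SimpleAggregateFunction are hypothetical, the latter being a local stand-in that mirrors the createAccumulator, add and getResult methods of Flink's AggregateFunction so that the sketch compiles without Flink on the classpath. What the real operator emits for an empty partition is not specified here.

```java
/** Hypothetical sketch of incremental aggregation over one partition. */
final class AggregateSketch {
    /** Local stand-in for the three AggregateFunction methods used in this sketch. */
    interface SimpleAggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();

        ACC add(IN value, ACC accumulator);

        OUT getResult(ACC accumulator);
    }

    /** Folds every record into the accumulator and produces one result at the end. */
    static <IN, ACC, OUT> OUT aggregatePartition(
            Iterable<IN> partition, SimpleAggregateFunction<IN, ACC, OUT> function) {
        ACC accumulator = function.createAccumulator();
        for (IN record : partition) {
            accumulator = function.add(record, accumulator); // called once per record
        }
        return function.getResult(accumulator); // emitted once, at the end of inputs
    }

    /** Example function: average of integers, with {sum, count} as the accumulator. */
    static final class Average implements SimpleAggregateFunction<Integer, long[], Double> {
        @Override
        public long[] createAccumulator() {
            return new long[] {0L, 0L};
        }

        @Override
        public long[] add(Integer value, long[] accumulator) {
            accumulator[0] += value;
            accumulator[1]++;
            return accumulator;
        }

        @Override
        public Double getResult(long[] accumulator) {
            // Returning 0.0 for an empty partition is a choice made for this sketch only.
            return accumulator[1] == 0 ? 0.0 : (double) accumulator[0] / accumulator[1];
        }
    }
}
```

Because only the accumulator is kept between records, memory use stays independent of the number of records in the partition.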

...

Code Block
languagejava
public class PartitionWindowedStream<T> extends DataStream<T> {

    /**
     * Applies a reduce transformation on the records of the window. The 
     * {@link ReduceFunction} will be called for every record in the window.
     *
     * @param reduceFunction The reduce function.
     * @return The resulting data stream.
     */
    public DataStream<T> reduce(ReduceFunction<T> reduceFunction) {
      ...
    }
}

...

The TaskMainThread will invoke ReduceFunction#reduce for each record in the window and only send the final result to output at the end of inputs.
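A minimal sketch of this behaviour in plain Java is given below. It is an assumption-laden illustration: ReduceSketch is a hypothetical class, java.util.function.BinaryOperator stands in for ReduceFunction, records are assumed to be non-null, and returning an empty Optional for an empty partition is a choice made for this sketch only.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Hypothetical sketch of reducing one partition to a single final value. */
final class ReduceSketch {
    static <T> Optional<T> reducePartition(Iterable<T> partition, BinaryOperator<T> reduceFunction) {
        T result = null;
        boolean seenRecord = false;
        for (T record : partition) {
            if (seenRecord) {
                result = reduceFunction.apply(result, record); // combine with the running value
            } else {
                result = record; // the first record initializes the running value
                seenRecord = true;
            }
        }
        // Only the final value is produced, once all records have been consumed.
        return seenRecord ? Optional.of(result) : Optional.empty();
    }
}
```

For instance, reducing the records 1, 2, 3 and 4 with Integer::sum produces the single value 10.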

Rejected Alternatives

1. In the implementation of the mapPartition API, the operator does not cache records. The UDFExecutionThread must execute synchronously with the TaskMainThread to get the next record and process it in MapPartitionFunction.

We chose not to use this approach because it would result in inefficient execution due to frequent thread context switching between the TaskMainThread and the UDFExecutionThread. Caching records enables concurrent execution of the two threads and reduces the frequency of context switching.

Compatibility, Deprecation, and Migration Plan

This FLIP proposes a new feature in Flink. It is fully backward compatible.

1. PartitionWindowedStream will be annotated with @PublicEvolving. This annotation does not ensure compatibility between major releases or between minor releases. It only ensures compatibility between patch releases.

Test Plan

We will provide unit and integration tests to validate the proposed changes.

...