...




Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

3. We add four APIs to PartitionWindowedStream, namely mapPartition, sortPartition, aggregate and reduce.

Proposed Changes

...

Support only full window processing on non-keyed DataStream

We propose to support only full window processing on non-keyed DataStream. Supporting arbitrary types of windows, such as count windows, sliding windows, and session windows, is challenging because a non-keyed DataStream supports neither the keyed state backend nor keyed raw state. This leads to two conflicts in the usage of various windows:

...

Furthermore, based on community feedback, we have not observed any user requests for arbitrary window processing on non-keyed DataStream.

Full window processing has unique characteristics and is primarily applicable to batch processing scenarios. It can therefore be designed to work only when RuntimeExecutionMode=BATCH, without checkpoint support. If RuntimeExecutionMode=STREAMING is specified, the job will fail on submission. Because checkpoints are not required, the underlying implementation does not need to rely on state, which avoids the conflicts described above.
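The BATCH-only rule above can be illustrated with a small plain-Java check. This is a minimal sketch under stated assumptions, not Flink code: `ExecutionMode` is a hypothetical stand-in for Flink's `RuntimeExecutionMode` (the real enum also has `AUTOMATIC`, which this sketch ignores), and `FullWindowSubmissionCheck.validate` is a hypothetical name for wherever such a check would run during job submission.

```java
import java.util.Objects;

/** Hypothetical stand-in for Flink's RuntimeExecutionMode (illustration only). */
enum ExecutionMode { BATCH, STREAMING }

/**
 * Hypothetical submission-time check: a job that uses full window processing
 * on a non-keyed stream is rejected unless it runs in BATCH mode.
 */
final class FullWindowSubmissionCheck {
    private FullWindowSubmissionCheck() {}

    static void validate(ExecutionMode mode, boolean usesFullWindowPartition) {
        Objects.requireNonNull(mode, "mode");
        if (usesFullWindowPartition && mode != ExecutionMode.BATCH) {
            // Full window processing has no checkpoint support, so a
            // STREAMING job is rejected before it starts running.
            throw new IllegalStateException(
                "Full window processing on non-keyed DataStream requires "
                    + "RuntimeExecutionMode=BATCH, but was " + mode);
        }
    }
}
```

Jobs that do not use full window processing are unaffected by the check in either mode.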

Importantly, the DataSet API already offers full window processing capabilities. Integrating this existing capability of DataSet into non-keyed DataStream will enhance its functionality and better meet the needs of users. Therefore, we propose to only support full window processing on non-keyed DataStream.

Introduce the PartitionWindowedStream

To enable full window processing on non-keyed DataStream, we need to define the corresponding method in the DataStream class. One approach is to directly add a window(WindowAssigner assigner) method to DataStream. This method would only accept the EndOfStreamWindows introduced in FLIP-331. If users want to apply full window processing, the corresponding job code would be dataStream.window(EndOfStreamWindows.get()).

...

The return type of the fullWindowPartition() method is PartitionWindowedStream, which provides multiple APIs for full window processing: mapPartition, sortPartition, aggregate and reduce.
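To make the semantics of the four operations concrete, the following plain-Java model applies each of them to one subtask's full window, held in memory as a list. It is a hypothetical sketch, not the proposed implementation: `FullWindowPartitionModel` is an invented name, `java.util.function` types stand in for Flink's `MapPartitionFunction`, `AggregateFunction`, and `ReduceFunction`, and aggregate is simplified to an initial accumulator plus `add` and `getResult` steps. Buffering the whole partition in a list is only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/**
 * Hypothetical in-memory model of the four PartitionWindowedStream operations,
 * each applied to the full window (all records) of a single subtask.
 */
final class FullWindowPartitionModel<T> {
    private final List<T> partition;

    FullWindowPartitionModel(List<T> partition) {
        this.partition = new ArrayList<>(partition);
    }

    /** mapPartition: the function sees one Iterator over every record of the partition. */
    <R> List<R> mapPartition(Function<Iterator<T>, List<R>> fn) {
        return fn.apply(partition.iterator());
    }

    /** sortPartition: emits all records of the partition in comparator order. */
    List<T> sortPartition(Comparator<? super T> comparator) {
        List<T> sorted = new ArrayList<>(partition);
        sorted.sort(comparator);
        return sorted;
    }

    /** aggregate: folds every record into an accumulator, then emits one result. */
    <A, R> R aggregate(A initial, BiFunction<A, T, A> add, Function<A, R> getResult) {
        A acc = initial;
        for (T record : partition) {
            acc = add.apply(acc, record);
        }
        return getResult.apply(acc);
    }

    /** reduce: combines all records into one value; empty if the partition has no records. */
    Optional<T> reduce(BinaryOperator<T> reducer) {
        return partition.stream().reduce(reducer);
    }
}
```

In this model each operation emits its result only after it has seen the whole partition, which is what distinguishes full window processing from per-record operators.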

For a non-keyed DataStream, the upstream operators must be connected to the operators in PartitionWindowedStream with the POINTWISE connection pattern. PartitionWindowedStream does not support customizing parallelism; its parallelism is the same as that of the upstream operator.

...

In our implementation, the operator executes the MapPartitionFunction while it receives records, rather than waiting for the entire window of records to be collected. To achieve this, we add a separate UDFExecutionThread inside the operator.

The TaskMainThread repeatedly adds records to a fixed-size queue. The UDFExecutionThread invokes the user-defined MapPartitionFunction, whose Iterator parameter polls records from that queue. If there are no records in the queue, the UDFExecutionThread blocks in the hasNext() and next() methods of the Iterator. Once the UDFExecutionThread has processed all the data, the operator completes its execution.
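The hand-off between the two threads can be sketched in plain Java with a bounded `ArrayBlockingQueue`. This is a minimal sketch under assumptions, not the operator's actual code: `QueueBackedMapPartition`, the end-of-input marker, and the choice to let the main thread stop waiting once the UDF thread has exited are all hypothetical, and a `java.util.function.Function` stands in for `MapPartitionFunction`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * Hypothetical sketch of the TaskMainThread / UDFExecutionThread hand-off.
 * The main thread calls processElement() and endInput(); the UDF runs on its own thread.
 */
final class QueueBackedMapPartition<T, R> {
    // Assumed marker enqueued after the last record.
    private static final Object END_OF_INPUT = new Object();

    private final BlockingQueue<Object> queue;
    private final List<R> output = new ArrayList<>();
    private final Thread udfThread;
    private volatile Throwable udfFailure;

    QueueBackedMapPartition(int capacity, Function<Iterator<T>, List<R>> mapPartitionFunction) {
        this.queue = new ArrayBlockingQueue<>(capacity); // fixed-size queue
        this.udfThread = new Thread(() -> {
            try {
                // The UDF pulls records through an Iterator backed by the queue.
                output.addAll(mapPartitionFunction.apply(new QueueIterator()));
            } catch (Throwable t) {
                udfFailure = t;
            }
        }, "UDFExecutionThread");
        udfThread.start();
    }

    /** Main thread: hands one record to the UDF thread. */
    void processElement(T record) throws InterruptedException {
        enqueue(record);
    }

    /** Main thread: signals end of input, waits for the UDF, and returns its output. */
    List<R> endInput() throws Exception {
        enqueue(END_OF_INPUT);
        udfThread.join();
        if (udfFailure != null) {
            throw new Exception("MapPartitionFunction failed", udfFailure);
        }
        return output;
    }

    // Waits while the queue is full, but gives up once the UDF thread has exited
    // (it failed or stopped reading early), so the main thread cannot deadlock.
    private void enqueue(Object item) throws InterruptedException {
        while (!queue.offer(item, 10, TimeUnit.MILLISECONDS)) {
            if (!udfThread.isAlive()) {
                return;
            }
        }
    }

    /** Iterator given to the UDF: hasNext() blocks until a record or the end marker arrives. */
    private final class QueueIterator implements Iterator<T> {
        private Object pending; // taken from the queue but not yet returned by next()

        @Override
        public boolean hasNext() {
            if (pending == null) {
                try {
                    pending = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for records", e);
                }
            }
            return pending != END_OF_INPUT;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T record = (T) pending;
            pending = null;
            return record;
        }
    }
}
```

In this sketch, the fixed capacity makes processElement wait whenever the UDF falls behind, so memory use is bounded by the queue size rather than by the size of the full window.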

...

We introduce the sortPartition API in PartitionWindowedStream, which includes three methods that sort records using different key extraction logic.

```java
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PartitionWindowedStream<T> {

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple}.
     *
     * @param field The field index on which records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(int field, Order order) {
        ...
    }

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple} or a POJO
     * class. The POJO class must be public and have getter and setter methods
     * for each field. It mustn't implement any interfaces or extend any
     * classes.
     *
     * @param field The field expression referring to the field on which
     *     records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(String field, Order order) {
        ...
    }

    /**
     * Sorts the records of the window on the extracted key in the specified order.
     *
     * @param keySelector The KeySelector function which extracts the key
     *     from records.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public <K> DataStream<T> sortPartition(KeySelector<T, K> keySelector, Order order) {
        ...
    }
}
```
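The KeySelector variant of sortPartition can be modeled in plain Java as a stable sort of one subtask's records by an extracted key. This is a hypothetical sketch: `SortPartitionSketch` and `SortOrder` are invented names (`SortOrder` stands in for Flink's `Order`), a `java.util.function.Function` plays the role of `KeySelector`, and in this model the field-index and field-expression variants would differ only in how the key is extracted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for Flink's Order, limited to the two constants used here. */
enum SortOrder { ASCENDING, DESCENDING }

final class SortPartitionSketch {
    private SortPartitionSketch() {}

    /** Sorts one subtask's full window of records by the key extracted from each record. */
    static <T, K extends Comparable<? super K>> List<T> sortPartition(
            List<T> partition, Function<T, K> keySelector, SortOrder order) {
        Comparator<T> byKey = Comparator.comparing(keySelector);
        if (order == SortOrder.DESCENDING) {
            byKey = byKey.reversed();
        }
        List<T> sorted = new ArrayList<>(partition);
        // List.sort is stable, so in this sketch records with equal keys keep their arrival order.
        sorted.sort(byKey);
        return sorted;
    }
}
```

For example, sorting the words "ccc", "a", "bb" with `String::length` in ascending order yields "a", "bb", "ccc".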

...