
...




Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The return type of the fullWindowPartition() method is PartitionWindowedStream, which provides multiple APIs for full window processing, including mapPartition, sortPartition, aggregate and reduce.
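The sketch below gives a rough illustration of what each of these operations computes. It models the full window of one subtask as an in-memory list. It is not Flink code. `FullWindowOpsSketch` and its method signatures are hypothetical, and `aggregate` only loosely mirrors the createAccumulator/add/getResult shape of an aggregate function. The real mapPartition operator also consumes records as they arrive, not from a materialized list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical model of full-window operations over the records of ONE subtask.
// Not the Flink API: the "window" is simply a List holding every record the subtask received.
class FullWindowOpsSketch {

    // mapPartition: the function sees an Iterator over the whole window and may emit any number of results.
    static <T, O> List<O> mapPartition(List<T> window, Function<Iterator<T>, List<O>> fn) {
        return fn.apply(window.iterator());
    }

    // sortPartition: emits the window's records in the order defined by the comparator.
    static <T> List<T> sortPartition(List<T> window, Comparator<T> order) {
        List<T> sorted = new ArrayList<>(window);
        sorted.sort(order);
        return sorted;
    }

    // aggregate: folds every record into an accumulator, then extracts a single result.
    static <T, A, R> R aggregate(List<T> window, Supplier<A> createAccumulator,
                                 BiFunction<A, T, A> add, Function<A, R> getResult) {
        A acc = createAccumulator.get();
        for (T record : window) {
            acc = add.apply(acc, record);
        }
        return getResult.apply(acc);
    }

    // reduce: combines records pairwise into one value of the same type (the window is assumed non-empty).
    static <T> T reduce(List<T> window, BinaryOperator<T> reducer) {
        Iterator<T> it = window.iterator();
        T result = it.next();
        while (it.hasNext()) {
            result = reducer.apply(result, it.next());
        }
        return result;
    }
}
```

Consider a window of `[5, 3, 8]`. A record-counting mapPartition function yields `[3]`. sortPartition with natural order yields `[3, 5, 8]`, and reduce with `Integer::sum` yields `16`.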

For a non-keyed DataStream, the upstream operator must be connected to the operators of the PartitionWindowedStream with a POINTWISE connection pattern. PartitionWindowedStream does not support customizing its parallelism; its parallelism is always the same as that of the previous operator.
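Here is a minimal model of this constraint. It assumes that a POINTWISE connection between operators of equal parallelism behaves like a one-to-one forward channel. Under that assumption, downstream subtask i receives exactly the records of upstream subtask i, and its full window is that whole list. `PointwiseFullWindowSketch` and `processPerSubtask` are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model: a non-keyed PartitionWindowedStream inherits the upstream parallelism, and
// (assuming equal parallelism) the POINTWISE connection acts as a one-to-one forward channel,
// so downstream subtask i sees exactly the records emitted by upstream subtask i.
class PointwiseFullWindowSketch {

    static <T, R> List<R> processPerSubtask(List<List<T>> upstreamOutputs, Function<List<T>, R> windowFn) {
        int parallelism = upstreamOutputs.size(); // same parallelism as the previous operator
        List<R> results = new ArrayList<>(parallelism);
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Forward channel: upstream subtask i -> downstream subtask i, no shuffling.
            List<T> window = upstreamOutputs.get(subtask);
            results.add(windowFn.apply(window));
        }
        return results;
    }
}
```

Suppose the upstream outputs are `[[1, 2, 3], [4, 5]]` and the window function is `List::size`. The result is `[3, 2]`: one result per subtask, with the parallelism unchanged.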

...

In our implementation, the operator executes the MapPartitionFunction while it receives records, rather than waiting for the entire window of records to be collected. To achieve this, we add a separate UDFExecutionThread inside the operator.

The TaskMainThread repeatedly adds records to a fixed-size queue. The UDFExecutionThread invokes the user-defined MapPartitionFunction, whose Iterator parameter polls records from the queue. If there are no records in the queue, the UDFExecutionThread blocks in the hasNext() and next() methods of the Iterator until a record arrives. Once the UDFExecutionThread has processed all the data, the operator completes its execution.
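The thread handoff described above can be sketched with a `java.util.concurrent.ArrayBlockingQueue`. This is a simplified, hypothetical model, not the actual operator. The class name, the `processRecord`/`endInput` methods and the end-of-input sentinel are assumptions. The UDF is modeled as a `Function<Iterator<T>, O>` rather than a MapPartitionFunction. As a further assumption, the caller blocks while the fixed-size queue is full.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

// Hypothetical sketch of the two-thread design: the caller plays the TaskMainThread and the
// spawned thread plays the UDFExecutionThread. Names and the end-of-input sentinel are assumptions.
class QueueBackedMapPartitionSketch<T, O> {
    private static final Object END_OF_INPUT = new Object(); // sentinel: no more records

    private final BlockingQueue<Object> queue;
    private final Thread udfThread;
    private volatile O result;

    QueueBackedMapPartitionSketch(int capacity, Function<Iterator<T>, O> mapPartitionFn) {
        this.queue = new ArrayBlockingQueue<>(capacity); // fixed-size queue
        // The UDF runs on its own thread and pulls records through the blocking iterator.
        this.udfThread = new Thread(() -> result = mapPartitionFn.apply(new BlockingIterator()),
                "UDFExecutionThread");
        this.udfThread.start();
    }

    // Called by the task main thread for every incoming record; blocks while the queue is full.
    void processRecord(T record) throws InterruptedException {
        queue.put(record);
    }

    // Called at end of input: signals the iterator, then waits for the UDF thread to finish.
    O endInput() throws InterruptedException {
        queue.put(END_OF_INPUT);
        udfThread.join();
        return result;
    }

    // Iterator handed to the UDF: hasNext()/next() block until a record or the end marker arrives.
    private final class BlockingIterator implements Iterator<T> {
        private Object next; // element taken from the queue but not yet returned

        @Override
        public boolean hasNext() {
            if (next == null) {
                try {
                    next = queue.take(); // blocks while the queue is empty
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return next != END_OF_INPUT;
        }

        @Override
        @SuppressWarnings("unchecked")
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T record = (T) next;
            next = null;
            return record;
        }
    }
}
```

Two failure modes are not handled here: an exception thrown on the UDF thread is not propagated, and a UDF that stops reading early would leave `processRecord` blocked. Null records are unsupported because `ArrayBlockingQueue` rejects nulls.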

...

We introduce the sortPartition API in the PartitionWindowedStream, including three methods that sort records using different key extraction logic.

Code Block
languagejava
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PartitionWindowedStream<T> {

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple}.
     *
     * @param field The index of the field on which records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(int field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the specified field in the
     * specified order. The type of records must be {@link Tuple} or a POJO
     * class. The POJO class must be public and have getter and setter methods
     * for each field. It must not implement any interfaces or extend any
     * classes.
     *
     * @param field The field expression referring to the field on which
     * records are sorted.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public DataStream<T> sortPartition(String field, Order order) {
      ...
    }

    /**
     * Sorts the records of the window on the extracted key in the specified order.
     *
     * @param keySelector The KeySelector function that extracts the key
     * from records.
     * @param order The order in which records are sorted.
     * @return The resulting data stream with sorted records in each subtask.
     */
    public <K> DataStream<T> sortPartition(KeySelector<T, K> keySelector, Order order) {
      ...
    }

}
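The following hypothetical model illustrates what "sorted records in each subtask" means for the KeySelector variant. It sorts each subtask's window independently by an extracted key. `SortOrder` stands in for Flink's `Order`, and a plain `java.util.function.Function` stands in for `KeySelector`. Neither reflects the real implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of sortPartition(KeySelector, Order): SortOrder stands in for Flink's Order
// and a plain Function stands in for KeySelector. Each subtask sorts only its own window, so the
// output is ordered within a subtask but not across subtasks.
class SortPartitionSketch {

    enum SortOrder { ASCENDING, DESCENDING }

    static <T, K extends Comparable<? super K>> List<List<T>> sortPartition(
            List<List<T>> windowsPerSubtask, Function<T, K> keySelector, SortOrder order) {
        Comparator<T> byKey = Comparator.comparing(keySelector);
        if (order == SortOrder.DESCENDING) {
            byKey = byKey.reversed();
        }
        List<List<T>> result = new ArrayList<>();
        for (List<T> window : windowsPerSubtask) {
            List<T> sorted = new ArrayList<>(window); // sort a copy of this subtask's window
            sorted.sort(byKey);
            result.add(sorted);
        }
        return result;
    }
}
```

For example, sorting the windows `[ccc, a, bb]` and `[dd, e]` by string length in descending order gives `[ccc, bb, a]` and `[dd, e]`. Each subtask's output is ordered, but there is no ordering across subtasks.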

...