
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation



Currently, Flink allows the same operator to be used in both stream mode and batch mode. The operator can process a bounded stream of records with high throughput (e.g. via sorting/buffering) in a batch-mode job, and process an unbounded stream of records with low processing latency (e.g. via checkpointing with a stateful state backend) in a stream-mode job. However, the operator can not use different modes (i.e. batch and stream modes) at different stages of the same job, which makes it hard to meet the performance requirements of jobs that need to process a bounded stream of backlog data followed by an unbounded stream of fresh data.

The bounded/unbounded streams of records can either come from the same source or from different sources, as shown in the following examples:

1) We might have a job which aggregates records from a HybridSource (composed of a FileSource and a KafkaSource) and emits the results to a Hudi sink. When the job is processing records from the FileSource, the user needs the job to maximize throughput without having to emit intermediate results with low processing latency. When the job is processing records from the KafkaSource, the user needs the job to emit results with low processing latency.
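For illustration, this first use-case could be wired up as follows. This is a minimal sketch, not part of this FLIP's proposal, assuming the FileSource and KafkaSource connectors are available; the path, bootstrap servers, and topic are placeholders, and imports are omitted for brevity.

Code Block
languagejava
// Backlog phase: read historical records from files with high throughput.
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/backlog"))
                .build();

// Real-time phase: continue from Kafka once the files are exhausted.
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics("events")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

// HybridSource switches from the bounded FileSource to the unbounded KafkaSource.
HybridSource<String> source =
        HybridSource.builder(fileSource).addSource(kafkaSource).build();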

2) As shown in the figure below, we might have a job which bootstraps its state (operatorA and operatorB) using records from a bounded source (i.e. inputA). There is no need to emit any intermediate results when the job is processing records from the bounded source. After all records from the bounded source have been processed by operatorA and operatorB, the job needs to process records from the unbounded source and emit results with low processing latency in real-time.

Currently, supporting the above use-cases requires all operators to be deployed at the start of the job and run in stream mode across the entire job lifetime. This approach leads to inferior performance due to the following drawbacks: 1) an operator (e.g. co-group) might be 10X slower in stream mode than in batch mode; 2) it is a waste of slot resources to deploy an operator (e.g. operatorB) when its input (e.g. operatorA's results) is not yet available; and 3) an operator that is deployed earlier than necessary (e.g. operatorB) might end up spilling a lot of records from the unbounded source to its local disk until it has received all records from operatorA.

In this FLIP, we propose to optimize performance for the above use-cases by allowing an operator to effectively switch its execution mode from batch to stream based on "backlog" status of the records emitted by the source operators. We expect to further enhance Flink as a stream-batch unified processing engine via this FLIP.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325. While FLIP-325 allows an operator to buffer records for an arbitrarily long time while its input records have isBacklog=true, this FLIP additionally allows an operator to use optimizations that are currently only applicable in the batch execution mode (e.g. using an ExternalSorter, which does not support checkpointing).


(Figure: a job graph in which operatorA and operatorB bootstrap their state from a bounded source (inputA) before processing records from an unbounded source.)


Behavior changes when switching from batch mode to stream mode

In this section, we describe how batch mode and stream mode differ in a variety of aspects. We also describe the new behavior needed for an operator to switch from batch mode to stream mode during execution of the same job.

For simplicity, we assume the Flink job uses default configuration values for all configurations (e.g. scheduler-mode) except execution.runtime-mode. And the operator reads records from a source which switches from isBacklog=true to isBacklog=false before it reaches EOF.


1) Scheduling strategy

Batch mode:

  • A task (which contains one or more chained operators) is deployed only after all its upstream tasks have finished.

Stream mode:

  • All tasks are deployed at the beginning of the job without waiting for upstream tasks to finish.

In mixed mode (where the job runs operators in batch mode before switching to stream mode), the same scheduling strategy as stream mode will be used to schedule tasks.

The capability to update/optimize scheduling (e.g. speculative execution, AQE) for mixed mode use-cases will be left to future work.


2) Shuffle strategy

Batch mode

  • All shuffle types are set to BLOCKING.
  • All keyed inputs are sorted before they are fed into operators. Managed memory is allocated to operators with keyed inputs to support input sorting.

Stream mode:

  • All shuffle types are set to PIPELINED.

In mixed mode, the same shuffle strategy as stream mode will be used to set shuffle type between tasks. Keyed inputs will not be automatically sorted.

The capability to automatically sort inputs while isBacklog=true will be left to future work.


3) Watermark strategy

Batch mode:

  • Source operator does not emit watermark while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE for one key at a time after it has emitted all records for that key. That means the downstream operator might receive watermark=Long.MAX_VALUE for keyA, and then process a record for keyB.

Stream mode:

  • Source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records. That means the downstream operator will not see any further input records after having received watermark=Long.MAX_VALUE.

Mixed mode:

  • Source operator does not emit watermark while isBacklog=true.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false).
  • After that, the source operator emits watermark based on the user-specified WatermarkStrategy and pipeline.auto-watermark-interval while it has not reached EOF.
  • Once the source operator reaches EOF, it emits watermark of Long.MAX_VALUE after it has emitted all records.

As we can see, in mixed mode the watermark behavior effectively switches from batch-mode to stream-mode semantics as the source's isBacklog switches from true to false.
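For reference, the user-specified WatermarkStrategy mentioned above is configured on the source in the usual way. A minimal sketch, where the Event type and its timestamp field are hypothetical:

Code Block
languagejava
// Watermarks lag the max seen timestamp by 5 seconds; used only once isBacklog=false.
WatermarkStrategy<Event> watermarkStrategy =
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, recordTimestamp) -> event.timestamp);

DataStream<Event> stream = env.fromSource(source, watermarkStrategy, "mySource");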


4) Checkpoint and failover strategy

Batch mode:

  • No operator needs to support checkpoint. Checkpoint triggering is disabled.
  • If any task fails, this task is restarted to re-process its input from the beginning.

Stream mode:

  • Every operator needs to support checkpoint. Checkpoint is triggered periodically according to execution.checkpointing.interval.
  • If any task fails, its pipelined region is restarted to re-process its input since the last successful checkpoint.

Mixed mode:

  • Every operator needs to support checkpoint.
  • Before the source operator emits isBacklog=false, checkpoint triggering is disabled.
  • If any task fails while isBacklog=true, this task is restarted to re-process its input from the beginning.
  • At the point when isBacklog switches to false, the source operator emits RecordAttributes(isBacklog=false) and triggers an immediate checkpoint.
  • After that, checkpoint is triggered periodically according to execution.checkpointing.interval (see the configuration sketch below).
  • If any task fails while isBacklog=false, its pipelined region is restarted to re-process its input since the last successful checkpoint.
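For example, the following sketch disables checkpointing during the backlog phase and enables a 30-second checkpoint interval afterwards. Note that execution.checkpointing.interval-during-backlog is the option proposed in FLIP-325, and the values here are illustrative:

Code Block
languagejava
Configuration conf = new Configuration();
// Periodic checkpoints once the job is processing real-time records.
conf.setString("execution.checkpointing.interval", "30s");
// No checkpoints while any input still has isBacklog=true (option proposed in FLIP-325).
conf.setString("execution.checkpointing.interval-during-backlog", "0");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);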


5) Keyed state backend

Batch mode:

  • BatchExecutionKeyedStateBackend is used. This keyed state backend leverages the fact that the input records are all sorted by key and discards the state of the current key once it sees a record coming in with a different key.


Stream mode:

  • Managed memory is allocated to operators if the keyed state backend is a subclass of AbstractManagedMemoryStateBackend.
  • A general purpose keyed state backend which does not assume inputs sorted by key (e.g. EmbeddedRocksDBStateBackend) is used.

Mixed mode:

  • Managed memory is allocated to operators with keyed inputs that claim internalSorterSupported == true.
  • A general purpose keyed state backend which does not assume inputs are sorted by key (e.g. EmbeddedRocksDBStateBackend) is used by the Flink runtime.
  • Before the source operator emits isBacklog=false, a downstream operator sorts input records by key internally without accessing the state backend. We will provide utility classes (derived from the existing ExternalSorter) to sort inputs.
  • At the point when isBacklog switches to false:
    - The source operator emits RecordAttributes(isBacklog=false).
    - The downstream operator processes/aggregates all the records (received before RecordAttributes(isBacklog=false)) in order of the sorted keys, using a BatchExecutionKeyedStateBackend it has explicitly instantiated internally.
    - The downstream operator moves each (key, state) pair from the BatchExecutionKeyedStateBackend to its general purpose keyed state backend once it sees a record coming in with a different key.
  • The operator continues to process records using the general purpose keyed state backend, which now contains the full state derived from the records received while isBacklog=true.

Public Interfaces


1) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean internalSorterSupported = null;

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setInternalSorterSupported(boolean internalSorterSupported) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - internalSorterSupported defaults to false
     */
    public OperatorAttributes build() {...}
}

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator uses an internal sorter when checkpoint is disabled.
     *
     * <p>Checkpoint is disabled in any of the following cases:
     *
     * <ul>
     *   <li>execution.runtime-mode = BATCH.
     *   <li>execution.checkpointing.interval-during-backlog = 0 AND any of its inputs has isBacklog=true.
     * </ul>
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records will not be sorted before being fed into this operator.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
     */
    public boolean isInternalSorterSupported() {...}
}

Note that an operator that uses an internal sorter does not necessarily emit data only at the end of input. For example, we might have an operator that sorts data while it is still reading from an input with isBacklog=true. Once all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start to emit records continuously in a streaming fashion. And the operator can support checkpointing during this period.

2) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
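As an example, a hypothetical operator that sorts records internally during the backlog phase could declare the attribute like this. This is a sketch against the API proposed above; MySortingOperator is not part of this FLIP:

Code Block
languagejava
public class MySortingOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    @Override
    public OperatorAttributes getOperatorAttributes() {
        // Tell the JM that this operator sorts inputs internally, so the
        // runtime should not add an external sorter for its inputs.
        return new OperatorAttributesBuilder()
                .setInternalSorterSupported(true)
                .build();
    }

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        // Buffer/sort while checkpointing is disabled; emit continuously otherwise (omitted).
    }
}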



Proposed Changes

1) Add the APIs on Transformation interface to get the corresponding operator attributes.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {

    public boolean isInternalSorterSupported() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
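For instance, OneInputTransformation could derive the new method from its operator factory roughly as follows. This is a sketch; the actual wiring may differ:

Code Block
languagejava
// Inside OneInputTransformation<IN, OUT>:
@Override
public boolean isInternalSorterSupported() {
    // Surface the attribute declared by the underlying operator (factory).
    return operatorFactory.getOperatorAttributes().isInternalSorterSupported();
}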


3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has IsInternalSorterSupported == true AND
    (execution.runtime-mode == BATCH OR execution.checkpointing.interval-during-backlog == 0):
    • Flink runtime will not add external sorter for its inputs (including during batch-mode with keyed inputs)
    • Its managed memory should be set according to execution.sorted-inputs.memory

4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines the RecordAttributes it sends to downstream nodes.


5) For those operators whose throughput in stream mode is observably slower than their throughput in batch mode, update them to take advantage of an internal sorter when their inputs have isBacklog=true.

Typically, operations that involve aggregation (e.g. join, co-group, aggregate) on keyed inputs can benefit from using an internal sorter.

Though the concrete implementation may vary across operators, an operator typically needs the following changes for this optimization (see the sketch after this list):

  • Override getOperatorAttributes() to return internalSorterSupported = true if the operator needs to sort data internally to improve throughput.
  • Override the processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the input's isBacklog status.
    For example, the operator can buffer and sort input records while any input has isBacklog=true AND it is sure that checkpoint will not be triggered (e.g. execution.checkpointing.interval-during-backlog = 0). Once all inputs' status has switched to isBacklog=false, it processes the buffered records, emits the results, and starts to work in the stream execution mode.
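The sketch below illustrates this pattern. It assumes FLIP-325's RecordAttributes API and that checkpointing is disabled during backlog; the operator and its element type are hypothetical, and the simple in-memory buffer stands in for the utility sorter classes mentioned earlier:

Code Block
languagejava
public class BacklogAwareOperator extends AbstractStreamOperator<Long>
        implements OneInputStreamOperator<Long, Long> {

    private boolean isBacklog = true;
    private final List<Long> buffer = new ArrayList<>();

    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().setInternalSorterSupported(true).build();
    }

    @Override
    public void processElement(StreamRecord<Long> element) throws Exception {
        if (isBacklog) {
            // Batch-style: buffer records without touching the keyed state backend.
            buffer.add(element.getValue());
        } else {
            // Stream-style: process and emit with low latency.
            output.collect(element);
        }
    }

    // Signature as proposed in FLIP-325.
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        if (isBacklog && !recordAttributes.isBacklog()) {
            // Switch point: sort/process the backlog, emit results, then run in stream mode.
            buffer.sort(Long::compare);
            for (Long value : buffer) {
                output.collect(new StreamRecord<>(value));
            }
            buffer.clear();
            isBacklog = false;
        }
    }
}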



Benchmark results

In this section, we provide benchmark results to compare the throughput between batch mode and stream mode for commonly-used operations. We hope this can help us quantify the increase in throughput after we are able to use batch mode to process backlog records.


1) Use DataStream#coGroup to co-group records from two bounded streams and emit the results after both inputs have ended.

We assume the user-defined function simply does "collector.collect(1)" so that the overhead of the user-defined function is minimized. The DataStream program looks like this.

Code Block
languagejava
data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);

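For reference, a CustomCoGroupFunction consistent with the description above might look like this; the Tuple2<Integer, Double> element type is assumed for illustration:

Code Block
languagejava
private static class CustomCoGroupFunction
        implements CoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {

    @Override
    public void coGroup(
            Iterable<Tuple2<Integer, Double>> first,
            Iterable<Tuple2<Integer, Double>> second,
            Collector<Integer> out) {
        // Minimal work so the benchmark measures framework overhead, not user logic.
        out.collect(1);
    }
}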


We run the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of the sources generating 2*10^6 records, the average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of the sources generating 5*10^7 records, the average execution time is 118 sec.
  • With the proposed change, in both stream and batch mode, with each of the sources generating 5*10^7 records, the average execution time is 46 sec.

This shows that, with the changes proposed above, DataStream#coGroup can be 20X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode.


Note that this throughput can be higher than the existing DataStream#coGroup in batch mode due to the following reasons:

  • The operator that performs the co-group operation instantiates two internal sorters to sort records from its two inputs separately, and then pulls the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead.
  • EndOfStreamWindows is a pre-defined WindowAssigner that assigns all records into a single window that ends at watermark=Long.MAX_VALUE. When this window assigner is used, the operator that performs the co-group operation does not need to invoke either WindowAssigner#assignWindows or Trigger#onElement for each record.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

...