You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 22 Next »

Discussion thread-
Vote thread-



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Let's consider a use-case where user wants to bootstrap job state (operatorA and operatorB in the figure below) using records from a bounded source. There is no need to emit any records when the job is still processing records from the bounded source. After this bootstrap phase is done (i.e. all records from the bounded source have been processed by operatorA and operatorB), user wants to process records from the unbounded source and emit results continuously in real-time. Additionally, we assume operatorA involves aggregation computation (e.g. co-group) and its throughput in batch mode can be 10X faster than in stream mode (see Benchmark Results below).

Currently, supporting such use-case requires all operators to be deployed at the start of the job and run in the streaming mode. This approach has a couple drawbacks: 1) operatorA is 10X slower than when it is run in batch mode; 2) it is a waste of slot resource to deploy operatorB when it can not do more useful work than waiting for results from operatorA; and 3) operatorB might end up spilling a lot of records from the unbounded source to its local disk until it has received all records from operatorA.

In this FLIP, we propose to optimize performance for the above use-case by 1) allowing operator to declare its attributes (e.g. isOutputOnEOF) so that JM will deploy its downstream operators only after this operator finishes; and 2) allowing a given operator to effectively switch from batch mode to stream mode during execution. We hope these capability can further enhance Flink as a stream-batch unified processing engine.


An operator is a batch-only operator when it meets the following properties:

  • The operator should emit records only after all its inputs have ended. There is no requirement on processing latency.
  • The operator does not need to support checkpointing. Thus operator can use arbitrary optimization (e.g. sorting).

An operator is a stream-only operator when it meets the following properties:

  • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
  • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.

Based on the definitions specified above, we can see that if a use-case do not need low processing latency, then it should use batch-mode operators to maximize throughput. Otherwise, it should use stream-mode operators to achieve low processing latency.

However, for use-case that need low processing latency only after some backlog data has been processed, such as the one described in the motivation section, neither the stream-only operator nor the batch-only operator can deliver the optimal performance. Therefore, we define stream-batch unified operator as described below:

An operator is a stream-only operator when it meets the following properties:

  • The operator can extract and handle isBacklog (a boolean value) from its inputs.
  • While any of its inputs have isBacklog=true:
    • The operator should not emit records. There is no requirement on processing latency during this stage.
    • The operator does not need to support checkpointing. Thus operator can use arbitrary optimization (e.g. sorting) in this stage.
  • While all of its inputs have isBacklog=false:
    • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
    • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.

Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;

    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);

    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();

    public boolean isEventTime() {
        return true;

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;

        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;

        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;

2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.

package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
public class OperatorAttributesBuilder {

    public OperatorAttributesBuilder() {
        isOutputOnEOF = false;
        isOutputOnCheckpoint = false;
        hasInternalSorter = false;

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    public OperatorAttributes build() {...}
package org.apache.flink.streaming.api.operators;

 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize job performance.
public class OperatorAttributes {     
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     * <p>Here are the implications when it is true:
     * <ul>
     *   <li>Its output edges are blocking.
     * </ul>
    public boolean getIsOutputOnEOF() {...}

     * Returns true iff the operator can only emit records when checkpoint is triggered.
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
    public boolean getIsOutputOnCheckpoint() {...}

     * Returns true iff the operator sorts data internally.
     * <p>Here are the implications when it is true:
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     * </ul>
    public boolean getHasInternalSorter() {...}

Note that an operator with internal sorter does not necessarily mean that it only emits data at the end of input.  For example, we might have an operator that sorted data when it is still reading from an input with isBacklog=true. When all the inputs (it is still reading from) have isBacklog=false, the operator can stop sorting and start to emit records continuously in the streaming fashion.

3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();

public interface StreamOperatorFactory<OUT> extends Serializable {

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();

Proposed Changes

1) Add the getIsOutputOnEOF() and getHasInternalSorter() to the Transformation interface.

public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;

    public boolean getHasInternalSorter() {
        return false;

2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.

3) Update the the CheckpointCoordinator

If any ExecutionVertex is reading from or writing to a blocking edge, then the checkpoint is disabled during this period.

If any operator reports getHasInternalSorter == true, then the checkpoint is disabled when this operator is running from a source with isBacklog=true.

4) A blocking input edge with pending records is same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

Benchmark Results

1) DataStream#CoGroup

We run the CoGroupDataStream benchmark on a mackbook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of sources generating 2*10^6 records, average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of sources generating 5*10^7 records, average execution time is 118 sec.
  • With the proposed change, in both the stream and batch mode, with each of sources generating 5*10^7 records,  average execution time is 46 sec.

This show that the same DataStream program in stream mode can be more than 20X faster with proposed change.

Example Usages

     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .apply(new CustomCoGroupFunction())

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

  • No labels