...

Currently, supporting such a use-case requires all operators to be deployed at the start of the job and run in streaming mode. This approach has several drawbacks: 1) operatorA might be 10X slower in stream mode than when it is run in batch mode; 2) it is a waste of slot resources to deploy operatorB when it cannot do more useful work than waiting for/buffering results from operatorA; and 3) operatorB might end up spilling a lot of records from the unbounded source to its local disk until it has received all records from operatorA.

In this FLIP, we propose to optimize performance for the above use-case by 1) allowing an operator to declare its attributes (e.g. isOutputOnEOF) so that JM will deploy its downstream operators only after such an operator finishes; and 2) allowing a given operator to effectively be a stream-batch unified operator, which can switch from batch mode to stream mode during execution without restart. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.


Terminology and Background

Currently, operators supported by Flink must be either stream-only operators or batch-only operators, with the definitions specified below.

An operator is a batch-only operator when it meets the following properties:

  • The operator should emit records only after all its inputs have ended. There is no requirement on processing latency.
  • The operator does not need to support checkpointing. Thus operator can use arbitrary optimization (e.g. sorting).

An operator is a stream-only operator when it meets the following properties:

  • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
  • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.

Based on the definitions specified above, we can see that if a use-case does not need low processing latency, then it should use batch-mode operators to maximize throughput. Otherwise, it should use stream-mode operators to achieve low processing latency.

As a result of this limitation, if any input of the computation is unbounded, the operator must be stream-only, and thus can not take advantage of optimizations such as buffering/sorting. This is OK when all input records need to be processed in real-time with low processing latency. But it can easily lead to suboptimal performance when the inputs are composed of backlog records followed by the real-time records (e.g. the use-case described in the motivation section).


However, for use-cases that need low processing latency only after some backlog data has been processed, such as the one described in the motivation section, neither the stream-only operator nor the batch-only operator can deliver optimal performance. In order to address this issue, this FLIP proposes to support the stream-batch unified operator, as defined below.

An operator is a stream-batch unified operator when it meets the following properties:

  • The operator can extract and handle isBacklog (a boolean value) from metadata received on its inputs. isBacklog==false iff all records received after this metadata need to be processed in real-time. 
  • When all of its inputs have isBacklog=true:
    • The operator should not emit records. There is no requirement on processing latency during this period.
    • The operator does not need to support checkpointing. Thus the operator can use arbitrary optimization (e.g. sorting) during this period.
  • When all of its inputs have isBacklog=false:
    • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
    • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.

  • When some (but not all) inputs have isBacklog=false, the operator can choose either of the above two execution modes based on its semantics.

By supporting stream-batch unified operator, Flink can deliver optimal performance for the use-case described in the motivation section.
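The three cases in the definition above can be sketched in plain Java, independent of Flink. This is only an illustration: the class `UnifiedModeSketch`, the `Mode` enum and the `decide` method are hypothetical names and are not part of the API proposed in this FLIP.

```java
import java.util.List;

/** Hypothetical sketch, not Flink API: choose a mode from the isBacklog flag of each input. */
final class UnifiedModeSketch {

    enum Mode { BATCH, STREAM, OPERATOR_CHOICE }

    /** Each list entry is the most recent isBacklog value received on one input. */
    static Mode decide(List<Boolean> isBacklogPerInput) {
        if (isBacklogPerInput.isEmpty()) {
            throw new IllegalArgumentException("an operator needs at least one input");
        }
        boolean allBacklog = isBacklogPerInput.stream().allMatch(Boolean::booleanValue);
        boolean noneBacklog = isBacklogPerInput.stream().noneMatch(Boolean::booleanValue);
        if (allBacklog) {
            // No output yet, no checkpointing needed, optimizations such as sorting allowed.
            return Mode.BATCH;
        }
        if (noneBacklog) {
            // Continuous low-latency output, checkpointing required.
            return Mode.STREAM;
        }
        // Mixed inputs: the operator picks one of the two modes based on its semantics.
        return Mode.OPERATOR_CHOICE;
    }

    public static void main(String[] args) {
        System.out.println(decide(List.of(true, true)));
        System.out.println(decide(List.of(true, false)));
        System.out.println(decide(List.of(false, false)));
    }
}
```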

Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner. This class allows users of the DataStream API to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.


Code Block
languagejava
/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}
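
The intended effect of this assigner can be modelled without Flink. The sketch below is a simplified model, not the WindowOperator implementation: it assumes a bounded input ends with a final watermark of Long.MAX_VALUE, and it treats "watermark has reached the window's max timestamp" as the firing condition. The class `EndOfStreamWindowSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one window covering all of event time; not Flink code. */
final class EndOfStreamWindowSketch {
    // Mirrors TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE): the end is exclusive,
    // so the largest timestamp inside the window is Long.MAX_VALUE - 1.
    static final long MAX_TIMESTAMP = Long.MAX_VALUE - 1;

    private final List<String> buffered = new ArrayList<>();
    private boolean fired = false;

    /** Every element lands in the same window, so it is simply buffered. */
    void onElement(String element) {
        buffered.add(element);
    }

    /** Returns the window contents once, when the watermark reaches MAX_TIMESTAMP. */
    List<String> onWatermark(long watermark) {
        if (!fired && watermark >= MAX_TIMESTAMP) {
            fired = true;
            return new ArrayList<>(buffered);
        }
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        EndOfStreamWindowSketch window = new EndOfStreamWindowSketch();
        window.onElement("a");
        window.onElement("b");
        System.out.println(window.onWatermark(1_000L));         // nothing is emitted mid-stream
        System.out.println(window.onWatermark(Long.MAX_VALUE)); // assumed end-of-input watermark
    }
}
```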

...

2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that the Flink runtime can use to optimize job performance.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean isOutputOnEOF = null;
    @Nullable private Boolean isOutputOnCheckpoint = null;
    @Nullable private Boolean hasInternalSorter = null;

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - isOutputOnEOF defaults to false
     * - isOutputOnCheckpoint defaults to false
     * - hasInternalSorter defaults to false
     */
    public OperatorAttributes build() {...}
}
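
A minimal plain-Java sketch of the "null means not set, fall back to a default at build time" behavior described in the builder's Javadoc follows. It is not the Flink implementation: `AttributesBuilderSketch`, `Attributes` and `orDefault` are hypothetical names, and java.util.logging's FINE level stands in for DEBUG.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the proposed builder; not the Flink class. */
final class AttributesBuilderSketch {
    private static final Logger LOG = Logger.getLogger(AttributesBuilderSketch.class.getName());

    /** Immutable result with all defaults resolved. */
    static final class Attributes {
        final boolean isOutputOnEOF;
        final boolean isOutputOnCheckpoint;
        final boolean hasInternalSorter;

        Attributes(boolean isOutputOnEOF, boolean isOutputOnCheckpoint, boolean hasInternalSorter) {
            this.isOutputOnEOF = isOutputOnEOF;
            this.isOutputOnCheckpoint = isOutputOnCheckpoint;
            this.hasInternalSorter = hasInternalSorter;
        }
    }

    // null means the operator developer did not set the attribute.
    private Boolean isOutputOnEOF;
    private Boolean isOutputOnCheckpoint;
    private Boolean hasInternalSorter;

    AttributesBuilderSketch setIsOutputOnEOF(boolean value) {
        this.isOutputOnEOF = value;
        return this;
    }

    AttributesBuilderSketch setIsOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    AttributesBuilderSketch setHasInternalSorter(boolean value) {
        this.hasInternalSorter = value;
        return this;
    }

    Attributes build() {
        return new Attributes(
                orDefault("isOutputOnEOF", isOutputOnEOF),
                orDefault("isOutputOnCheckpoint", isOutputOnCheckpoint),
                orDefault("hasInternalSorter", hasInternalSorter));
    }

    /** Logs unset attributes at a debug-like level and falls back to false. */
    private static boolean orDefault(String name, Boolean value) {
        if (value == null) {
            LOG.log(Level.FINE, "{0} is not set, using default value false", name);
            return false;
        }
        return value;
    }

    public static void main(String[] args) {
        Attributes attributes = new AttributesBuilderSketch().setHasInternalSorter(true).build();
        System.out.println(attributes.isOutputOnEOF + " " + attributes.hasInternalSorter);
    }
}
```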


Code Block
languagejava
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     *   <li>It does not support checkpoint.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     *   <li>It does not support checkpoint when any input has isBacklog=true.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}


Note that having an internal sorter does not necessarily mean that an operator only emits data at the end of input. For example, an operator might sort data while it is still reading from an input with isBacklog=true. When all the inputs it is still reading from have isBacklog=false, the operator can stop sorting and start to emit records continuously in the streaming fashion, and it can support checkpointing during this period.
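
The behavior in the note above can be illustrated with a single-input sketch in plain Java. It assumes the operator is told about isBacklog changes through a callback; `BacklogSortingOperatorSketch` and its method names are hypothetical and do not correspond to a Flink interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical single-input operator: sorts while in backlog, streams afterwards. */
final class BacklogSortingOperatorSketch {
    private final List<Integer> buffer = new ArrayList<>();
    private final List<Integer> output = new ArrayList<>();
    private boolean isBacklog = true;

    /** Called when the input reports a new isBacklog value. */
    void processIsBacklog(boolean newIsBacklog) {
        if (isBacklog && !newIsBacklog) {
            // Leaving the backlog phase: sort what was buffered and flush it once.
            Collections.sort(buffer);
            output.addAll(buffer);
            buffer.clear();
        }
        isBacklog = newIsBacklog;
    }

    /** Buffers records during backlog; emits them immediately afterwards. */
    void processElement(int value) {
        if (isBacklog) {
            buffer.add(value);
        } else {
            output.add(value);
        }
    }

    /** Returns a copy of everything emitted so far. */
    List<Integer> output() {
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        BacklogSortingOperatorSketch op = new BacklogSortingOperatorSketch();
        op.processElement(3);
        op.processElement(1);
        op.processElement(2);
        System.out.println(op.output()); // still in backlog, nothing emitted yet
        op.processIsBacklog(false);
        op.processElement(0);
        System.out.println(op.output()); // sorted backlog first, then records in arrival order
    }
}
```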


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the APIs on Transformation interface to get the corresponding operator attributes.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getIsOutputOnCheckpoint() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has IsOutputOnEOF == true:
    • Its output edge is blocking.
    • The PipelinedRegion containing this Transformation should never trigger checkpoint.
  • If all Transformations have IsOutputOnCheckpoint == false:
    • In FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.
  • If a Transformation has HasInternalSorter == true:
    • Flink runtime will not add an external sorter for its inputs (including during batch-mode with keyed inputs).
    • Its managed memory should be set according to execution.sorted-inputs.memory.
    • The PipelinedRegion containing this Transformation should not trigger checkpoint until all inputs have isBacklog=false.
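
As a rough illustration, the IsOutputOnEOF and HasInternalSorter rules above can be written as a pure function over a single Transformation. This is a simplification under stated assumptions: the real decision is made per PipelinedRegion, the FLIP-325 flush rule is left out, and `JobGraphRulesSketch`, `Decision` and the parameter names are hypothetical.

```java
/** Hypothetical sketch of the attribute rules for one Transformation; not JM code. */
final class JobGraphRulesSketch {

    static final class Decision {
        final boolean blockingOutputEdge;
        final boolean addExternalSorter;
        final boolean checkpointAllowed;

        Decision(boolean blockingOutputEdge, boolean addExternalSorter, boolean checkpointAllowed) {
            this.blockingOutputEdge = blockingOutputEdge;
            this.addExternalSorter = addExternalSorter;
            this.checkpointAllowed = checkpointAllowed;
        }
    }

    /**
     * @param inputsNeedSorting whether the runtime would otherwise sort the inputs
     *     (for example batch mode with keyed inputs)
     * @param allInputsNotBacklog whether every input currently has isBacklog=false
     */
    static Decision decide(
            boolean isOutputOnEOF,
            boolean hasInternalSorter,
            boolean inputsNeedSorting,
            boolean allInputsNotBacklog) {
        // Output only at end of input implies a blocking output edge.
        boolean blocking = isOutputOnEOF;
        // An operator with its own sorter never gets an external one.
        boolean externalSorter = inputsNeedSorting && !hasInternalSorter;
        // No checkpoint for output-on-EOF operators, nor for internally
        // sorting operators while any input is still in backlog.
        boolean checkpointAllowed =
                !isOutputOnEOF && (!hasInternalSorter || allInputsNotBacklog);
        return new Decision(blocking, externalSorter, checkpointAllowed);
    }

    public static void main(String[] args) {
        Decision d = decide(true, true, true, false);
        System.out.println(d.blockingOutputEdge + " " + d.addExternalSorter + " " + d.checkpointAllowed);
    }
}
```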

4) A blocking input edge with pending records is treated the same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

5) Update DataStream#coGroup to use the same optimization as the DataSet#coGroup when EndOfStreamWindows is used.

The following changes will be made:

  • Add a subclass of TwoInputStreamOperator that uses an internal sorter to do co-group, with essentially the same implementation as DataSet#coGroup.
  • This operator should override getOperatorAttributes() to return OperatorAttributes(IsOutputOnEOF=true, HasInternalSorter=true).
  • Update CoGroupedStreams#WithWindow#apply to use this operator to do co-group when the user-specified window assigner is EndOfStreamWindows.
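
To make the sort-based approach concrete, here is a minimal in-memory sort-merge co-group in plain Java. It is a sketch of the general technique only, not the DataSet#coGroup implementation (which can also spill to disk); `SortMergeCoGroupSketch` and its string output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory sort-merge co-group; not the Flink operator. */
final class SortMergeCoGroupSketch {

    /** Emits one "key:[left values]|[right values]" string per key, in key order. */
    static List<String> coGroup(
            List<Map.Entry<Integer, String>> left, List<Map.Entry<Integer, String>> right) {
        // Sort copies of both inputs by key (List.sort is stable for equal keys).
        List<Map.Entry<Integer, String>> l = new ArrayList<>(left);
        List<Map.Entry<Integer, String>> r = new ArrayList<>(right);
        l.sort(Map.Entry.comparingByKey());
        r.sort(Map.Entry.comparingByKey());

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() || j < r.size()) {
            // The next key is the smallest key not yet consumed on either side.
            int key;
            if (j >= r.size()) {
                key = l.get(i).getKey();
            } else if (i >= l.size()) {
                key = r.get(j).getKey();
            } else {
                key = Math.min(l.get(i).getKey(), r.get(j).getKey());
            }
            List<String> leftGroup = new ArrayList<>();
            while (i < l.size() && l.get(i).getKey() == key) {
                leftGroup.add(l.get(i++).getValue());
            }
            List<String> rightGroup = new ArrayList<>();
            while (j < r.size() && r.get(j).getKey() == key) {
                rightGroup.add(r.get(j++).getValue());
            }
            // Stand-in for calling the user's CoGroupFunction with both groups.
            out.add(key + ":" + leftGroup + "|" + rightGroup);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> left =
                List.of(Map.entry(2, "a"), Map.entry(1, "b"), Map.entry(2, "c"));
        List<Map.Entry<Integer, String>> right = List.of(Map.entry(2, "x"), Map.entry(3, "y"));
        coGroup(left, right).forEach(System.out::println);
    }
}
```

Because both inputs are consumed in key order, each key's groups can be handed to the user function and discarded before the next key is read, so no keyed state backend is required.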

Example Usages

1) User wants to co-group records from two bounded streams and emit output after both inputs have ended.

We assume the user-defined function just does "collector.collect(1)" so that the overhead of the user-defined function is minimized. The DataStream program looks like this.

Code Block
languagejava
data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);


We run the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
  • With the proposed change, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 46 sec.

This shows that, with the changes proposed above, DataStream#coGroup can be more than 20X faster than its current throughput in stream mode, and about 2.5X faster than its current throughput in batch mode.
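
The speedup figures can be checked from the numbers listed above. The sketch below assumes throughput is measured as records per source divided by average execution time; `CoGroupSpeedupSketch` is a hypothetical helper, not part of the benchmark.

```java
/** Hypothetical helper that recomputes the speedups from the reported numbers. */
final class CoGroupSpeedupSketch {

    /** Records per second, assuming throughput = records per source / execution time. */
    static double throughput(double recordsPerSource, double seconds) {
        return recordsPerSource / seconds;
    }

    /** Speedup with the proposed change relative to current stream mode. */
    static double speedupOverStream() {
        return throughput(5e7, 46) / throughput(2e6, 56);
    }

    /** Speedup with the proposed change relative to current batch mode. */
    static double speedupOverBatch() {
        return throughput(5e7, 46) / throughput(5e7, 118);
    }

    public static void main(String[] args) {
        System.out.printf("vs stream mode: %.1fX%n", speedupOverStream());
        System.out.printf("vs batch mode: %.2fX%n", speedupOverBatch());
    }
}
```

Under this assumption the ratio against stream mode is 1400/46, roughly 30X, which is consistent with the "more than 20X" claim, and the ratio against batch mode is 118/46, roughly 2.6X.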


Compatibility, Deprecation, and Migration Plan

...