You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Discussion thread-
Vote thread-
JIRA

-

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Let's consider a use-case where user wants to bootstrap job state (operatorA and operatorB in the figure below) using records from a bounded source. There is no need to emit any records when the job is still processing records from the bounded source. After this bootstrap phase is done (i.e. all records from the bounded source have been processed by operatorA and operatorB), user wants to process records from the unbounded source and emit results continuously in real-time. Additionally, we assume operatorA involves aggregation computation (e.g. co-group) and its throughput in batch mode can be 10X faster than in stream mode (see Benchmark Results below).

Currently, supporting such use-case requires all operators to be deployed at the start of the job and run in the streaming mode. This approach has several drawbacks: 1) operatorA might be 10X slower in stream mode than when it is run in batch mode; 2) it is a waste of slot resource to deploy operatorB when it can not do more useful work than waiting/buffering results from operatorA; and 3) operatorB might end up spilling a lot of records from the unbounded source to its local disk until it has received all records from operatorA.

In this FLIP, we propose to optimize performance for the above use-case by 1) allowing operator to declare its attributes (e.g. isOutputOnEOF) so that JM will deploy its downstream operators only after such an operator finishes; and 2) support stream-batch unified operator which can switch from batch mode to stream mode during execution without restart. We hope these capability can further enhance Flink as a stream-batch unified processing engine.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325.



Terminology and Background

Currently, operators supported by Flink must be either stream-only operator or batch-only operator, with the definition specified below.

An operator is a batch-only operator when it meets the following properties:

  • The operator should emit records only after all its inputs have ended. There is no requirement on processing latency.
  • The operator does not need to support checkpointing. Thus operator can use arbitrary optimization (e.g. sorting).

An operator is a stream-only operator when it meets the following properties:

  • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
  • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.

As a result of this limitation, if any input of the computation is unbounded, the operator must be stream-only, and thus can not take advantage of optimizations such as buffering/sorting. This is OK when all input records need to be processed in real-time with low processing latency. But it can easily lead to suboptimal performance when the inputs are composed of backlog records followed by the real-time records (e.g. the use-case described in the motivation section).


In order to address this issue, this FLIP proposes to support stream-batch unified operator, with the definition specified below:

An operator is a stream-batch unified operator when it meets the following properties:

  • The operator can handle isBacklog (a boolean value) metadata received on its inputs. isBacklog==false iff all records received after this metadata need to be processed in real-time. 
  • When all of its inputs has isBacklog=true:
    • The operator should not emit records. There is no requirement on processing latency during this period.
    • The operator does not need to support checkpointing. Thus operator can use arbitrary optimization (e.g. sorting) in this stage.
  • When all of its inputs have isBacklog=false:
    • The operator should emit records continuously while it is still processing input records. Processing latency needs to be low.
    • The operator should support checkpointing. Thus operator should only use optimizations compatible with checkpoint.
  • When some (but not all) inputs have isBacklog=false, the operator can choice either one of the above two execution modes based on its semantics.

By supporting stream-batch unified operator, Flink can deliver optimal performance for the use-case described in the motivation section.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner. This class allows users of the DataStream API to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.


/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}


2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean isOutputOnEOF = null;
    @Nullable private Boolean isOutputOnCheckpoint = null; 
    @Nullable private Boolean hasInternalSorter = null; 

    public OperatorAttributesBuilder() {...}

    public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

    public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

    public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - isOutputOnEOF defaults to false
     * - isOutputOnCheckpoint defaults to false
     * - hasInternalSorter defaults to false
     */ 
     public OperatorAttributes build() {...}
}
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {     
   /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its output edges are blocking.
     *   <li>It does not support checkpoint.
     * </ul>
     */
    public boolean getIsOutputOnEOF() {...}

    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     *
     * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
     */
    public boolean getIsOutputOnCheckpoint() {...}

    /**
     * Returns true iff the operator sorts data internally.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>Its input records do not need to be sorted externally.
     *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
     *   <li>It does not support checkpoint when any input has isBacklog=true.
     * </ul>
     */
    public boolean getHasInternalSorter() {...}
}


Note that an operator with internal sorter does not necessarily mean that it only emits data at the end of input.  For example, we might have an operator that sorted data when it is still reading from an input with isBacklog=true. When all the inputs (it is still reading from) have isBacklog=false, the operator can stop sorting and start to emit records continuously in the streaming fashion. And the operator can support checkpointing during this period.


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the APIs on Transformation interface to get the corresponding operator attributes.

@Internal
public abstract class Transformation<T> {
    public boolean getIsOutputOnEOF() {
        return false;
    }

    public boolean getIsOutputOnCheckpoint() {
        return false;
    }

    public boolean getHasInternalSorter() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


3) Update JM to make use the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has IsOutputOnEOF == true:
    • Its output edge is blocking
    • The PipelinedRegion containing this Transformation should never trigger checkpoint.
  • If all Transformation has IsOutputOnCheckpoint == false:
    • In FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.
  • If a Transformation has HasInternalSorter == true:
    • Flink runtime will not add external sorter for its inputs (including during batch-mode with keyed inputs)
    • Its managed memory should be set according to execution.sorted-inputs.memory
    • The PipelinedRegion containing this Transformation should not trigger checkpoint until all inputs have isBacklog=false.

4) A blocking input edge with pending records is same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

5) Update DataStream#coGroup to use the same optimization as the DataSet#coGroup when EndOfStreamWindows is used.

The following changes will be made:

  • Add a subclass of TwoInputStreamOperator that uses an internal sorter to do co-group with pretty much the same implementation as to DataSet#coGroup.
  • This operator should override getOperatorAttributes() to return OperatorAttributes(IsOutputOnEOF=true, HasInternalSorter=true).
  • Update CoGroupedStreams#WithWindow#apply to use this operator to do co-group when the user-specified window assigner is EndOfStreamWindows.

6) For those stream-only operators whose throughput is observably slower than the corresponding batch-only operator, update stream-only operator to be stream-batch unified operator.

Typically, those operators that have keyed inputs and involve aggregation operator (e.g. join, cogroup, aggregate) can benefit from this optimization.

Though the concrete changes may be vary across operators, typically an operator might need the following changes to be stream-batch unified:

  • Override getOperatorAttributes to return HasInternalSorter = true if the operator needs to use internal sorter in the batch mode.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the input's isBacklog status. For example, the operator can buffer and sort input records when any input has isBacklog=true. And once all inputs' status has switch to isBacklog=false, then process the buffered records, emit results, and start to work in the stream execution mode.


Example Usages

1) User wants to co-group records from two bounded streams and emit output after both inputs have ended.

We assume the user-defined function just do "collector.collect(1)" so that overhead of the user-defined function is minimized. The DataStream program looks like this.

data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);


We run the CoGroupDataStream benchmark on a mackbook with the latest Flink 1.17-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of sources generating 2*10^6 records, average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of sources generating 5*10^7 records, average execution time is 118 sec.
  • With the proposed change, in both the stream and batch mode, with each of sources generating 5*10^7 records,  average execution time is 46 sec.

This shows with the changes proposed above, DataStream#coGroup can be 20X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode.


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.




  • No labels