You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

Discussion thread-
Vote thread-
JIRA

Unable to render Jira issues macro, execution error.

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. Then operatorA needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.

Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot resources because operatorB can not do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it is waiting for operatorA's output.

In this FLIP, we propose to optimize performance for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. JM will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator, as well as the results emitted by its upstream operators, will all be blocking, which effectively let Flink schedule and execute this operator as well as its upstream operators in batch mode. Hybrid shuffle mode(FLIP-235: Hybrid Shuffle Mode) can be used in batch mode part to further improve the performance.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner. This class allows users of the DataStream API to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.

/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * stream reaches EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
 
    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);
 
    private EndOfStreamWindows() {}
 
    public static EndOfStreamWindows get() {
        return INSTANCE;
    }
 
    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }
 
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }
 
    @Override
    public boolean isEventTime() {
        return true;
    }
 
    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }
 
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }
 
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}


2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@PublicEvolving
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnEOF = null;
    @Nullable private Boolean outputOnCheckpoint = null;
 
    public OperatorAttributesBuilder() {...}
 
    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}
 
    public OperatorAttributesBuilder setOutputOnCheckpoint(boolean outputOnCheckpoint) {...}
  
    /**
     * If any operator attribute is null, we will log it at DEBUG level and use the following
     * default values.
     * - outputOnEOF defaults to false
     * - outputOnCheckpoint defaults to false
     */
     public OperatorAttributes build() {...}
}
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@PublicEvolving
public class OperatorAttributes {    
   /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li> The results of this operator as well as its upstream operators have blocking partition type.
     *   <li> This operator as well as its upstream operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnEOF() {...}
 
    /**
     * Returns true iff the operator can only emit records when checkpoint is triggered.
     */
    public boolean isOutputOnCheckpoint() {...}
}


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}


Proposed Changes

1) Add the APIs on Transformation interface to get the corresponding operator attributes.

@Internal
public abstract class Transformation<T> {
    public boolean isOutputOnEOF() {
        return false;
    }
 
    public boolean isOutputOnCheckpoint() {
        return false;
    }
}


2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.


3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

  • If a Transformation has isOutputOnEOF == true:
    • The results of this operator as well as its upstream operators have blocking partition type.
    • This operator as well as its upstream operators will be executed in batch mode (e.g checkpoint is disabled when these operators are running).
  • If all Transformation has isOutputOnCheckpoint == false:
    • In FLIP-325, JM will not trigger an extra flush() before triggering a checkpoint.

4) A blocking input edge with pending records is same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.


5) When DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true

This operator will use the follow optimization to achieve much higher throughput than the existing DataStream#coGroup in both stream and batch mode:

  • It will instantiate two internal sorter to sorts records from its two inputs separately. Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead.
  • It will not invoke WindowAssigner#assignWindows or triggerContext#onElement for input records. In comparison, the existing WindowOperator#processElement invokes these methods for every records.

6) When DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. This operator will sort the input before aggregate, and avoid invoking window actions, which is similar to '5)'.

7) Before all of non-pipeline tasks finished, CheckpointCoordinator would get a null checkpoint plan, and it can work normally after all of non-pipeline tasks finish.

8) Hybrid shuffle can be applied by configuring ExecutionOptions.BATCH_SHUFFLE_MODE to determine the result partition type of isOutputOnEOF operator and its upstream operators in RuntimeExecutionMode.STREAMING.

Benchmark results

1) Use DataStream#coGroup to process records from two bounded streams and emit results after both inputs have ended.

We use a user-defined function which just invokes "collector.collect(1)" so that overhead of the user-defined function is minimized. The DataStream program looks like this.

data1.coGroup(data2)
     .where(tuple -> tuple.f0)
     .equalTo(tuple -> tuple.f0)
     .window(EndOfStreamWindows.get())
     .apply(new CustomCoGroupFunction())
     .addSink(...);


We run the CoGroupDataStream benchmark on a mackbook with the latest Flink 1.18-snapshot and parallelism=1. RocksDB is used in the streaming mode.

Here are the benchmark results:

  • Without the proposed change, in stream mode, with each of sources generating 2*10^6 records, average execution time is 56 sec.
  • Without the proposed change, in batch mode, with each of sources generating 5*10^7 records, average execution time is 118 sec.
  • With the proposed change, in both the stream and batch mode, with each of sources generating 5*10^7 records,  average execution time is 46 sec.

This shows with the changes proposed above, DataStream#coGroup can be 20X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode.


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.


Future Work

It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.


  • No labels