Discussion thread: https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5
Vote thread: https://lists.apache.org/thread/oy9mdmh6gk8pc0wjdk5kg8dz3jllz9ow
JIRA: FLINK-32594
Release: -

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

[This FLIP proposal is a joint work of Ran Jinhao, Dong Lin and Xuannan Su]


Table of Contents

Motivation

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. We will call such an operator a full-dam operator. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.

Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB cannot do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it is waiting for operatorA's output.

In this FLIP, we propose to optimize task deployment and resource utilization for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. The JM will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator, as well as the results emitted by its upstream operators, will all be blocking, which effectively lets Flink schedule and execute this operator and its upstream operators in batch mode. Hybrid shuffle mode (FLIP-235: Hybrid Shuffle Mode) can be used in the batch-mode part to further improve the performance.

[Figure: example job graph, where operatorA (a full-dam operator) consumes a bounded source and operatorB joins operatorA's output with an unbounded source]

In addition, this FLIP allows creating a GlobalWindows whose window is triggered only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.

Objectives

The APIs provided in this FLIP achieve the following objectives / benefits:


1) Reduce resource usage. For use-cases where an operatorB (with unbounded input) depends on the output of another operatorA, and operatorA only emits results at the end of its input, Flink will deploy operatorB after operatorA has finished. This approach avoids unnecessary resource usage while operatorA is still processing its inputs.


2) Improve usability. For use-cases that need to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows to create a window that is only triggered at the end of its inputs, instead of writing tens of lines of code to define a custom WindowAssigner subclass.
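
For example, here is a minimal usage sketch (reusing MyAggregator and DiscardingSink from the benchmark section below):

Code Block
languagejava
source1.keyBy(value -> value.f0)
    .window(GlobalWindows.createWithEndOfStreamTrigger())
    .aggregate(new MyAggregator())
    .addSink(new DiscardingSink<>());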


3) Introduce attributes on the operator so that Flink knows whether the operator only emits results after its input has ended. All operators defined in Flink should set these attributes accordingly to achieve the benefits above. User-defined operators should also be able to set these attributes so that they can achieve the same benefits.


Public Interfaces

1) Add the createWithEndOfStreamTrigger method to GlobalWindows, which allows users of the DataStream API to create a GlobalWindows with a window trigger that only fires when the input ends.

Code Block
languagejava
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ...

    /**
     * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
     * The window is only useful if you also specify a custom trigger. Otherwise, the window will
     * never be triggered and no computation will be performed.
     */
    public static GlobalWindows create() {...}

    /**
     * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}
     * and the window is triggered if and only if the input stream is ended.
     */
    public static GlobalWindows createWithEndOfStreamTrigger() {...}
}

2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
 
    public OperatorAttributesBuilder() {...}
 
    /**
     * Set to true if and only if the operator only emits records after all its inputs have ended.
     * If it is not set, the default value false is used.
     */
    public OperatorAttributesBuilder setOutputOnlyAfterEndOfStream(
            boolean outputOnlyAfterEndOfStream) {...}
 
    /** If any operator attribute is not set explicitly, we will log it at DEBUG level. */
    public OperatorAttributes build() {...}
}

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
 
    /**
     * Returns true if and only if the operator only emits records after all its inputs have ended.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li>The results of this operator as well as its chained operators have blocking partition type.
     *   <li>This operator as well as its chained operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnlyAfterEndOfStream() {...}
}


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    /**
     * Get the {@link OperatorAttributes} of the operator that the Flink runtime can use to optimize
     * the job performance.
     *
     * @return OperatorAttributes of the operator.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    /**
     * Get the {@link OperatorAttributes} of the operator created by the factory. Flink runtime can
     * use it to optimize the job performance.
     *
     * @return OperatorAttributes of the operator created by the factory.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
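
For example, a user-defined operator that only emits results at end-of-input could report this attribute as follows (a sketch; MyFullDamOperator is a hypothetical user-defined operator):

Code Block
languagejava
public class MyFullDamOperator extends AbstractStreamOperator<Integer>
        implements OneInputStreamOperator<Integer, Integer> {
    ...
 
    @Override
    public OperatorAttributes getOperatorAttributes() {
        // Declare that records are only emitted after all inputs have ended.
        return new OperatorAttributesBuilder()
                .setOutputOnlyAfterEndOfStream(true)
                .build();
    }
}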


Benchmark results

Environment Configuration

We run benchmarks on a MacBook with the latest Flink 1.18-SNAPSHOT and parallelism=1. The default standalone cluster configuration is used except:

Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m
taskmanager.numberOfTaskSlots: 2

Benchmark: OperatorDelayedDeploy

We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to process records emitted by Process1 together with records from an unbounded source (Source2), and emit results with low processing latency in real-time.

Code Block
languagejava
 source1.keyBy(value -> value.f0)
  .window(GlobalWindows.createWithEndOfStreamTrigger())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(new DiscardingSink<>()); 

Resource Utilization

When we put Source1, Process1, Source2 and Process2 into 4 different SlotSharingGroups as above and run this job, we can see that, before this PR, none of the tasks could be deployed; they were all blocked in the CREATED state because only 2 slots are provided by the TaskManager. With our work, Source2 and Process2 can be deployed after Source1 and Process1 finish and release their slots.
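
For reference, the four slot sharing groups can be assigned roughly as follows (a sketch; the group names are illustrative):

Code Block
languagejava
source1.slotSharingGroup("group1")
  .keyBy(value -> value.f0)
  .window(GlobalWindows.createWithEndOfStreamTrigger())
  .aggregate(new MyAggregator()).name("Process1").slotSharingGroup("group2")
  .connect(source2.slotSharingGroup("group3").keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .slotSharingGroup("group4")
  .addSink(new DiscardingSink<>());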

[Figure: OperatorDelayedDeployDataStream resource utilization]

Efficiency Improvement

Without setting the slot sharing group, we run each benchmark in OperatorDelayedDeployDataStreamInOneSlot 5 times, and report the average throughput (records/ms), the throughput range, and the speedup relative to the minimum throughput. Here are the benchmark results:

| Data Count | Key Count | Current Throughput | Optimized Throughput | Hybrid Shuffle Throughput |
|------------|-----------|--------------------|----------------------|---------------------------|
| 5e6        | 5e4       | 205 ± 4 (149%)     | 1201 ± 59 (876%)     | 1381 ± 14 (1008%)         |
| 5e6        | 5e5       | 145 ± 21 (105%)    | 1093 ± 61 (797%)     | 1282 ± 41 (935%)          |
| 1e7        | 1e5       | 186 ± 19 (135%)    | 1270 ± 64 (927%)     | 1423 ± 41 (1038%)         |
| 1e7        | 1e6       | 137 ± 10 (100%)    | 1176 ± 31 (858%)     | 1313 ± 33 (958%)          |
| 5e7        | 5e5       | OOM                | 1463 ± 7 (1067%)     | 1666 ± 35 (1216%)         |
| 5e7        | 5e6       | OOM                | 1099 ± 28 (802%)     | 1222 ± 49 (891%)          |

This shows that, with the changes proposed above, the total processing time can be about 8~10X faster than the current processing time in this scenario. As the number of records grows, the job in the current version hits an OOM exception, while the optimized version works well. With hybrid shuffle applied in the batch execution part, the blocking edge from Source1 to Process1 becomes a hybrid shuffle edge, which further improves performance.
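
For reference, hybrid shuffle is applied by configuring ExecutionOptions.BATCH_SHUFFLE_MODE, which determines the result partition type of the full-dam operator and its upstream operators even in RuntimeExecutionMode.STREAMING. One possible configuration:

Code Block
languageyml
execution.batch-shuffle-mode: ALL_EXCHANGES_HYBRID_FULL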

Benchmark: CoGroup

Use DataStream#coGroup to process records from two bounded streams with Data Count records each, and emit results after both inputs have ended. With this PR, when DataStream#coGroup is invoked with the end-of-stream-triggered GlobalWindows as the window assigner, the EOFAggregationOperator will be applied to sort the two inputs first and then output the coGroup results.

Code Block
languagejava
data1.coGroup(data2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(GlobalWindows.createWithEndOfStreamTrigger())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

We run each benchmark in CoGroupDataStream 5 times; here are the throughput benchmark results:

| Data Count | Key Count | Current       | BATCH            | Optimized         | Hybrid Shuffle    |
|------------|-----------|---------------|------------------|-------------------|-------------------|
| 2e6        | 2e6       | 61 ± 1 (115%) | 502 ± 75 (947%)  | 868 ± 74 (1637%)  | 899 ± 82 (1696%)  |
| 5e6        | 5e6       | 60 ± 1 (113%) | 528 ± 28 (996%)  | 1176 ± 39 (2218%) | 1262 ± 50 (2381%) |
| 2e7        | 2e7       | 56 ± 0 (105%) | 564 ± 34 (1064%) | 1492 ± 28 (2815%) | 1692 ± 14 (3192%) |
| 5e7        | 5e7       | 53 ± 1 (100%) | 602 ± 16 (1135%) | 1508 ± 18 (2845%) | 1712 ± 77 (3230%) |

This shows that, with the changes proposed above, DataStream#coGroup can be 15~30X faster than its current throughput in stream mode, and 2.5X faster than its current throughput in batch mode. Using hybrid shuffle in the optimized (output-at-EOF) part can further improve the performance.

Benchmark: Aggregate

Use DataStream#aggregate to process records from a bounded stream with Data Count records evenly distributed across Key Count keys, and emit results after the input has ended.

Code Block
languagejava
data
      .keyBy(value -> value.f0)
      .window(GlobalWindows.createWithEndOfStreamTrigger())
      .aggregate(new Aggregator())
      .addSink(new CountingAndDiscardingSink());

We run each benchmark in AggregationBenchmark with 10 records per key and with 100 records per key, 5 times each; here are the throughput benchmark results:

10 records per key

| Data Count | Key Count | Current        | BATCH              | Optimized         | Hybrid Shuffle     |
|------------|-----------|----------------|--------------------|-------------------|--------------------|
| 5e6        | 5e5       | 164 ± 1 (145%) | 1277 ± 129 (1130%) | 1365 ± 79 (1207%) | 1418 ± 154 (1254%) |
| 1e7        | 1e6       | 151 ± 2 (133%) | 1403 ± 87 (1241%)  | 1511 ± 43 (1337%) | 1697 ± 175 (1501%) |
| 2e7        | 2e6       | 129 ± 4 (114%) | 1322 ± 21 (1169%)  | 1411 ± 17 (1248%) | 1598 ± 13 (1414%)  |
| 5e7        | 5e6       | 113 ± 1 (100%) | 1282 ± 13 (1134%)  | 1155 ± 13 (1022%) | 1256 ± 22 (1111%)  |

100 records per key

| Data Count | Key Count | Current        | BATCH             | Optimized        | Hybrid Shuffle     |
|------------|-----------|----------------|-------------------|------------------|--------------------|
| 5e6        | 5e4       | 201 ± 3 (114%) | 1454 ± 153 (826%) | 1514 ± 43 (860%) | 1670 ± 29 (948%)   |
| 1e7        | 1e5       | 207 ± 2 (117%) | 1522 ± 71 (864%)  | 1620 ± 44 (920%) | 1866 ± 187 (1060%) |
| 2e7        | 2e5       | 201 ± 3 (114%) | 1607 ± 9 (913%)   | 1746 ± 31 (992%) | 2038 ± 155 (1157%) |
| 5e7        | 5e5       | 176 ± 1 (100%) | 1559 ± 6 (885%)   | 1723 ± 22 (978%) | 1956 ± 56 (1111%)  |

This shows that, with the changes proposed above, DataStream#aggregate can be 10X faster than its current throughput in stream mode, and slightly faster than its current throughput in batch mode. Using hybrid shuffle in the optimized (output-at-EOF) part can further improve the performance.



Proposed Changes

1) Add an API to the PhysicalTransformation abstract class to get the corresponding operator attribute.

Code Block
languagejava
@Internal 
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    public boolean isOutputOnlyAfterEndOfStream() {
        return false;
    }
}


2) Update the following Transformation subclasses to override the newly added method, using the OperatorAttributes obtained from the underlying operator (a sketch follows the list):

  • OneInputTransformation
  • TwoInputTransformation
  • AbstractMultipleInputTransformation
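
A minimal sketch of such an override (assuming the transformation exposes its operator factory via getOperatorFactory(), as OneInputTransformation does):

Code Block
languagejava
@Internal
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {
    ...
 
    @Override
    public boolean isOutputOnlyAfterEndOfStream() {
        // Delegate to the OperatorAttributes reported by the underlying operator factory.
        return getOperatorFactory().getOperatorAttributes().isOutputOnlyAfterEndOfStream();
    }
}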


3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

If a Transformation has isOutputOnlyAfterEndOfStream == true:

  • The process of operator chaining can still be done. After that, the result partitions of its operator chain will be set to blocking.
  • This operator will be executed in batch mode.

    More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After a failover, operators re-read the data from the upstream blocking edge or from the sources.


4) Enumerate all existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly for those operators that may only emit output at the end of their input, to enable the optimization mentioned in 3):

  • WindowOperator: when used with a GlobalWindows that is created with the method createWithEndOfStreamTrigger as the window assigner
  • StreamSortOperator


5) When a DataStream API like DataStream#coGroup is invoked with GlobalWindows, the WindowOperator will check whether the window assigner is an instance of GlobalWindows that only triggers at the end of the inputs. If so, the operator will have OperatorAttributes with isOutputOnlyAfterEndOfStream = true, so that it achieves higher throughput by using the optimizations in batch mode.
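
A sketch of this check when building the window operator (the trigger type and variable names here are illustrative, not the final implementation):

Code Block
languagejava
// Sketch: derive the operator attributes from the window assigner and its trigger.
OperatorAttributesBuilder builder = new OperatorAttributesBuilder();
if (windowAssigner instanceof GlobalWindows
        && defaultTrigger instanceof GlobalWindows.EndOfStreamTrigger) { // illustrative type
    builder.setOutputOnlyAfterEndOfStream(true);
}
OperatorAttributes attributes = builder.build();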

Analysis of APIs affected by this FLIP

This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.

More specifically, the following DataStream APIs can benefit from using a GlobalWindows that is created with the method createWithEndOfStreamTrigger:

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.


As mentioned in Proposed Changes 4), we will enumerate all the existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly. For user-defined operators, the behavior stays the same as before. If a user-defined operator only emits output at the end of its input, it must set isOutputOnlyAfterEndOfStream to true to enable the optimization.

Future Work

  • It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached the end for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.
  • Setting the upstream operators of the full-dam operator to batch mode will also increase the job throughput, and hybrid shuffle mode can be used in the batch-mode part to further improve performance when there are sufficient slot resources.
  • Optimizing the implementation of frequently used full-dam operators, such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. For example, we can instantiate an internal sorter in the operator so that it does not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record.