Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Discussion threadhttps://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5
Vote thread-https://lists.apache.org/thread/oy9mdmh6gk8pc0wjdk5kg8dz3jllz9ow
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-32594

Release-

...

[This FLIP proposal is a joint work between of Ran Jinhao and Dong Lin  and Xuannan Su ]


Table of Contents

Motivation

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. We will call such operator full-dam operator. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.

Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB can not do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it is waiting for operatorA's output.

In this FLIP, we propose to optimize task deployment and resource utilization for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. JM will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator will be blocking,   which effectively let Flink schedule and execute this operator in batch mode.

In addition, this FLIP also adds EndOfStreamTrigger in GlobalWindows. GlobalWindows with EndOfStreamTrigger can be used with the DataStream API to specify whether the window will end allows creating a GlobalWindows that the window will be triggered only at the end of its inputs. DataStream program programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.


Objectives

The APIs provided in this FLIP achieves achieve the following objectives / benefits:


1) Improve throughputReduce resource usage.  For use-case that involves a mixture of bounded and unbounded workloads (e.g. the use-case specified in the example), the user can start a single Flink job to address such use-case where the performance of the bounded workload can be optimal (e.g. similar/higher than the corresponding performance in batch mode).

2) Reduce resource usage.  For use-case that involves an operatorB (with unbounded input) depending on the output of another operatorA, where operatorA only emits results at the end of its input, Flink will deploy operatorB after operatorA is finished. This approach reduces the unnecessary resource usage when operatorA is still processing its inputs.

3) Improve usability. For use-case that needs to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows with new default trigger provided in this FLIP, instead of writing tens of lines of code to define this WindowAssigner subclass. 

Public Interfaces

an operatorB (with unbounded input) depending on the output of another operatorA, where operatorA only emits results at the end of its input, Flink will deploy operatorB after operatorA is finished. This approach reduces unnecessary resource usage when operatorA is still processing its inputs.


2) Improve usability. For use-case that needs to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows to create a GlobalWindow that is only triggered at the end of its inputs, instead of writing tens of lines of code to define a WindowAssigner subclass. 


3) Introduce attributes to the operator to let Flink know if the operator only outputs results when the input has ended. All the operators defined in Flink should set the attributes accordingly to achieve the benefit above. User-defined operators should be able to set the attributes accordingly so that they can achieve the benefit.


Public Interfaces

1) Add the createWithEndOfStreamTrigger method in 1) Add the EndOfStreamTrigger in GlobalWindows which allows users of the DataStream API to create GlobalWindows with EndOfStreamTrigger to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.a window trigger that only fires when the input ends.

Code Block
Code Block
languagejava
/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
   ···                   

    /**
     * Creates a {@link TriggerWindowAssigner} andthat {@link
assigns * org.apache.flink.streaming.api.windowing.evictors.Evictor}all elements to dothe flexible,same policy based windows{@link GlobalWindow}.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object,* GlobalWindow> {
	···    
	private final Trigger<Object, GlobalWindow> defaultTrigger;

    private GlobalWindows(Trigger<Object, GlobalWindow> defaultTrigger) {The window is only useful if you also specify a custom trigger. Otherwise, the window will
     * never be this.defaultTriggertriggered =and defaultTrigger;
no computation will be }performed.
    ···
   @Override
    */
	public Trigger<Object,static GlobalWindow>GlobalWindows getDefaultTriggercreate() {
        return defaultTrigger;...
    }
   ··· 
   public

 static GlobalWindows create() {
 /**
     * Creates a  return new GlobalWindows(new NeverTrigger());
    }
{@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}
    public static* GlobalWindowsand createWithEndOfStreamTrigger() {
        return new GlobalWindows(new EndOfStreamTrigger());the window is triggered if and only if the input stream is ended.
    }
   ··· 
  /** A trigger that fires when input ends, as default Trigger for GlobalWindows. */ */
	public static GlobalWindows createWithEndOfStreamTrigger() {
        ...
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override}

 }


2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
 
    public OperatorAttributesBuilder() {...}        

    /**
     * Set to publictrue TriggerResultif onElement(
and only if the operator only emits records after all its inputs have ended.
   Object element, long* timestamp,If GlobalWindowit window,is TriggerContextnot ctx) {
     set, the default value false is used.
     */   return TriggerResult.CONTINUE;
     public OperatorAttributesBuilder  }
setOutputOnlyAfterEndOfStream(
        @Override
    boolean outputOnlyAfterEndOfStream) {...}   public  TriggerResult  onEventTime(     

	/** If any operator attribute is not set explicitly, we will log it at DEBUG level. */
	public OperatorAttributes build() {...}

}


Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
  
    /**
     * Returns true if and only if the operator only emits records after all its inputs have ended.
     *long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

     * <p>Here are @Override
the implications when it is true:
   public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
*
     * <ul>
     *   @Override
<li> The results of this operator as well publicas its voidchained onMerge(GlobalWindow window, OnMergeContext ctx) {}operators have blocking partition type.
    }
	···
}

2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnEOF = null;
 
    public OperatorAttributesBuilder *   <li> This operator as well as its chained operators will be executed in batch mode.
     * </ul>
     */     
	public boolean isOutputOnlyAfterEndOfStream() {...}
 
    public OperatorAttributesBuilder setOutputOnEOF(boolean outputOnEOF) {...}
   }


3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
	
    /**
     * IfGet anythe operator{@link attributeOperatorAttributes} isof null,the weoperator willthat logthe itFlink atruntime DEBUG levelcan and use theto followingoptimize
     * the defaultjob valuesperformance.
     * - outputOnEOF defaults to false
     * @return OperatorAttributes of the operator.
     */
     public@Experimental
    default OperatorAttributes buildgetOperatorAttributes() {...}
}
Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {    
        return new OperatorAttributesBuilder().build();
    }
}
 

public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    /**
     * Returns true iffGet the {@link OperatorAttributes} of the operator cancreated onlyby emitthe recordsfactory. afterFlink inputs have reached EOF.runtime can
     *
 use it to optimize *the <p>Herejob areperformance.
 the implications when it is true:*
     *
     * <ul> @return OperatorAttributes of the operator created by the factory.
     */
    <li>@Experimental
 The results of thisdefault operatorOperatorAttributes as well as its chained operators have blocking partition type.getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
     *   <li> This operator as well as its chained operators will be executed in batch mode.}
}


Proposed Changes

1) Add the APIs on PhysicalTransformation interface to get the corresponding operator attributes.

Code Block
languagejava
@Internal 
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    public boolean isOutputOnlyAfterEndOfStream() {
     * </ul>
     */
  return false;
  public boolean isOutputOnEOF() {...}
}


32) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

Proposed Changes

1) Add the APIs on Transformation interface to get the corresponding operator attributes.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    public boolean isOutputOnEOF() {
        return false;
    }
}

2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.

3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

If a Transformation has isOutputOnEOF == true:

...

Update the following Transformation subclasses to override the newly added methods using the OperatorAttributes obtained from the underlying Operator:

  • OneInputTransformation
  • TwoInputTransformation
  • AbstractMultipleInputTransformation


3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.

If a Transformation has isOutputOnlyAfterEndOfStream == true:

  • The process of operator chaining can still be done. After that, the results of its operator chain will be set blocking.
  • This operator will be executed in batch mode.

    More specifically, the checkpoint is disabled when these operators are running, such that these operators can do operations not compatible with checkpoints (e.g. sorting inputs). And operators should re-read the data from the upstream blocking edge or sources after failover. 


4) Enumerate all existing operators defined in Flink and set the isOutputOnlyAfterEndOfStream accordingly for those operators that may only output at the end of the input to enable the optimization mentioned in 3):

  • WindowOperator: When used with GlobalWindows that is created with the method createWithEndOfStreamTrigger as the window assigner
  • StreamSortOperator


5) When DataStream API like DataStream#CoGroup is invoked with GlobalWindows, the WindowOperator will check if the window assigner is an instance of GlobalWindow and only triggers at the end of the inputs. If true, the operator will have OperatorAttributes with isOutputOnlyAfterEndOfStream = true to achieve higher throughput by using the optimizations in batch mode.

Analysis of APIs affected by this FLIP

This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.

More specifically, the following DataStream API can benefit from using GlobalWindows which is created with the method createWithEndOfStreamTrigger.

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.


As mentioned in Proposed Change 4), we will enumerate all the existing operators defined in Flink and set the isOutputOnlyAfterEndOfStream accordingly. For user-defined operators, their behavior stays the same as before. If the user-defined operators only output at the end of input, they must set the isOutputOnlyAfterEndOfStream to true to enable the optimization

...

4) A blocking input edge with pending records is same as a source with isBacklog=true when an operator determines its RecordAttributes for downstream nodes.

This is needed in order for this FLIP to work with FLIP-327. More specifically, once both FLIP-327 and FLIP-331 are accepted, we need a way to determine the backlog status for input with blocking edge type.

5) When DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, suppose this API is called without any explicitly-assigned trigger or evictor, the program should also have isInternalSorterSupported = true to achieve higher throughput by using the optimizations currently done in batch mode.

The following optimization will be used to achieve higher throughput than the existing DataStream#coGroup in both stream and batch mode:

  • It will instantiate two internal sorter to sorts records from its two inputs separately. Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead.
  • It will not invoke WindowAssigner#assignWindows or triggerContext#onElement for input records. In comparison, the existing WindowOperator#processElement invokes these methods for every records.

6) When DataStream#aggregate is invoked with GlobalWindows and its default trigger as the window assigner and trigger, Flink should generate an operator with isOutputOnEOF = true.

In addition, after FLIP-327 is accepted, suppose this API is called without any explicitly-assigned trigger or evictor, the program should also have isInternalSorterSupported = true to achieve higher throughput by using the optimizations currently done in batch mode.

More specifically, This operator will sort the input before aggregation, and avoid invoking window actions, which is similar to '5)'.

Analysis of APIs affected by this FLIP

This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.

More specifically, the following DataStream API can benefit from using GlobalWindows with default trigger.

  • DataStream#windowAll
  • KeyedStream#window
  • CoGroupedStreams#Where#EqualTo#window
  • JoinedStreams#Where#EqualTo#window

Benchmark results

To demonstrate our optimization improvements,we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.

We run benchmarks on a MacBook with the latest Flink 1.18-snapshot and parallelism=1. Default standalone cluster configuration is used except:

Code Block
languageyml
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m

1) Execute DataStream#CoGroup

This benchmark uses DataStream#coGroup to process records from two bounded inputs. Each input will generate records with (key = i, value = i) for i from 1 to 8*10^7.

Below is the DataStream program code snippet.

Code Block
languagejava
DataStream<Tuple2<Integer, Double>> source1  = env.fromCollection(
                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));
DataStream<Tuple2<Integer, Double>> source2  = env.fromCollection(
                new DataGenerator(dataNum), Types.TUPLE(Types.INT, Types.DOUBLE));

source1.coGroup(source2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(GlobalWindows.createWithEndOfStreamTrigger())
    .apply(new CustomCoGroupFunction())
    .addSink(...);

private static class CustomCoGroupFunction
            extends RichCoGroupFunction<Tuple2<Integer, Double>, Tuple2<Integer, Double>, Integer> {
        @Override
        public void coGroup(
                Iterable<Tuple2<Integer, Double>> iterableA,
                Iterable<Tuple2<Integer, Double>> iterableB,
                Collector<Integer> collector) {
            collector.collect(1);
        }
    }

The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode and optimized streaming mode after this PR.

The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode. 

STREAMINGBATCHOptimized STREAMING
66 ± 1 (100%, 1202426 ms)491 ± 5 (743%, 162731 ms)1506 ± 10 (2281%, 53098 ms)

2) Execute DataStream#Aggregate

This benchmark uses DataStream#aggregate to process 8*10^7 records. These records are evenly distributed across 8*10^5 keys. More specifically, the source will generate records with (key = i, value = i) for i from 1 to 8*10^5, and repeat this process 100 times.

Below is the DataStream program code snippet.

Code Block
languagejava
DataStreamSource<Tuple2<Long, Double>> source  = env.fromCollection(
                        new DataGenerator(dataNum, keyNum), Types.TUPLE(Types.LONG, Types.DOUBLE));
source.keyBy(value -> value.f0)
      .window(GlobalWindows.createWithEndOfStreamTrigger())
      .aggregate(new Aggregator())
      .addSink(...);

public static class Aggregator implements AggregateFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Double> {
        @Override
        public Tuple2<Long, Double> createAccumulator() {
            return new Tuple2<Long, Double>(0L, 0.0);
        }

        @Override
        public Tuple2<Long, Double> add(Tuple2<Long, Double> myData, Tuple2<Long, Double> accData) {
            accData.f1 = accData.f1 + myData.f1;
            return accData;
        }

        @Override
        public Double getResult(Tuple2<Long, Double> result) {
            return result.f1;
        }

        @Override
        public Tuple2<Long, Double> merge(Tuple2<Long, Double> acc1, Tuple2<Long, Double> acc2) {
            acc1.f1 = acc1.f1 + acc2.f1;
            return acc1;
        }
    }

The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode and optimized streaming mode after this PR.

The result shows that DataStream#aggregate in optimized streaming mode can be 10X as fast as streaming mode and 11% faster than batch mode

STREAMINGBATCHOptimized STREAMING
163 ± 0 (100%, 490478 ms)1561 ± 16 (957%, 51237 ms)1733 ± 9 (1063%, 46143 ms)

3) Execute a program that needs to fully process data from a bounded source before processing data from another unbounded source.

The following program demonstrates the scenario described in the motivation section. The program needs to pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then anther operator(Process2) needs to process records emitted by Process1 with records from an unbounded source, and emit results with low processing latency in real-time.

Code Block
languagejava
 source1.keyBy(value -> value.f0)
  .window(GlobalWindows.createWithEndOfStreamTrigger())
  .aggregate(new MyAggregator()).name("Process1")
  .connect(source2.keyBy(value -> value.f0))
  .transform("Process2", Types.INT, new MyProcessOperator())
  .addSink(...); 

We can use this program to demonstrate that the program requires less slot resources. More specifically, suppose we configure the standalone cluster with taskmanager.numberOfTaskSlots = 2, and set the Source1,Process1, Source2 and Process2 in 4 different SlotSharingGroups, the program will fail to be deployed before this FLIP. And the program can be deployed successfully after this FLIP. This is because Source2 and Process2 can be deplyed after Source1Process1 finished and released their slots.

Additionally, we can use this program to demonstrate that it can achieve higher performance because Process2 will not need to keep buffer records emitted by Source2 in its memory while Process1 has not reached EOF. More specifically, the program can fail with OOM before this FLIP when the number of records in inputs is high. And the program can finish successfully without OOM after this FLIP.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

Future Work

  • It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached

...

  • the end for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle

...

  • to increase job throughput.
  • Setting the upstream operators of the full-dam operator in batch mode will also increase the job throughput, and Hybrid shuffle mode can also be used in batch mode part to further improve the performance when there are sufficient slot resources.
  • Optimizing the implementation of frequently used full-dam operators,  such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. For example, we can instantiate an internal sorter in the operator, and it will not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record

...

  • .