...

In this FLIP, we propose to optimize task deployment and resource utilization for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. The JobManager (JM) will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator, as well as the results emitted by its upstream operators, will all be blocking, which effectively lets Flink schedule and execute this operator and its upstream operators in batch mode. Hybrid shuffle mode (FLIP-235: Hybrid Shuffle Mode) can be used in the batch-mode part of the job to further improve performance when there are sufficient slot resources.

In addition, this FLIP adds EndOfStreamTrigger to GlobalWindows. GlobalWindows with EndOfStreamTrigger can be used with the DataStream API to specify that the window ends only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.

...

Code Block
languagejava
/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use a {@link Trigger} and {@link
 * org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based windows.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ···
    private final Trigger<Object, GlobalWindow> defaultTrigger;
    ···
    private GlobalWindows(Trigger<Object, GlobalWindow> defaultTrigger) {
        this.defaultTrigger = defaultTrigger;
    }
    ···
    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger() {
        return defaultTrigger;
    }
    ···
    public static GlobalWindows create() {
        return new GlobalWindows(new NeverTrigger());
    }

    public static GlobalWindows createWithEndOfStreamTrigger() {
        return new GlobalWindows(new EndOfStreamTrigger());
    }
    ···
    /** A trigger that fires when input ends, as default Trigger for GlobalWindows. */
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
    }
	···
}
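
To illustrate the trigger semantics above, here is a minimal, self-contained sketch that does not depend on Flink. It relies on the fact that GlobalWindow.maxTimestamp() is Long.MAX_VALUE, and it assumes that event time advances to Long.MAX_VALUE only once all inputs have ended. The class EndOfStreamTriggerSketch, its nested enum, and the driver method sumPerFiring are hypothetical names used for illustration only; they are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of a global window guarded by an end-of-stream trigger (illustration only). */
class EndOfStreamTriggerSketch {
    /** Hypothetical stand-in for Flink's TriggerResult; only the two values used here. */
    enum SketchTriggerResult { CONTINUE, FIRE }

    /** GlobalWindow.maxTimestamp() is Long.MAX_VALUE because the window never ends. */
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

    /** Mirrors onElement above: an incoming record never fires the window. */
    static SketchTriggerResult onElement(long element) {
        return SketchTriggerResult.CONTINUE;
    }

    /** Mirrors onEventTime above: fire only when event time reaches the window's max timestamp. */
    static SketchTriggerResult onEventTime(long time) {
        return time == WINDOW_MAX_TIMESTAMP
                ? SketchTriggerResult.FIRE
                : SketchTriggerResult.CONTINUE;
    }

    /**
     * Hypothetical driver. It buffers every element into a running sum, advances event time to
     * the element's value after each record, and finally advances event time to Long.MAX_VALUE
     * to signal the end of input (an assumption of this sketch). Returns one sum per firing.
     */
    static List<Long> sumPerFiring(long[] elements) {
        List<Long> results = new ArrayList<>();
        long bufferedSum = 0;
        for (long element : elements) {
            bufferedSum += element;
            boolean fired =
                    onElement(element) == SketchTriggerResult.FIRE
                            || onEventTime(element) == SketchTriggerResult.FIRE;
            if (fired) {
                results.add(bufferedSum);
            }
        }
        // End of input: event time jumps to the maximum timestamp and the window fires once.
        if (onEventTime(Long.MAX_VALUE) == SketchTriggerResult.FIRE) {
            results.add(bufferedSum);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(sumPerFiring(new long[] {1, 2, 3})); // prints [6]
    }
}
```

In this model the window produces exactly one result, and only after the last record has been seen, which is the property that lets the operator be treated as emitting output only at the end of its inputs.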

...

Code Block
languagejava
package org.apache.flink.streaming.api.operators;
 
/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
    /**
     * Returns true iff the operator can only emit records after inputs have reached EOF.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li> The results of this operator as well as its chained operators have blocking partition type.
     *   <li> This operator as well as its chained operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnEOF() {...}
}

...

  • Operator chaining can still be performed. After that, the results of its operator chain and its upstream operators will be set to the blocking partition type (the default) or to the hybrid shuffle partition type, as controlled by ExecutionOptions.BATCH_SHUFFLE_MODE.
  • This operator as well as its upstream operators will be executed in batch mode.

    More specifically, checkpointing is disabled while these operators are running, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After a failover, these operators should re-read the data from the upstream blocking edge or from the sources.
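
The two rules above can be sketched as a simple graph traversal. This is only an illustration under stated assumptions, not the scheduler's actual implementation: the job graph is modeled as a hypothetical map from each operator name to the names of its direct upstream operators, and BatchModePropagationSketch and batchModeOperators are made-up names. Starting from every operator that only emits output at EOF, the traversal collects that operator and all of its transitive upstream operators; these are the operators that would run in batch mode with blocking (or hybrid) results.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of propagating batch-mode execution to upstream operators. */
class BatchModePropagationSketch {
    /**
     * Returns the operators that would run in batch mode: each output-on-EOF operator plus
     * every operator reachable from it by following upstream edges.
     *
     * @param upstreamOf maps an operator name to its direct upstream operators (assumed model)
     * @param outputOnEofOperators operators that only emit records after their inputs ended
     */
    static Set<String> batchModeOperators(
            Map<String, List<String>> upstreamOf, Set<String> outputOnEofOperators) {
        Set<String> batchMode = new TreeSet<>();
        Deque<String> pending = new ArrayDeque<>(outputOnEofOperators);
        while (!pending.isEmpty()) {
            String operator = pending.pop();
            // Visit each operator once, then continue with its upstream operators.
            if (batchMode.add(operator)) {
                pending.addAll(upstreamOf.getOrDefault(operator, Collections.emptyList()));
            }
        }
        return batchMode;
    }

    public static void main(String[] args) {
        // source1 -> map -> coGroup -> sink, and source2 -> coGroup
        Map<String, List<String>> upstreamOf = new HashMap<>();
        upstreamOf.put("map", Arrays.asList("source1"));
        upstreamOf.put("coGroup", Arrays.asList("map", "source2"));
        upstreamOf.put("sink", Arrays.asList("coGroup"));

        Set<String> batchMode =
                batchModeOperators(upstreamOf, Collections.singleton("coGroup"));
        System.out.println(batchMode); // prints [coGroup, map, source1, source2]
    }
}
```

In this example the sink stays outside the batch-mode set, since it is downstream of the operator that emits output only at EOF.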

...

The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.

The result shows that DataStream#coGroup in optimized streaming mode can be 22X as fast as streaming mode and 3X as fast as batch mode. Hybrid shuffle can further improve throughput by 11%. 

| STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- |
| 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |


2) Execute DataStream#aggregate

...

The following result shows the throughput (records/sec) when the benchmark is executed in streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR.

The result shows that DataStream#aggregate in optimized streaming mode can be 10X as fast as streaming mode and 11% faster than batch mode. Hybrid shuffle can further improve throughput by 15%.

| STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
| --- | --- | --- | --- |
| 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) | 1992 ± 15 (1222%, 40148 ms) |


3) Execute a program that needs to fully process data from a bounded source before processing data from another unbounded source.

...

It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF for all its inputs. This would allow the JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput. On the other hand, running the upstream operators of an EOF operator in batch mode will also increase job throughput, and hybrid shuffle mode can also be used in the batch-mode part to further improve performance when there are sufficient slot resources.