...

In this FLIP, we propose to optimize task deployment and resource utilization for the above use case by allowing an operator to explicitly declare that it only emits records after all of its inputs have ended. The JM will use this information when scheduling the job: the results emitted by such an operator will use a blocking partition type, which effectively lets Flink schedule and execute this operator in batch mode.
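
The scheduling decision described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `OperatorInfoSketch`, `PartitionKindSketch`, and `choosePartitionKind` are hypothetical names invented for illustration. They are not Flink classes and not the API proposed by this FLIP.

```java
// Hypothetical stand-ins for illustration only; these are not Flink classes.
enum PartitionKindSketch { PIPELINED, BLOCKING }

final class OperatorInfoSketch {
    final String name;
    // Assumption: the operator declares that it emits records only after
    // all of its inputs have ended.
    final boolean outputOnlyAfterEndOfStream;

    OperatorInfoSketch(String name, boolean outputOnlyAfterEndOfStream) {
        this.name = name;
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }
}

final class PartitionTypeSketch {
    // If the operator produces nothing until end of input, downstream tasks
    // cannot consume anything earlier, so its result can be blocking.
    static PartitionKindSketch choosePartitionKind(OperatorInfoSketch op) {
        return op.outputOnlyAfterEndOfStream
                ? PartitionKindSketch.BLOCKING
                : PartitionKindSketch.PIPELINED;
    }

    public static void main(String[] args) {
        OperatorInfoSketch[] ops = {
            new OperatorInfoSketch("map", false),
            new OperatorInfoSketch("aggregate-at-end-of-input", true)
        };
        for (OperatorInfoSketch op : ops) {
            System.out.println(op.name + " -> " + choosePartitionKind(op));
        }
    }
}
```

In this model the only input to the decision is the operator's own declaration. How the real scheduler combines it with other job settings is not covered here.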

...

Code Block
languagejava
/**
 * A {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
 *
 * <p>Use this if you want to use a {@link Trigger} and {@link
 * org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy based windows.
 */
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ···
    private final Trigger<Object, GlobalWindow> defaultTrigger;

    private GlobalWindows(Trigger<Object, GlobalWindow> defaultTrigger) {
        this.defaultTrigger = defaultTrigger;
    }
    ···
    @Override
    public Trigger<Object, GlobalWindow> getDefaultTrigger() {
        return defaultTrigger;
    }
    ···
    public static GlobalWindows create() {
        return new GlobalWindows(new NeverTrigger());
    }

    public static GlobalWindows createWithEndOfStreamTrigger() {
        return new GlobalWindows(new EndOfStreamTrigger());
    }
    ···
    /** A trigger that fires when input ends, as default Trigger for GlobalWindows. */
    @Internal
    public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

        @Override
        public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
    }
    ···
}
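
To see why `EndOfStreamTrigger` fires only when the input ends, note that `GlobalWindow.maxTimestamp()` is `Long.MAX_VALUE`, and a bounded input emits a final watermark of `Long.MAX_VALUE` when it finishes. The following is a simplified, self-contained model of the trigger logic shown above. `TriggerResultSketch` and `EndOfStreamTriggerSketch` are hypothetical stand-ins rather than Flink's classes, and timer registration is left out.

```java
// Simplified stand-ins for illustration only; these are not Flink's classes.
enum TriggerResultSketch { CONTINUE, FIRE }

final class EndOfStreamTriggerSketch {
    // Assumption: mirrors GlobalWindow.maxTimestamp(), which is Long.MAX_VALUE.
    static final long WINDOW_MAX_TIMESTAMP = Long.MAX_VALUE;

    // Elements never fire the window on their own.
    static TriggerResultSketch onElement(Object element) {
        return TriggerResultSketch.CONTINUE;
    }

    // Fires only when event time reaches the window's maximum timestamp,
    // which happens when the final watermark arrives at end of input.
    static TriggerResultSketch onEventTime(long time) {
        return time == WINDOW_MAX_TIMESTAMP
                ? TriggerResultSketch.FIRE
                : TriggerResultSketch.CONTINUE;
    }

    public static void main(String[] args) {
        System.out.println(onElement("record"));         // CONTINUE
        System.out.println(onEventTime(1_000L));         // CONTINUE
        System.out.println(onEventTime(Long.MAX_VALUE)); // FIRE
    }
}
```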

...

It would be useful to add an ExecutionState (e.g. Finishing) to indicate that a task has reached EOF on all of its inputs. This would allow the JM to deploy the task's downstream tasks and possibly apply hybrid shuffle to increase job throughput. On the other hand, running the operators upstream of an EOF operator in batch mode would also increase job throughput, and hybrid shuffle could be used in the batch-mode part of the job to further improve performance when sufficient slot resources are available.

...