Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Discussion thread-
Vote thread-
JIRA

-

Release-

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Let's consider a use-case where we need to aggregate records from a bounded stream using OperatorA in a "batch manner" (i.e., emitting the aggregation result at the end of the input stream), followed by another two-input OperatorB to process both the aggregation result and the records from an unbounded stream in a "stream manner" (i.e., continuously emitting results without waiting for the end of the input streams). Currently, supporting such use-cases requires the use of the STREAMING execution mode, where all operators must be deployed at the start of the job, and operators cannot apply typical "batch-mode" optimizations like sort join.

Although we can obtain the correct result, the performance and cost of this DataStream job may be significantly worse than what can be practically achieved. For instance, we could save deployment costs by scheduling OperatorB only after OperatorA has finished running. And we might be able to significantly reduce the execution time for OperatorA by applying some "batch-mode" optimizations. For example, benchmark results show that the DataStream CoGroup operation in BATCH mode can be more than 10 times faster than in STREAMING mode.

In this FLIP, we propose adding public APIs for users and operator developers to specify whether the corresponding DataStream operations and operators should be executed in batch mode. Furthermore, we will update Flink's job graph generation logic to support running a combination of stream-mode and batch-mode operators within the same job. By enabling Flink to deliver optimal performance for use-cases that require both stream and batch mode operations in the same job, we can further enhance Flink as a stream-batch unified processing engine.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner.

Code Block
languagejava
/**
 * This WindowAssigner assigns all elements to the same window that is fired iff the input
 * streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}


2) Add the isBatchForced() API to the StreamOperator and StreamOperatorFactory interfaces.

Code Block
languagejava
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    default boolean isBatchForced() {
        return false;
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    default boolean isBatchForced() {
        return false;
    }
}


Proposed Changes


1) Add the isBatchForced() API to the Transformation interface.

Code Block
languagejava
@Internal
public abstract class Transformation<T> {
    default boolean isBatchForced() {
        return false;
    }
}




Example Usages



Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.