| Discussion thread | - |
|---|---|
| Vote thread | - |
| JIRA | - |
| Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Let's consider a use-case where we need to aggregate records from a bounded stream using OperatorA in a "batch manner" (i.e., emitting the aggregation result at the end of the input stream), followed by another two-input OperatorB to process both the aggregation result and the records from an unbounded stream in a "stream manner" (i.e., continuously emitting results without waiting for the end of the input streams). Currently, supporting such use-cases requires the use of the STREAMING execution mode, where all operators must be deployed at the start of the job, and operators cannot apply typical "batch-mode" optimizations like sort join.
Although we can obtain the correct result, the performance and cost of this DataStream job may be significantly worse than what can be practically achieved. For instance, we could save deployment costs by scheduling OperatorB only after OperatorA has finished running. And we might be able to significantly reduce the execution time for OperatorA by applying some "batch-mode" optimizations. For example, benchmark results show that the DataStream CoGroup operation in BATCH mode can be more than 10 times faster than in STREAMING mode.
In this FLIP, we propose adding public APIs for users and operator developers to specify whether the corresponding DataStream operations and operators should be executed in batch mode. Furthermore, we will update Flink's job graph generation logic to support running a combination of stream-mode and batch-mode operators within the same job. By enabling Flink to deliver optimal performance for use-cases that require both stream and batch mode operations in the same job, we can further enhance Flink as a stream-batch unified processing engine.
Public Interfaces
1) Add EndOfStreamWindows which is a subclass of WindowAssigner.
```java
/**
 * This WindowAssigner assigns all elements to the same window that is fired
 * iff the input streams reach EOF.
 */
@PublicEvolving
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

    private static final TimeWindow TIME_WINDOW_INSTANCE =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    private EndOfStreamWindows() {}

    public static EndOfStreamWindows get() {
        return INSTANCE;
    }

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(TIME_WINDOW_INSTANCE);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new EndOfStreamTrigger();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.CONTINUE;
        }
        ...
    }
}
```
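As a usage sketch, two bounded keyed streams could be co-grouped so that results are emitted only once both inputs reach EOF (the co-group function and sink names below are illustrative placeholders, not part of this FLIP):

```java
// Hypothetical usage: co-group two bounded inputs with EndOfStreamWindows so
// that the window fires only at EOF. CustomCoGroupFunction and resultSink are
// illustrative placeholders.
dataStream1.coGroup(dataStream2)
    .where(tuple -> tuple.f0)
    .equalTo(tuple -> tuple.f0)
    .window(EndOfStreamWindows.get())
    .apply(new CustomCoGroupFunction())
    .addSink(resultSink);
```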
2) Add the isBatchForced() API to the StreamOperator and StreamOperatorFactory interfaces.
```java
@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    default boolean isBatchForced() {
        return false;
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    default boolean isBatchForced() {
        return false;
    }
}
```
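As a sketch of how an operator developer might use this API, a factory for a batch-style aggregation operator could opt in by overriding the default (the factory class name is an illustrative assumption, not part of this FLIP):

```java
// Hypothetical operator factory that forces batch-mode execution for its
// operator. MyBatchAggregateOperatorFactory is an illustrative name.
public class MyBatchAggregateOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {

    @Override
    public boolean isBatchForced() {
        // Tell the job graph generation logic that this operator must run in
        // batch mode, regardless of the overall execution mode.
        return true;
    }

    @Override
    public <T extends StreamOperator<OUT>> T createStreamOperator(
            StreamOperatorParameters<OUT> parameters) {
        ...
    }
}
```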
Proposed Changes
1) Add the isBatchForced() API to the Transformation class.
```java
@Internal
public abstract class Transformation<T> {
    public boolean isBatchForced() {
        return false;
    }
}
```
Example Usages
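The motivating use-case above could be expressed roughly as follows (a hedged sketch: the source streams, functions, and sink names are illustrative placeholders; OperatorA corresponds to the windowed aggregation and OperatorB to the two-input process function):

```java
// Hypothetical end-to-end sketch: OperatorA aggregates a bounded stream in
// batch manner (emitting only at EOF), then OperatorB joins the aggregation
// result with an unbounded stream in stream manner.
DataStream<AggResult> aggregated = boundedStream
    .keyBy(Record::getKey)
    .window(EndOfStreamWindows.get())
    .aggregate(new MyAggregateFunction());   // OperatorA: fires only at EOF

aggregated
    .connect(unboundedStream)                // OperatorB: two-input operator
    .process(new MyCoProcessFunction())      // emits results continuously
    .addSink(resultSink);
```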
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible.