Discussion thread | - |
---|---|
Vote thread | - |
JIRA | - |
Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Consider a use case that requires the job graph shown in the figure below. pipelinedRegionA aggregates records from two bounded inputs using aggregate/join/cogroup operations and emits records only after all of its bounded inputs have ended. operatorB can start processing records from input_3 only after it has received the output of pipelinedRegionA, and it must run continuously because input_3 is unbounded.
Currently, supporting such a use case requires the STREAMING execution mode, where all operators must be deployed at the start of the job and operators cannot apply typical "batch-mode" optimizations such as sort join. This approach has several drawbacks: 1) operatorB has to be deployed while it is not yet able to do real work; 2) operatorB might waste a lot of memory and disk IO buffering records from the unbounded input until it receives records from pipelinedRegionA; and 3) the throughput of aggregation operations (e.g. co-group) can be 10X worse in stream mode than in batch mode.
In this FLIP, we propose adding public APIs to support running a mixture of batch-mode and stream-mode operators in the same job, with blocking edges added between these operators. We hope to enable Flink to deliver optimal performance for use-cases that require both stream and batch mode operations, and further enhance Flink as a stream-batch unified processing engine.
Public Interfaces
1) Add EndOfStreamWindows which is a subclass of WindowAssigner.
    /**
     * This WindowAssigner assigns all elements to the same window that is fired iff the input
     * streams reach EOF.
     */
    @PublicEvolving
    public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

        // Singleton returned by get(); the constructor is private.
        private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();

        private static final TimeWindow TIME_WINDOW_INSTANCE =
                new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

        private EndOfStreamWindows() {}

        public static EndOfStreamWindows get() {
            return INSTANCE;
        }

        @Override
        public Collection<TimeWindow> assignWindows(
                Object element, long timestamp, WindowAssignerContext context) {
            return Collections.singletonList(TIME_WINDOW_INSTANCE);
        }

        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new EndOfStreamTrigger();
        }

        @Override
        public boolean isEventTime() {
            return true;
        }

        private static class EndOfStreamTrigger extends Trigger<Object, TimeWindow> {
            @Override
            public TriggerResult onElement(
                    Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                    throws Exception {
                return TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
                return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
            }

            @Override
            public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
            ...
        }
    }
2) Add OperatorAttributesBuilder and OperatorAttributes for developers to specify operator attributes.
    package org.apache.flink.streaming.api.operators;

    /** The builder class for {@link OperatorAttributes}. */
    @PublicEvolving
    public class OperatorAttributesBuilder {
        ...
        public OperatorAttributesBuilder() {
            isOutputOnEOF = false;
            isOutputOnCheckpoint = false;
            hasInternalSorter = false;
        }

        public OperatorAttributesBuilder setIsOutputOnEOF(boolean isOutputOnEOF) {...}

        public OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean isOutputOnCheckpoint) {...}

        public OperatorAttributesBuilder setHasInternalSorter(boolean hasInternalSorter) {...}

        public OperatorAttributes build() {...}
    }

    package org.apache.flink.streaming.api.operators;

    /**
     * OperatorAttributes element provides Job Manager with information that can be
     * used to optimize job performance.
     */
    @PublicEvolving
    public class OperatorAttributes {
        /**
         * Returns true iff the operator can only emit records after inputs have reached EOF.
         *
         * <p>Here are the implications when it is true:
         *
         * <ul>
         *   <li>Its output edges are blocking.
         * </ul>
         */
        public boolean getIsOutputOnEOF() {...}

        /**
         * Returns true iff the operator can only emit records when checkpoint is triggered.
         *
         * <p>If true, the job should trigger checkpoint in order to flush data to sinks.
         */
        public boolean getIsOutputOnCheckpoint() {...}

        /**
         * Returns true iff the operator sorts data internally.
         *
         * <p>Here are the implications when it is true:
         *
         * <ul>
         *   <li>Its checkpoint is disabled.
         *   <li>Its input records do not need to be sorted externally.
         *   <li>Its managed memory should be set according to execution.sorted-inputs.memory.
         * </ul>
         */
        public boolean getHasInternalSorter() {...}
    }
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
    @PublicEvolving
    public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
        ...
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }

    @PublicEvolving
    public interface StreamOperatorFactory<OUT> extends Serializable {
        ...
        default OperatorAttributes getOperatorAttributes() {
            return new OperatorAttributesBuilder().build();
        }
    }
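To illustrate how an operator author might use this API, here is a minimal, self-contained sketch. It does not depend on Flink: `OperatorAttributes` and `OperatorAttributesBuilder` are simplified stand-ins whose bodies are assumptions (the FLIP elides them), `AttributedOperator` is a hypothetical stand-in for the relevant part of `StreamOperator`, and `SortingAggregateOperator` is a made-up operator name.

```java
// Stand-in for the proposed OperatorAttributes; the field layout is an assumption.
final class OperatorAttributes {
    private final boolean isOutputOnEOF;
    private final boolean isOutputOnCheckpoint;
    private final boolean hasInternalSorter;

    OperatorAttributes(
            boolean isOutputOnEOF, boolean isOutputOnCheckpoint, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.isOutputOnCheckpoint = isOutputOnCheckpoint;
        this.hasInternalSorter = hasInternalSorter;
    }

    boolean getIsOutputOnEOF() { return isOutputOnEOF; }
    boolean getIsOutputOnCheckpoint() { return isOutputOnCheckpoint; }
    boolean getHasInternalSorter() { return hasInternalSorter; }
}

// Stand-in for the proposed builder; all attributes default to false, as in the FLIP.
final class OperatorAttributesBuilder {
    private boolean isOutputOnEOF = false;
    private boolean isOutputOnCheckpoint = false;
    private boolean hasInternalSorter = false;

    OperatorAttributesBuilder setIsOutputOnEOF(boolean value) {
        this.isOutputOnEOF = value;
        return this;
    }

    OperatorAttributesBuilder setIsOutputOnCheckpoint(boolean value) {
        this.isOutputOnCheckpoint = value;
        return this;
    }

    OperatorAttributesBuilder setHasInternalSorter(boolean value) {
        this.hasInternalSorter = value;
        return this;
    }

    OperatorAttributes build() {
        return new OperatorAttributes(isOutputOnEOF, isOutputOnCheckpoint, hasInternalSorter);
    }
}

// Hypothetical stand-in for the part of StreamOperator that matters here.
interface AttributedOperator {
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

// Hypothetical operator that sorts its input internally and emits only at end of input.
final class SortingAggregateOperator implements AttributedOperator {
    @Override
    public OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder()
                .setIsOutputOnEOF(true)
                .setHasInternalSorter(true)
                .build();
    }
}

public class OperatorAttributesSketch {
    public static void main(String[] args) {
        OperatorAttributes attrs = new SortingAggregateOperator().getOperatorAttributes();
        System.out.println("isOutputOnEOF=" + attrs.getIsOutputOnEOF());
        System.out.println("isOutputOnCheckpoint=" + attrs.getIsOutputOnCheckpoint());
        System.out.println("hasInternalSorter=" + attrs.getHasInternalSorter());
    }
}
```

Because the interface method has a default implementation that returns all-false attributes, existing operators that do not override it keep their current behavior.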
Proposed Changes
1) Add the getIsOutputOnEOF() and getHasInternalSorter() methods to the Transformation class.
    @Internal
    public abstract class Transformation<T> {
        public boolean getIsOutputOnEOF() {
            return false;
        }

        public boolean getHasInternalSorter() {
            return false;
        }
    }
2) Update Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.
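One possible shape for this delegation is sketched below. This is a simplified, Flink-free illustration, not the actual implementation: `OperatorAttributes` here carries only the two attributes that `Transformation` exposes, `OperatorFactory` is a hypothetical stand-in for `StreamOperatorFactory`, and the real `OneInputTransformation` holds considerably more state.

```java
// Minimal stand-in carrying only the two attributes that Transformation exposes.
final class OperatorAttributes {
    private final boolean isOutputOnEOF;
    private final boolean hasInternalSorter;

    OperatorAttributes(boolean isOutputOnEOF, boolean hasInternalSorter) {
        this.isOutputOnEOF = isOutputOnEOF;
        this.hasInternalSorter = hasInternalSorter;
    }

    boolean getIsOutputOnEOF() { return isOutputOnEOF; }
    boolean getHasInternalSorter() { return hasInternalSorter; }
}

// Hypothetical stand-in for StreamOperatorFactory#getOperatorAttributes().
interface OperatorFactory {
    OperatorAttributes getOperatorAttributes();
}

// Mirrors the defaults proposed for the Transformation base class.
abstract class Transformation {
    public boolean getIsOutputOnEOF() { return false; }
    public boolean getHasInternalSorter() { return false; }
}

// Simplified subclass that forwards both getters to the underlying operator factory.
final class OneInputTransformation extends Transformation {
    private final OperatorFactory operatorFactory;

    OneInputTransformation(OperatorFactory operatorFactory) {
        this.operatorFactory = operatorFactory;
    }

    @Override
    public boolean getIsOutputOnEOF() {
        return operatorFactory.getOperatorAttributes().getIsOutputOnEOF();
    }

    @Override
    public boolean getHasInternalSorter() {
        return operatorFactory.getOperatorAttributes().getHasInternalSorter();
    }
}

public class TransformationSketch {
    public static void main(String[] args) {
        // The factory is a lambda that reports an end-of-input operator with an internal sorter.
        Transformation t = new OneInputTransformation(() -> new OperatorAttributes(true, true));
        System.out.println("isOutputOnEOF=" + t.getIsOutputOnEOF());
        System.out.println("hasInternalSorter=" + t.getHasInternalSorter());
    }
}
```

Transformations that do not wrap an operator simply inherit the base-class defaults and report false for both attributes.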
Benchmark Results
1) DataStream#CoGroup
We ran the CoGroupDataStream benchmark on a MacBook with the latest Flink 1.17-SNAPSHOT and parallelism=1. RocksDB is used as the state backend in stream mode.
Here are the benchmark results:
- Without the proposed change, in stream mode, with each source generating 2*10^6 records, the average execution time is 56 sec.
- Without the proposed change, in batch mode, with each source generating 5*10^7 records, the average execution time is 118 sec.
- With the proposed change, in both stream and batch mode, with each source generating 5*10^7 records, the average execution time is 38 sec.
This shows that the same DataStream program in stream mode can be more than 20X faster with the proposed change.
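Since the stream-mode baseline used fewer records than the other two runs, the comparison is easiest to check as per-source throughput. The sketch below redoes that arithmetic from the figures above; it assumes execution time scales roughly linearly with record count, which the benchmark itself does not establish. Under that assumption the speedup works out to roughly 37X over the stream-mode baseline and roughly 3.1X over the batch-mode baseline.

```java
public class CoGroupThroughput {
    // Records processed per second for one source, given record count and run time.
    static double recordsPerSecond(double records, double seconds) {
        return records / seconds;
    }

    public static void main(String[] args) {
        // Figures taken from the benchmark results listed above.
        double streamBefore = recordsPerSecond(2e6, 56);
        double batchBefore = recordsPerSecond(5e7, 118);
        double withChange = recordsPerSecond(5e7, 38);

        System.out.printf("stream mode, before: %.0f records/s%n", streamBefore);
        System.out.printf("batch mode, before:  %.0f records/s%n", batchBefore);
        System.out.printf("with the change:     %.0f records/s%n", withChange);
        System.out.printf("speedup vs stream baseline: %.1fx%n", withChange / streamBefore);
        System.out.printf("speedup vs batch baseline:  %.1fx%n", withChange / batchBefore);
    }
}
```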
Example Usages
    data1.coGroup(data2)
        .where(tuple -> tuple.f0)
        .equalTo(tuple -> tuple.f0)
        .window(EndOfStreamWindows.get())
        .apply(new CustomCoGroupFunction())
        .addSink(...);
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible.