Discussion thread | https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5 |
---|---|
Vote thread | https://lists.apache.org/thread/oy9mdmh6gk8pc0wjdk5kg8dz3jllz9ow |
JIRA | |
Release | - |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
[This FLIP proposal is a joint work of Ran Jinhao, Dong Lin and Xuannan Su]
As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. We call such an operator a full-dam operator. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.
Currently, supporting the above use-case requires all operators to be deployed at the start of the job. This approach wastes slot and memory resources because operatorB cannot do any useful work until operatorA's input has ended. Even worse, operatorB might use a lot of disk space only to cache and spill records received from the unbounded source to its local disk while it is waiting for operatorA's output.
In this FLIP, we propose to optimize task deployment and resource utilization for the above use-case by allowing an operator to explicitly specify whether it only emits records after all its inputs have ended. JM will leverage this information to optimize job scheduling such that the partition type of the results emitted by this operator, as well as the results emitted by its upstream operators, will all be blocking, which effectively lets Flink schedule and execute this operator as well as its upstream operators in batch mode. Hybrid shuffle mode (FLIP-235: Hybrid Shuffle Mode) can be used in the batch mode part to further improve the performance when there are sufficient slot resources.
In addition, this FLIP also allows creating a GlobalWindows assigner whose window is triggered only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.
Objectives
The APIs provided in this FLIP achieve the following objectives / benefits:
1) Reduce resource usage. For use cases that involve an operatorB (with unbounded input) depending on the output of another operatorA, where operatorA only emits results at the end of its input, Flink will deploy operatorB after operatorA is finished. This avoids the unnecessary resource usage of operatorB while operatorA is still processing its inputs.
2) Improve usability. For use cases that need to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows to create a GlobalWindow that is only triggered at the end of its inputs, instead of writing tens of lines of code to define a WindowAssigner subclass.
3) Introduce operator attributes that let Flink know whether an operator only outputs results when its input has ended. All operators defined in Flink should set the attributes accordingly to achieve the benefits above, and user-defined operators should be able to set them as well.
Public Interfaces
1) Add the createWithEndOfStreamTrigger method in GlobalWindows, which allows users of the DataStream API to create GlobalWindows with a window trigger that only fires when the input ends.
Code Block | ||
---|---|---|
| ||
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
    ···

    /**
     * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
     * The window is only useful if you also specify a custom trigger. Otherwise, the window will
     * never be triggered and no computation will be performed.
     */
    public static GlobalWindows create() {
        ...
    }

    /**
     * Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}
     * and the window is triggered if and only if the input stream is ended.
     */
    public static GlobalWindows createWithEndOfStreamTrigger() {
        ...
    }
} |
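To make the intended trigger semantics concrete, here is a minimal sketch in plain Java that does not depend on Flink. It assumes that the end of input is signalled by a watermark equal to Long.MAX_VALUE. The class EndOfStreamWindowSketch and its methods onElement, onWatermark and emitted are hypothetical names used only for illustration; they are not part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a global window that fires only at end of input (not Flink API). */
final class EndOfStreamWindowSketch {
    /** Assumption: end of input is signalled by a watermark of Long.MAX_VALUE. */
    static final long END_OF_INPUT = Long.MAX_VALUE;

    private final List<Integer> buffered = new ArrayList<>();
    private final List<Integer> emitted = new ArrayList<>();

    /** Every element goes into the single global window; nothing is emitted yet. */
    void onElement(int value) {
        buffered.add(value);
    }

    /** The window fires only when the end-of-input watermark arrives. */
    void onWatermark(long watermark) {
        if (watermark == END_OF_INPUT && !buffered.isEmpty()) {
            int sum = 0;
            for (int v : buffered) {
                sum += v;
            }
            emitted.add(sum);
            buffered.clear();
        }
    }

    List<Integer> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        EndOfStreamWindowSketch window = new EndOfStreamWindowSketch();
        window.onElement(1);
        window.onElement(2);
        window.onWatermark(100L);          // ordinary watermark: no output
        window.onElement(3);
        window.onWatermark(END_OF_INPUT);  // end of input: the window fires
        System.out.println(window.emitted()); // prints [6]
    }
}
```

The point of the sketch is that intermediate watermarks never produce output, so everything downstream of such a window sees data only after the input has ended.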
2) Add OperatorAttributesBuilder and OperatorAttributes for operator developers to specify operator attributes that Flink runtime can use to optimize the job performance.
Code Block | ||
---|---|---|
| ||
package org.apache.flink.streaming.api.operators;

/** The builder class for {@link OperatorAttributes}. */
@Experimental
public class OperatorAttributesBuilder {
    @Nullable private Boolean outputOnlyAfterEndOfStream = null;

    public OperatorAttributesBuilder() {...}

    /**
     * Set to true if and only if the operator only emits records after all its inputs have ended.
     * If it is not set, the default value false is used.
     */
    public OperatorAttributesBuilder setOutputOnlyAfterEndOfStream(
            boolean outputOnlyAfterEndOfStream) {...}

    /** If any operator attribute is not set explicitly, we will log it at DEBUG level. */
    public OperatorAttributes build() {...}
} |
Code Block | ||
---|---|---|
| ||
package org.apache.flink.streaming.api.operators;

/**
 * OperatorAttributes element provides Job Manager with information that can be
 * used to optimize the job performance.
 */
@Experimental
public class OperatorAttributes {
    /**
     * Returns true if and only if the operator only emits records after all its inputs have ended.
     *
     * <p>Here are the implications when it is true:
     *
     * <ul>
     *   <li> The results of this operator as well as its chained operators have blocking partition type.
     *   <li> This operator as well as its chained operators will be executed in batch mode.
     * </ul>
     */
    public boolean isOutputOnlyAfterEndOfStream() {...}
} |
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
Code Block | ||
---|---|---|
| ||
@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    /**
     * Get the {@link OperatorAttributes} of the operator that the Flink runtime can use to optimize
     * the job performance.
     *
     * @return OperatorAttributes of the operator.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    /**
     * Get the {@link OperatorAttributes} of the operator created by the factory. Flink runtime can
     * use it to optimize the job performance.
     *
     * @return OperatorAttributes of the operator created by the factory.
     */
    @Experimental
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
} |
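The interplay between the builder, the attributes object and the default getOperatorAttributes() method can be sketched without Flink as follows. This is a simplified model under our own assumptions, not the Flink sources: Attributes, Builder and Operator stand in for OperatorAttributes, OperatorAttributesBuilder and StreamOperator, and MapLikeOperator and SortLikeOperator are hypothetical operators.

```java
/** Plain-Java model of the proposed attribute classes (simplified; not the Flink sources). */
final class OperatorAttributesSketch {

    /** Immutable holder for the single attribute proposed in this FLIP. */
    static final class Attributes {
        private final boolean outputOnlyAfterEndOfStream;

        Attributes(boolean outputOnlyAfterEndOfStream) {
            this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
        }

        boolean isOutputOnlyAfterEndOfStream() {
            return outputOnlyAfterEndOfStream;
        }
    }

    /** Builder whose attribute stays null until set, and falls back to false in build(). */
    static final class Builder {
        private Boolean outputOnlyAfterEndOfStream = null;

        Builder setOutputOnlyAfterEndOfStream(boolean value) {
            this.outputOnlyAfterEndOfStream = value;
            return this;
        }

        Attributes build() {
            return new Attributes(outputOnlyAfterEndOfStream != null && outputOnlyAfterEndOfStream);
        }
    }

    /** Stand-in for StreamOperator: the default method reports the default attributes. */
    interface Operator {
        default Attributes getOperatorAttributes() {
            return new Builder().build();
        }
    }

    /** A record-at-a-time operator keeps the default. */
    static final class MapLikeOperator implements Operator {}

    /** A hypothetical full-dam operator (e.g. a sort) opts in by overriding the method. */
    static final class SortLikeOperator implements Operator {
        @Override
        public Attributes getOperatorAttributes() {
            return new Builder().setOutputOnlyAfterEndOfStream(true).build();
        }
    }

    public static void main(String[] args) {
        // prints false, then true
        System.out.println(new MapLikeOperator().getOperatorAttributes().isOutputOnlyAfterEndOfStream());
        System.out.println(new SortLikeOperator().getOperatorAttributes().isOutputOnlyAfterEndOfStream());
    }
}
```

Because the interface method has a default implementation, existing operators compile unchanged and report false, which is what keeps the change backward compatible.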
Proposed Changes
1) Add an API to PhysicalTransformation to get the corresponding operator attributes.
Code Block | ||
---|---|---|
| ||
@Internal
public abstract class PhysicalTransformation<T> extends Transformation<T> {
    public boolean isOutputOnlyAfterEndOfStream() {
        return false;
    }
} |
2) Update the following Transformation subclasses to override the newly added method using the OperatorAttributes obtained from the underlying operator:
- OneInputTransformation
- TwoInputTransformation
- AbstractMultipleInputTransformation
3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.
If a Transformation has isOutputOnlyAfterEndOfStream == true:
- Operator chaining can still be done. After that, the results of its operator chain and of its upstream operators will be set to blocking (by default) or to the shuffle mode configured via ExecutionOptions.BATCH_SHUFFLE_MODE.
- This operator as well as its upstream operators will be executed in batch mode. More specifically, checkpointing is disabled while these operators are running.
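The rule above can be sketched as a small graph traversal. The following plain-Java example is only an illustration under stated assumptions: Node and batchModeNodes are hypothetical names, operator chaining is ignored, and "inputB" is an invented name for the unbounded source from the motivating example. It marks every operator that reports isOutputOnlyAfterEndOfStream, together with all of its upstream operators, as running in batch mode with blocking results.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the scheduling rule; Node and the method names are hypothetical. */
final class BlockingEdgeSketch {

    static final class Node {
        final String name;
        final boolean outputOnlyAfterEndOfStream;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, boolean outputOnlyAfterEndOfStream, Node... inputs) {
            this.name = name;
            this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
            this.inputs.addAll(List.of(inputs));
        }
    }

    /** Returns the names of nodes whose results are blocking and which run in batch mode. */
    static Set<String> batchModeNodes(List<Node> allNodes) {
        Set<String> batch = new HashSet<>();
        Deque<Node> pending = new ArrayDeque<>();
        for (Node node : allNodes) {
            if (node.outputOnlyAfterEndOfStream) {
                pending.push(node);
            }
        }
        // Walk upstream from every flagged node; each visited node joins the batch part.
        while (!pending.isEmpty()) {
            Node node = pending.pop();
            if (batch.add(node.name)) {
                node.inputs.forEach(pending::push);
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Node inputA = new Node("inputA", false);
        Node operatorA = new Node("operatorA", true, inputA);
        Node inputB = new Node("inputB", false); // hypothetical name for the unbounded source
        Node operatorB = new Node("operatorB", false, operatorA, inputB);
        Set<String> batch = batchModeNodes(List.of(inputA, operatorA, inputB, operatorB));
        // prints "true false": operatorA is in the batch part, inputB is not
        System.out.println(batch.contains("operatorA") + " " + batch.contains("inputB"));
    }
}
```

In this example only inputA and operatorA end up in the batch part, so operatorB and its unbounded source can be deployed after operatorA has finished.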
4) When an operator determines its RecordAttributes for downstream nodes, a blocking input edge with pending records is treated the same as a source with isBacklog=true.
5) When DataStream#coGroup is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.
In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true.
This operator will use the following optimizations to achieve much higher throughput than the existing DataStream#coGroup in both stream and batch mode:
- It will instantiate two internal sorters to sort records from its two inputs separately. Then it can pull the sorted records from these two sorters. This can be done without wrapping input records with TaggedUnion<...>. In comparison, the existing DataStream#coGroup needs to wrap input records with TaggedUnion<...> before sorting them using one external sorter, which introduces higher overhead.
- It will not invoke WindowAssigner#assignWindows or triggerContext#onElement for input records. In comparison, the existing WindowOperator#processElement invokes these methods for every record.
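The two-sorter approach described above amounts to a sort-merge co-group. The sketch below shows the idea in plain Java with in-memory lists standing in for the internal sorters. All names (SortMergeCoGroupSketch, Rec, coGroup) are hypothetical, and the string output stands in for whatever the user's co-group function would emit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sketch of a sort-merge co-group over two bounded inputs (hypothetical names, no Flink). */
final class SortMergeCoGroupSketch {

    /** A keyed record; both inputs use the same shape here for brevity. */
    static final class Rec {
        final int key;
        final String value;

        Rec(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Returns one line per key: "key: [left values] [right values]". */
    static List<String> coGroup(List<Rec> left, List<Rec> right) {
        // Each input gets its own sorter, so records never need a tagged wrapper.
        List<Rec> l = new ArrayList<>(left);
        List<Rec> r = new ArrayList<>(right);
        l.sort(Comparator.comparingInt((Rec rec) -> rec.key));
        r.sort(Comparator.comparingInt((Rec rec) -> rec.key));

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() || j < r.size()) {
            // The next key is the smaller of the two current heads.
            int key;
            if (j >= r.size()) {
                key = l.get(i).key;
            } else if (i >= l.size()) {
                key = r.get(j).key;
            } else {
                key = Math.min(l.get(i).key, r.get(j).key);
            }
            List<String> leftGroup = new ArrayList<>();
            while (i < l.size() && l.get(i).key == key) {
                leftGroup.add(l.get(i++).value);
            }
            List<String> rightGroup = new ArrayList<>();
            while (j < r.size() && r.get(j).key == key) {
                rightGroup.add(r.get(j++).value);
            }
            out.add(key + ": " + leftGroup + " " + rightGroup);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec(2, "b"), new Rec(1, "a"), new Rec(1, "c"));
        List<Rec> right = List.of(new Rec(3, "z"), new Rec(1, "x"));
        coGroup(left, right).forEach(System.out::println);
    }
}
```

Note that no window assignment or trigger call happens per record: the groups are formed purely by walking the two sorted inputs in key order once both inputs have ended.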
6) When DataStream#aggregate is invoked with EndOfStreamWindows as the window assigner, Flink should generate an operator with isOutputOnEOF = true.
In addition, after FLIP-327 is accepted, this operator should also have isInternalSorterSupported = true. This operator will sort the input before aggregating and avoid invoking window actions, similar to '5)'.
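As a rough illustration of sorting before aggregating, the plain-Java sketch below sums values per key by sorting the bounded input once and then folding each run of equal keys. SortedAggregateSketch and sumByKey are hypothetical names, records are modelled as {key, value} pairs, and a sum stands in for an arbitrary aggregate function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Sketch of sort-based aggregation over a bounded input (hypothetical names, no Flink). */
final class SortedAggregateSketch {

    /** Sums values per key; each record is {key, value}. Returns {key, sum} pairs in key order. */
    static List<long[]> sumByKey(long[][] records) {
        long[][] sorted = records.clone();
        Arrays.sort(sorted, Comparator.comparingLong((long[] rec) -> rec[0]));

        List<long[]> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.length) {
            long key = sorted[i][0];
            long sum = 0;
            // All records of one key are adjacent after sorting, so one accumulator is enough.
            while (i < sorted.length && sorted[i][0] == key) {
                sum += sorted[i][1];
                i++;
            }
            out.add(new long[] {key, sum});
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{2, 5}, {1, 1}, {2, 7}, {1, 3}};
        for (long[] result : sumByKey(input)) {
            System.out.println(result[0] + " -> " + result[1]);
        }
    }
}
```

Only one accumulator is live at any time, which is the property that makes this approach attractive when the operator is known to emit only at the end of its input.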
Benchmark results
When coGroup or aggregate is invoked with EndOfStreamWindows as the window assigner and neither a trigger nor an evictor is assigned, the optimized operator EOFCoGroupOperator / EOFAggregationOperator will be applied. In this case the input(s) are sorted first and the results are emitted after the end of input. With our work, performance can be multiple times faster than before, and slightly faster than batch mode because window-related work is avoided. Using hybrid shuffle in the outputEOF part lets part of the operator pipeline consume records from upstream, which further improves the performance.
To demonstrate the improvements, we run each benchmark 5 times in different execution modes and configurations. Each result below gives the throughput in records per millisecond with its variation, followed in parentheses by the throughput relative to STREAMING and the run duration in milliseconds.
Environment Configuration
Code Block | ||
---|---|---|
| ||
jobmanager.memory.process.size: 6400m
taskmanager.memory.process.size: 6912m |
Data Count
number of records distributed in Data Count
Code Block | ||
---|---|---|
| ||
data1.coGroup(data2)
.where(tuple -> tuple.f0)
.equalTo(tuple -> tuple.f0)
.window(EndOfStreamWindows.get())
.apply(new CustomCoGroupFunction())
.addSink(...); |
The benchmark is run 5 times in each of 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
8e7 | 66 ± 1 (100%, 1202426 ms) | 491 ± 5 (743%, 162731 ms) | 1506 ± 10 (2281%, 53098 ms) | 1677 ± 42 (2540%, 47687 ms) |
This shows that, with the changes proposed above, DataStream#coGroup in Optimized STREAMING can reach more than 20X the STREAMING throughput and about 3X the BATCH throughput.
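The figures in the table can be cross-checked with a short calculation. The sketch below assumes that the reported throughput is the number of records (8e7) divided by the run duration in milliseconds, which is consistent with the numbers shown. ThroughputCheck and throughput are hypothetical names.

```java
/** Cross-check of the coGroup benchmark figures (assumes throughput = records / duration). */
final class ThroughputCheck {

    /** Throughput in records per millisecond. */
    static double throughput(long records, long durationMs) {
        return (double) records / durationMs;
    }

    public static void main(String[] args) {
        long records = 80_000_000L; // 8e7, the Data Count from the table
        double streaming = throughput(records, 1_202_426L); // STREAMING duration
        double optimized = throughput(records, 53_098L);    // Optimized STREAMING duration
        System.out.printf("STREAMING: %.1f records/ms%n", streaming);
        System.out.printf("Optimized STREAMING: %.1f records/ms%n", optimized);
        System.out.printf("Speedup: %.1fx%n", optimized / streaming);
    }
}
```

The computed values land close to the reported 66 and 1506 records/ms, and the resulting ratio is a little above 22, in line with the more-than-20X claim.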
Benchmark: Aggregate
Data Count
Code Block | ||
---|---|---|
| ||
data
.keyBy(value -> value.f0)
.window(EndOfStreamWindows.get())
.aggregate(new Aggregator())
.addSink(new CountingAndDiscardingSink()); |
The benchmark is run 5 times in each of 4 configurations: streaming mode, batch mode, optimized streaming mode after this PR, and optimized streaming mode with hybrid shuffle after this PR. Here are the throughput benchmark results:
Data Count | STREAMING | BATCH | Optimized STREAMING | With hybrid shuffle |
---|---|---|---|---|
8e7 | 163 ± 0 (100%, 490478 ms) | 1561 ± 16 (957%, 51237 ms) | 1733 ± 9 (1063%, 46143 ms) | 1992 ± 15 (1222%, 40148 ms) |
This shows that, with the changes proposed above, DataStream#aggregate in Optimized STREAMING can reach about 10X the STREAMING throughput and is slightly faster than BATCH.
Benchmark: OperatorDelayedDeploy
We have a job that pre-processes records from a bounded source (Source1) using an operator (Process1) which only emits results after its input has ended. Then another operator (Process2) needs to process records emitted by Process1 together with records from an unbounded source, and emit results with low processing latency in real-time.
Code Block | ||
---|---|---|
| ||
source1.keyBy(value -> value.f0)
.window(EndOfStreamWindows.get())
.aggregate(new MyAggregator()).name("Process1")
.connect(source2.keyBy(value -> value.f0))
.transform("Process2", Types.INT, new MyProcessOperator())
.addSink(new DiscardingSink<>()); |
Resource Utilization
Suppose we set an extra standalone cluster configuration:
Code Block | ||
---|---|---|
| ||
taskmanager.numberOfTaskSlots: 2 |
When we put Source1, Process1, Source2 and Process2 in 4 different SlotSharingGroups as above and then run this job, we can see that, before this PR, none of the tasks can be deployed and all of them are blocked in the CREATED state because the TaskManager provides only 2 slots. With our work, Source2 and Process2 can be deployed after Source1 and Process1 have finished and released their slots.
Without setting the slot sharing group, we run the job in STREAMING, Optimized STREAMING and Optimized STREAMING with hybrid shuffle. As the number of records grows, throughput in STREAMING decreases because of buffer pressure in Process2, and the job can even fail with an OOM exception, while it works well in Optimized STREAMING. Using hybrid shuffle lets Process1 consume records from Source1 in a pipelined manner.
Compatibility, Deprecation, and Migration Plan
- Checkpointing is disabled while these operators are running in batch mode, so that they can perform operations that are not compatible with checkpoints (e.g. sorting inputs). After a failover, the operators should re-read the data from the upstream blocking edge or from the sources.
4) Enumerate all existing operators defined in Flink and set isOutputOnlyAfterEndOfStream accordingly for those operators that may only output at the end of the input, to enable the optimization mentioned in 3):
- WindowOperator: when used with a GlobalWindows that is created with the method createWithEndOfStreamTrigger as the window assigner
- StreamSortOperator
5) When a DataStream API such as DataStream#coGroup is invoked with GlobalWindows, the WindowOperator will check whether the window assigner is an instance of GlobalWindows that only triggers at the end of the inputs. If so, the operator will have OperatorAttributes with isOutputOnlyAfterEndOfStream = true, to achieve higher throughput by using the optimizations in batch mode.
This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.
More specifically, the following DataStream APIs can benefit from using a GlobalWindows that is created with the method createWithEndOfStreamTrigger.
- DataStream#windowAll
- KeyedStream#window
- CoGroupedStreams#Where#EqualTo#window
- JoinedStreams#Where#EqualTo#window
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible.
Future Work
- It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached the end for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.
- Setting the upstream operators of the full-dam operator in batch mode will also increase the job throughput, and Hybrid shuffle mode can also be used in batch mode part to further improve the performance when there are sufficient slot resources.
- Optimizing the implementation of frequently used full-dam operators, such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. For example, we can instantiate an internal sorter in the operator, and it will not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record.