...
Discussion thread | https://lists.apache.org/thread/qq39rmg3f23ysx5m094s4c4cq0m4tdj5 | ||||||||
---|---|---|---|---|---|---|---|---|---|
Vote thread | -https://lists.apache.org/thread/oy9mdmh6gk8pc0wjdk5kg8dz3jllz9ow | ||||||||
JIRA |
| ||||||||
Release | - |
...
[This FLIP proposal is a joint work of Ran Jinhao, Dong Lin and Xuannan Su ]
Table of Contents |
---|
As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended, which is also known as . We will call such operator full-dam operator. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.
...
In addition, this FLIP also adds EndOfStreamTrigger in GlobalWindows. GlobalWindows with EndOfStreamTrigger can be used with the DataStream API to specify whether allows creating a GlobalWindows that the window will end be triggered only at the end of its inputs. DataStream programs (e.g. coGroup and aggregate) can take advantage of this information to significantly increase throughput and reduce resource utilization.
Objectives
The APIs provided in this FLIP achieve the following objectives / benefits:
...
2) Improve usability. For use-case that needs to invoke DataStream APIs (e.g. KeyedStream#window) with a window assigner that covers all input data, users can use the off-the-shelf GlobalWindows with EndOfStreamTrigger provided in this FLIPto create a GlobalWindow that is only triggered at the end of its inputs, instead of writing tens of lines of code to define this a WindowAssigner subclass.
...
3) Introduce attributes to the operator to let Flink know if the operator only outputs results when the input has ended. All the operators defined in Flink should set the attributes accordingly to achieve the benefit above. User-defined operators should be able to set the attributes accordingly so that they can achieve the benefit.
Public Interfaces
1) Add the createWithEndOfStreamTrigger
method in GlobalWindows which allows users of the DataStream API to create GlobalWindows with a window trigger that only fires when the input ends.
Code Block | ||
---|---|---|
| ||
@PublicEvolving public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { ··· /** * Creates a new {@link WindowAssigner} with {@link NeverTrigger} that assigns all elements to * the same {@link GlobalWindow}. * The window is only useful if you also specify a custom * trigger. Otherwise, the window will * never be triggered and no computation will be performed. * * @return The global window policy with {@link NeverTrigger} as default trigger. */ public static GlobalWindows create() { ... } /** * Creates a new {@link WindowAssigner} with {@link EndOfStreamTrigger} that assigns all * elements to the same {@link GlobalWindow} * and the defaultwindow triggeris firestriggered if and only if the * input stream is ended. * * @return The global window policy with {@link EndOfStreamTrigger} as default trigger. */ public static GlobalWindows createWithEndOfStreamTrigger() { ... } } |
...
Code Block | ||
---|---|---|
| ||
package org.apache.flink.streaming.api.operators; /** The builder class for {@link OperatorAttributes}. */ @Experimental public class OperatorAttributesBuilder { @Nullable private Boolean outputOnEOF = null; public OperatorAttributesBuilder() {...} /** * Set whether to true if and only if the operator only emits records resultsafter whenall its inputinputs hashave ended. */ public* OperatorAttributesBuilderIf setOutputOnEOF(boolean outputOnEOF) {...} /**it is not set, the default value false is used. */ If any operator attribute is null, wepublic willOperatorAttributesBuilder logsetOutputOnlyAfterEndOfStream( it at DEBUG level and use the following boolean * default values. * - outputOnEOF defaults to false */ outputOnlyAfterEndOfStream) {...} /** If any operator attribute is not set explicitly, we will log it at DEBUG level. */ public OperatorAttributes build() {...} } |
Code Block | ||
---|---|---|
| ||
package org.apache.flink.streaming.api.operators; /** * OperatorAttributes element provides Job Manager with information that can be * used to optimize the job performance. */ @Experimental public class OperatorAttributes { /** * Returns true iff if and only if the operator only emits records after all its inputs have reached EOFended. * * <p>Here are the implications when it is true: * * <ul> * <li> The results of this operator as well as its chained operators have blocking partition type. * <li> This operator as well as its chained operators will be executed in batch mode. * </ul> */ public boolean isOutputOnEOFisOutputOnlyAfterEndOfStream() {...} } |
3) Add the getOperatorAttributes() API to the StreamOperator and StreamOperatorFactory interfaces.
Code Block | ||
---|---|---|
| ||
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable { ... /** * Get the {@link OperatorAttributes} of the operator that the Flink runtime can use to optimize * the job performance. * * @return OperatorAttributes of the operator. */ @Experimental default OperatorAttributes getOperatorAttributes() { return new OperatorAttributesBuilder().build(); } } public interface StreamOperatorFactory<OUT> extends Serializable { ... /** * Get the {@link OperatorAttributes} of the operator created by the factory. Flink runtime can * use it to optimize the job performance. * * @return OperatorAttributes of the operator created by the factory. */ @Experimental default OperatorAttributes getOperatorAttributes() { return new OperatorAttributesBuilder().build(); } } |
Proposed Changes
1) Add the APIs on Transformation
PhysicalTransformation
interface to get the corresponding operator attributes.
Code Block | ||
---|---|---|
| ||
@Internal public abstract class PhysicalTransformation<T> extends Transformation<T> { public boolean isOutputOnEOFisOutputOnlyAfterEndOfStream() { return false; } } |
2) Update the following Transformation subclasses (e.g. OneInputTransformation and TwoInputTransformation) to override the newly added methods using the OperatorAttributes obtained from the underlying Operator.:
- OneInputTransformation
- TwoInputTransformation
- AbstractMultipleInputTransformation
3) Update JM to make use of the following operator attributes when compiling the Transformation graph into the JobGraph.
If a Transformation has isOutputOnEOF isOutputOnlyAfterEndOfStream == true:
- The process of operator chaining can still be done. After that, the results of its operator chain will be set blocking.
- This operator will be executed in batch mode.
More specifically, the checkpoint is disabled when these operators are running, such that these operators can do operations not compatible with checkpoints (e.g. sorting inputs). And operators should re-read the data from the upstream blocking edge or sources after failover.
...
4) Enumerate all existing operators defined in Flink and set the isOutputOnEOF isOutputOnlyAfterEndOfStream accordingly for those operators that may only output at the end of the input to enable the optimization mentioned in 3).:
- WindowOperator: When used with GlobalWindows that is created with the method
createWithEndOfStreamTrigger
as the window assigner - StreamSortOperator
5) When DataStream API like DataStream#CoGroup is invoked with GlobalWindows and EndOfStreamTrigger as , the WindowOperator will check if the window assigner and trigger, The is an instance of GlobalWindow and only triggers at the end of the inputs. If true, the operator will have OperatorAttributes with isOutputOnEOF isOutputOnlyAfterEndOfStream = true to achieve higher throughput by using the optimizations in batch mode.
This FLIP can potentially benefit all DataStream APIs that take WindowAssigner as the parameter.
More specifically, the following DataStream API can benefit from using GlobalWindows with EndOfStreamTriggerwhich is created with the method createWithEndOfStreamTrigger.
- DataStream#windowAll
- KeyedStream#window
- CoGroupedStreams#Where#EqualTo#window
- JoinedStreams#Where#EqualTo#window
Compatibility, Deprecation, and Migration Plan
The changes made in this FLIP are backward compatible.
...
Future Work
- It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF the end for all its inputs. This allows JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.
- Setting the upstream operators of the full-dam operator in batch mode will also increase the job throughput, and Hybrid shuffle mode can also be used in batch mode part to further improve the performance when there are sufficient slot resources.
- Optimizing the implementation of frequently used full-dam operators, such as aggregate/CoGroup/Join/..., can achieve higher throughput by using the optimizations currently done in batch mode. For example, we can instantiate an internal sorter in the operator, and it will not have to invoke WindowAssigner#assignWindows or triggerContext#onElement for each input record.
...