...

[This FLIP proposal is joint work between Ran Jinhao and Dong Lin]


Motivation

As shown in the figure below, we might have a job that pre-processes records from a bounded source (i.e. inputA) using an operator (i.e. operatorA) which only emits results after its input has ended. Then operatorB needs to join records emitted by operatorA with records from an unbounded source, and emit results with low processing latency in real-time.

...

In addition, this FLIP adds EndOfStreamWindows, which can be used with the DataStream API to specify whether the window will end only at the end of its inputs. DataStream programs can take advantage of this information to optimize resource utilization as well as job throughput.


Public Interfaces

1) Add EndOfStreamWindows, which is a subclass of WindowAssigner. This class allows users of the DataStream API to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.

...

@Experimental
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
 
@Experimental
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...
 
    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
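
The default methods above can be illustrated with a minimal, self-contained sketch. The classes below are simplified stand-ins and not Flink's classes. The attribute name outputOnlyAfterEndOfStream, its setter, and the EndOfInputAggregator operator are assumptions made for illustration only, since the actual attribute list is elided above. The sketch shows an operator overriding the default to declare that it emits results only after its input ends, and a caller inspecting that declaration.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified stand-in for the attributes object; not Flink's OperatorAttributes. */
final class SketchOperatorAttributes {
    // Hypothetical attribute: true if the operator emits records only after its inputs end.
    private final boolean outputOnlyAfterEndOfStream;

    SketchOperatorAttributes(boolean outputOnlyAfterEndOfStream) {
        this.outputOnlyAfterEndOfStream = outputOnlyAfterEndOfStream;
    }

    boolean isOutputOnlyAfterEndOfStream() {
        return outputOnlyAfterEndOfStream;
    }
}

/** Simplified stand-in for the builder; the setter name is an assumption. */
final class SketchOperatorAttributesBuilder {
    // Conservative default: the operator may emit records at any time.
    private boolean outputOnlyAfterEndOfStream = false;

    SketchOperatorAttributesBuilder setOutputOnlyAfterEndOfStream(boolean value) {
        this.outputOnlyAfterEndOfStream = value;
        return this;
    }

    SketchOperatorAttributes build() {
        return new SketchOperatorAttributes(outputOnlyAfterEndOfStream);
    }
}

/** Mirrors the shape of the default method shown above. */
interface SketchOperator {
    default SketchOperatorAttributes getOperatorAttributes() {
        return new SketchOperatorAttributesBuilder().build();
    }
}

/** Hypothetical operator that only emits after end of input, so it overrides the default. */
final class EndOfInputAggregator implements SketchOperator {
    @Override
    public SketchOperatorAttributes getOperatorAttributes() {
        return new SketchOperatorAttributesBuilder()
                .setOutputOnlyAfterEndOfStream(true)
                .build();
    }
}

final class OperatorAttributesDemo {
    /** Returns true if any operator in the list declares output only after end of stream. */
    static boolean anyOutputsOnlyAfterEndOfStream(List<SketchOperator> operators) {
        return operators.stream()
                .anyMatch(op -> op.getOperatorAttributes().isOutputOnlyAfterEndOfStream());
    }

    public static void main(String[] args) {
        List<SketchOperator> chain =
                Arrays.asList(new SketchOperator() {}, new EndOfInputAggregator());
        System.out.println(anyOutputsOnlyAfterEndOfStream(chain)); // prints true
    }
}
```

Because the method is a default on the interface, existing operators that do not override it keep the conservative default in this sketch, which is consistent with adding the method without breaking current implementations.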


Proposed Changes

1) Add APIs on the Transformation interface to get the corresponding operator attributes.

...

More specifically, this operator will sort the input before aggregation and avoid invoking window actions, which is similar to '5)'.
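
The sort-then-aggregate idea can be sketched as follows. This is a self-contained illustration of the general technique under stated assumptions, not the operator this FLIP implements: the class SortThenAggregateSketch, its method names, and the choice of summing long values per String key are all hypothetical. Records are buffered without touching any window state. When the input ends, they are sorted by key so that each key's records are contiguous and can be aggregated in a single pass.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical operator that sums values per key and emits only at end of input. */
final class SortThenAggregateSketch {

    /** A keyed record; stands in for whatever record type the real operator handles. */
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Rec> buffer = new ArrayList<>();

    /** Called per record: only buffers, no per-record window action or state lookup. */
    void processElement(String key, long value) {
        buffer.add(new Rec(key, value));
    }

    /** Called once when the input ends: sort by key, then aggregate each contiguous run. */
    Map<String, Long> endInput() {
        buffer.sort(Comparator.comparing((Rec r) -> r.key));

        Map<String, Long> results = new LinkedHashMap<>();
        String currentKey = null;
        long sum = 0;
        for (Rec r : buffer) {
            if (currentKey != null && !currentKey.equals(r.key)) {
                // Key changed: the previous key's run is complete, so emit it.
                results.put(currentKey, sum);
                sum = 0;
            }
            currentKey = r.key;
            sum += r.value;
        }
        if (currentKey != null) {
            results.put(currentKey, sum); // emit the final run
        }
        buffer.clear();
        return results;
    }
}
```

After sorting, only one running aggregate needs to be held at a time, because a key never reappears once its run has ended. A real implementation would presumably use an external sorter that can spill to disk rather than an in-memory list.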

Benchmark results

To demonstrate our optimization improvements, we run each benchmark in different execution modes and configurations. Each result is measured by taking the average execution time across 5 runs with the given configuration.
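
The measurement method described above (averaging execution time over a fixed number of runs) can be sketched as below. This is an illustrative helper only: the class AverageRuntimeSketch and its methods are hypothetical and are not the benchmark harness used for the reported results.

```java
/** Hypothetical helper that averages wall-clock execution time over several runs. */
final class AverageRuntimeSketch {

    /** Returns the mean of the given per-run durations, converted from nanoseconds to milliseconds. */
    static double averageMillis(long[] runNanos) {
        if (runNanos.length == 0) {
            throw new IllegalArgumentException("at least one run is required");
        }
        long totalNanos = 0;
        for (long nanos : runNanos) {
            totalNanos += nanos;
        }
        return totalNanos / 1_000_000.0 / runNanos.length;
    }

    /** Runs the job the given number of times and returns the average time in milliseconds. */
    static double measure(Runnable job, int runs) {
        long[] durations = new long[runs];
        for (int i = 0; i < runs; i++) {
            long start = System.nanoTime();
            job.run();
            durations[i] = System.nanoTime() - start;
        }
        return averageMillis(durations);
    }
}
```

For example, `AverageRuntimeSketch.measure(job, 5)` would time five consecutive executions of `job` and return their mean.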

...

Additionally, we can use this program to demonstrate that it can achieve higher performance because Process2 will not need to buffer records emitted by Source2 in its memory while Process1 has not reached EOF. More specifically, before this FLIP the program can fail with OOM when the number of input records is high, whereas after this FLIP it can finish successfully without OOM.

Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

Future Work

It would be useful to add an ExecutionState (e.g. Finishing) to specify whether the task has reached EOF for all its inputs. This would allow the JM to deploy its downstream tasks and possibly apply hybrid shuffle to increase job throughput.

...