...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Consider a use case where a user wants to bootstrap job state (operatorA and operatorB in the figure below) using records from a bounded source. The job does not need to emit any records while it is still processing records from the bounded source. After this bootstrap phase is done (i.e. all records from the bounded source have been processed by operatorA and operatorB), the user wants to process records from the unbounded source and emit results continuously in real time. Additionally, we assume that operatorA involves an aggregation computation (e.g. co-group) and that its throughput in batch mode can be 10X higher than in stream mode (see Benchmark Results below).

...

In this FLIP, we propose to optimize performance for the above use case by 1) allowing an operator to declare its attributes (e.g. isOutputOnEOF) so that the JM deploys its downstream operators only after that operator finishes; and 2) supporting stream-batch unified operators, which can switch from batch mode to stream mode during execution without a restart. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325.



Terminology and Background

Currently, every operator supported by Flink must be either a stream-only operator or a batch-only operator, as defined below.

...

By supporting stream-batch unified operators, Flink can deliver optimal performance for the use case described in the Motivation section.


Public Interfaces

1) Add EndOfStreamWindows which is a subclass of WindowAssigner. This class allows users of the DataStream API to specify whether the computation (e.g. co-group, aggregate) should emit data only after end-of-input.

...

@PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}

@PublicEvolving
public interface StreamOperatorFactory<OUT> extends Serializable {
    ...

    default OperatorAttributes getOperatorAttributes() {
        return new OperatorAttributesBuilder().build();
    }
}
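
To make the role of these attributes more concrete, the following sketch models, without any Flink dependency, how a scheduler could use an isOutputOnEOF-style flag to decide when a downstream operator may be deployed. All class and method names here (SketchOperatorAttributes, SketchUpstream, SketchScheduler, setOutputOnEOF, canDeployDownstream) are hypothetical stand-ins chosen for illustration; they are not the OperatorAttributes or OperatorAttributesBuilder classes proposed in this FLIP, and the actual JM logic may differ.

```java
import java.util.List;

/** Hypothetical stand-in for operator attributes; NOT the Flink class. */
final class SketchOperatorAttributes {
    private final boolean outputOnEOF;

    private SketchOperatorAttributes(boolean outputOnEOF) {
        this.outputOnEOF = outputOnEOF;
    }

    boolean isOutputOnEOF() {
        return outputOnEOF;
    }

    /** Builder whose defaults correspond to "no special attribute declared". */
    static final class Builder {
        private boolean outputOnEOF = false;

        Builder setOutputOnEOF(boolean value) {
            this.outputOnEOF = value;
            return this;
        }

        SketchOperatorAttributes build() {
            return new SketchOperatorAttributes(outputOnEOF);
        }
    }
}

/** Hypothetical view of one upstream operator as seen by a scheduler. */
final class SketchUpstream {
    final SketchOperatorAttributes attributes;
    final boolean finished;

    SketchUpstream(SketchOperatorAttributes attributes, boolean finished) {
        this.attributes = attributes;
        this.finished = finished;
    }
}

/** Hypothetical scheduling rule, assumed for illustration only. */
final class SketchScheduler {
    /**
     * A downstream operator may be deployed once every upstream that emits
     * only at end-of-input has finished. Upstreams without that attribute
     * never block deployment.
     */
    static boolean canDeployDownstream(List<SketchUpstream> upstreams) {
        for (SketchUpstream upstream : upstreams) {
            if (upstream.attributes.isOutputOnEOF() && !upstream.finished) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, an operator that keeps the default attributes never delays its consumers, which matches the default getOperatorAttributes() implementations above returning a builder with no options set.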


Proposed Changes

1) Add APIs on the Transformation interface to get the corresponding operator attributes.

...

  • Override getOperatorAttributes to return HasInternalSorter = true if the operator needs to use an internal sorter in batch mode.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the inputs' isBacklog status. For example, the operator can buffer and sort input records while any input has isBacklog=true. Once all inputs have switched to isBacklog=false, the operator processes the buffered records, emits results, and starts working in stream execution mode.
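
The buffering behavior described in the second bullet can be sketched in plain Java as follows. This is a minimal model under stated assumptions, not Flink code: BacklogAwareCounterSketch and its method signatures are hypothetical, records are plain String keys, the operator computes a per-key count, and every input is assumed to start with isBacklog=true.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-key counter that switches from batch-style to stream-style processing. */
final class BacklogAwareCounterSketch {
    private final boolean[] inputBacklog;                 // isBacklog status per input
    private final List<String> buffer = new ArrayList<>(); // records held back during backlog
    private final Map<String, Long> counts = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    BacklogAwareCounterSketch(int numInputs) {
        inputBacklog = new boolean[numInputs];
        Arrays.fill(inputBacklog, true); // assumption: all inputs start in backlog
    }

    private boolean anyBacklog() {
        for (boolean b : inputBacklog) {
            if (b) {
                return true;
            }
        }
        return false;
    }

    /** Models the processRecordAttributes() hook: tracks isBacklog per input. */
    void processRecordAttributes(int input, boolean isBacklog) {
        boolean wasBacklog = anyBacklog();
        inputBacklog[input] = isBacklog;
        if (wasBacklog && !anyBacklog()) {
            flushBuffer(); // all inputs caught up: switch to stream behavior
        }
    }

    void processElement(String key) {
        if (anyBacklog()) {
            buffer.add(key); // batch-style: buffer only, emit nothing
            return;
        }
        long count = counts.merge(key, 1L, Long::sum);
        emitted.add(key + "=" + count); // stream-style: emit per record
    }

    /** Sorts the buffer so equal keys are adjacent, then aggregates each run once. */
    private void flushBuffer() {
        Collections.sort(buffer);
        int i = 0;
        while (i < buffer.size()) {
            String key = buffer.get(i);
            int j = i;
            while (j < buffer.size() && buffer.get(j).equals(key)) {
                j++;
            }
            long count = counts.merge(key, (long) (j - i), Long::sum);
            emitted.add(key + "=" + count);
            i = j;
        }
        buffer.clear();
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With two inputs, feeding "b", "a", "b" during backlog emits nothing; after both inputs report isBacklog=false the sketch emits one result per key in sorted order, and each later record produces an immediate update.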


Example Usages

1) The user wants to co-group records from two bounded streams and emit output after both inputs have ended.
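
As a rough illustration of the intended semantics only, the plain-Java sketch below groups records from two inputs by key and produces output only once both inputs have signalled end-of-input. It does not use the DataStream API; CoGroupAtEndSketch, addLeft, addRight, and endInput are hypothetical names, and the input ids 1 and 2 are an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical co-group that emits only after both inputs have ended. */
final class CoGroupAtEndSketch {
    private final Map<String, List<Integer>> left = new TreeMap<>();
    private final Map<String, List<Integer>> right = new TreeMap<>();
    private final List<String> output = new ArrayList<>();
    private boolean leftEnded;
    private boolean rightEnded;
    private boolean emitted;

    void addLeft(String key, int value) {
        left.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    void addRight(String key, int value) {
        right.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Signals end-of-input; inputId 1 is the left input, 2 is the right input. */
    void endInput(int inputId) {
        if (inputId == 1) {
            leftEnded = true;
        } else {
            rightEnded = true;
        }
        if (leftEnded && rightEnded && !emitted) {
            emitAll();
            emitted = true;
        }
    }

    /** Emits one line per key, pairing the left and right groups for that key. */
    private void emitAll() {
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        for (String key : keys) {
            List<Integer> l = left.getOrDefault(key, Collections.emptyList());
            List<Integer> r = right.getOrDefault(key, Collections.emptyList());
            output.add(key + ": " + l + " / " + r);
        }
    }

    List<String> output() {
        return output;
    }
}
```

Because nothing is emitted until both inputs end, the groups can be assembled with batch-style techniques instead of updating results per record.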

...

This shows that, with the changes proposed above, DataStream#coGroup can be 20X faster than its current throughput in stream mode and 2.5X faster than its current throughput in batch mode.


Compatibility, Deprecation, and Migration Plan

The changes made in this FLIP are backward compatible. No deprecation or migration plan is needed.

...