...

In this FLIP, we propose to optimize performance for the above use-case by 1) allowing an operator to declare its attributes (e.g. isOutputOnEOF) so that the JM deploys its downstream operators only after such an operator finishes; and 2) supporting stream-batch unified operators, which can switch from batch mode to stream mode during execution without a restart. We hope these capabilities can further enhance Flink as a stream-batch unified processing engine.

NOTE: this FLIP depends on the APIs (e.g. RecordAttributes with isBacklog information) proposed in FLIP-325.



Terminology and Background

...

  • Add a subclass of TwoInputStreamOperator that uses an internal sorter to do co-group, with essentially the same implementation as DataSet#coGroup.
  • This operator should override getOperatorAttributes() to return OperatorAttributes(IsOutputOnEOF=true, HasInternalSorter=true).
  • Update CoGroupedStreams#WithWindow#apply to use this operator to do co-group when the user-specified window assigner is EndOfStreamWindows.
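The operator described in the bullets above can be pictured with a small standalone sketch. It is not Flink code: EofCoGroupSketch, Attributes, Rec and Group are hypothetical names introduced here for illustration, and plain in-memory lists stand in for the internal sorter. The sketch only shows the general shape, which is to buffer both inputs, declare that output is produced only at end of input, and then sort both sides by key and merge them into co-groups.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Standalone sketch; none of these types are Flink classes. */
class EofCoGroupSketch {

    /** Hypothetical stand-in for the attributes the operator declares. */
    static final class Attributes {
        final boolean outputOnEOF;
        final boolean internalSorter;

        Attributes(boolean outputOnEOF, boolean internalSorter) {
            this.outputOnEOF = outputOnEOF;
            this.internalSorter = internalSorter;
        }
    }

    /** A keyed input record. */
    static final class Rec {
        final String key;
        final int value;

        Rec(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    /** One co-group result: a key plus the values seen for it on each input. */
    static final class Group {
        final String key;
        final List<Integer> left;
        final List<Integer> right;

        Group(String key, List<Integer> left, List<Integer> right) {
            this.key = key;
            this.left = left;
            this.right = right;
        }
    }

    private final List<Rec> left = new ArrayList<>();
    private final List<Rec> right = new ArrayList<>();

    /** Declares that results appear only at end of input and that sorting is internal. */
    Attributes getOperatorAttributes() {
        return new Attributes(true, true);
    }

    // Records are only buffered while the inputs are still open.
    void processElement1(String key, int value) { left.add(new Rec(key, value)); }
    void processElement2(String key, int value) { right.add(new Rec(key, value)); }

    /** Called once both inputs have ended: sort each side by key, then merge. */
    List<Group> endInput() {
        Comparator<Rec> byKey = Comparator.comparing((Rec r) -> r.key);
        left.sort(byKey);
        right.sort(byKey);

        List<Group> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            // The next key is the smaller of the two heads (or the only remaining head).
            String key;
            if (j >= right.size()) {
                key = left.get(i).key;
            } else if (i >= left.size()) {
                key = right.get(j).key;
            } else {
                String l = left.get(i).key;
                String r = right.get(j).key;
                key = l.compareTo(r) <= 0 ? l : r;
            }
            // Consume the run of records with that key from each side.
            List<Integer> ls = new ArrayList<>();
            while (i < left.size() && left.get(i).key.equals(key)) {
                ls.add(left.get(i++).value);
            }
            List<Integer> rs = new ArrayList<>();
            while (j < right.size() && right.get(j).key.equals(key)) {
                rs.add(right.get(j++).value);
            }
            out.add(new Group(key, ls, rs));
        }
        return out;
    }
}
```

Because nothing is emitted before endInput(), a scheduler that knows the outputOnEOF attribute has no reason to hold resources for downstream operators while this operator is still consuming its inputs.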

6) For stream-only operators whose throughput is observably lower than that of the corresponding batch-only operator, update the stream-only operator to be a stream-batch unified operator.

Typically, operators that have keyed inputs and involve an aggregation operation (e.g. join, cogroup, aggregate) can benefit from this optimization.

Though the concrete changes may vary across operators, an operator typically needs the following changes to become stream-batch unified:

  • Override getOperatorAttributes() to return HasInternalSorter = true if the operator needs to use an internal sorter in batch mode.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on the inputs' isBacklog status. For example, the operator can buffer and sort input records while any input has isBacklog=true. Once all inputs have switched to isBacklog=false, it processes the buffered records, emits the results, and starts to work in stream execution mode.
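The switching behavior in the bullets above can be sketched with a standalone class. It does not use Flink's classes: BacklogAwareSumSketch, its method signatures, and the keyed running sum are assumptions made for illustration, and the real processRecordAttributes() signatures are the ones defined in FLIP-325. The sketch also assumes that an input counts as non-backlog until it reports isBacklog=true.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Standalone sketch of a keyed sum that switches from batch to stream mode. */
class BacklogAwareSumSketch {

    /** A keyed input record. */
    static final class Rec {
        final String key;
        final long value;

        Rec(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    private final boolean[] backlog;                          // latest isBacklog per input
    private final List<Rec> buffer = new ArrayList<>();       // records held in batch mode
    private final Map<String, Long> sums = new HashMap<>();   // per-key state
    final List<String> emitted = new ArrayList<>();           // stands in for the output

    BacklogAwareSumSketch(int numInputs) {
        // Assumption: inputs start as non-backlog until told otherwise.
        this.backlog = new boolean[numInputs];
    }

    /** Hypothetical analogue of processRecordAttributes() for one input. */
    void processRecordAttributes(int input, boolean isBacklog) {
        backlog[input] = isBacklog;
        if (!anyBacklog() && !buffer.isEmpty()) {
            flushBuffer();
        }
    }

    void processElement(String key, long value) {
        if (anyBacklog()) {
            // Batch mode: defer all work until the backlog has been consumed.
            buffer.add(new Rec(key, value));
            return;
        }
        // Stream mode: update state and emit one result per record.
        long sum = sums.merge(key, value, Long::sum);
        emitted.add(key + "=" + sum);
    }

    private boolean anyBacklog() {
        for (boolean b : backlog) {
            if (b) {
                return true;
            }
        }
        return false;
    }

    /** Sort the buffered records by key, then do one state update and one output per key. */
    private void flushBuffer() {
        buffer.sort(Comparator.comparing((Rec r) -> r.key));
        int i = 0;
        while (i < buffer.size()) {
            String key = buffer.get(i).key;
            long total = 0;
            while (i < buffer.size() && buffer.get(i).key.equals(key)) {
                total += buffer.get(i++).value;
            }
            long sum = sums.merge(key, total, Long::sum);
            emitted.add(key + "=" + sum);
        }
        buffer.clear();
    }
}
```

In this sketch the batch phase touches the per-key state once per key rather than once per record, which is the kind of saving the optimization is aiming for; how much a real operator gains depends on its state backend and key distribution.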


Example Usages

1) User wants to co-group records from two bounded streams and emit output after both inputs have ended.

...