...

6) For those stream-only operators whose throughput is observably slower than that of the corresponding batch-only operator, add a corresponding stream-batch unified operator.

Typically, operations that have keyed inputs and involve aggregation (e.g. join, cogroup, aggregate) can benefit from this optimization.
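One intuition for why keyed aggregations benefit, offered as an illustration rather than as this proposal's own analysis: once the input is sorted by key, all records of a key arrive together. The operator can then finish one key before moving on and emit a single final result per key. A stream-mode operator instead keeps an accumulator for every key and emits an updated result for every record. The minimal Python sketch that follows contrasts the two; `stream_mode_sum` and `sorted_mode_sum` are hypothetical names, and summing is an arbitrary choice of aggregation.

```python
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (key, value)


def stream_mode_sum(records: List[Record]) -> List[Record]:
    """Stream-mode style: one accumulator per key, an updated result per record."""
    state: Dict[str, int] = {}  # stands in for keyed state that stays live for every key
    emitted: List[Record] = []
    for key, value in records:
        state[key] = state.get(key, 0) + value
        emitted.append((key, state[key]))  # intermediate result, may be superseded later
    return emitted


def sorted_mode_sum(records: List[Record]) -> List[Record]:
    """Batch-mode style: sort by key, then aggregate one key group at a time."""
    emitted: List[Record] = []
    for key, group in groupby(sorted(records, key=itemgetter(0)), key=itemgetter(0)):
        # Only this key's accumulator is needed; its result is final once the group ends.
        emitted.append((key, sum(value for _, value in group)))
    return emitted
```

For the input [("b", 1), ("a", 2), ("b", 3), ("a", 4)], the stream-mode version emits four results, two of which are later superseded, while the sorted version emits exactly one final result per key: [("a", 6), ("b", 4)].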

Though the concrete implementation may vary across operators, an operator typically needs the following changes to be stream-batch unified:

  • Override getOperatorAttributes to return HasInternalSorter = true if the operator needs to use an internal sorter in batch mode.
  • Override the corresponding processRecordAttributes() API (see FLIP-325) to adjust its behavior based on each input's isBacklog status. For example, the operator can buffer and sort input records while any input has isBacklog=true. Once all inputs have switched to isBacklog=false, it processes the buffered records, emits the results, and starts working in stream execution mode.
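To make the two changes above concrete, here is a minimal Python model of a keyed sum operator with several inputs. `RecordAttributes`, `OperatorAttributes`, `UnifiedKeyedSumOperator` and their snake_case methods are hypothetical stand-ins that only mirror the names used in the text; they are not Flink's Java API. The sketch assumes every input starts with isBacklog=true. It buffers records while any input is backlogged; when the last backlogged input switches to isBacklog=false, it sorts the buffer by key, emits one result per key, and from then on updates and emits a result per record.

```python
from dataclasses import dataclass
from itertools import groupby
from operator import itemgetter
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class RecordAttributes:
    """Hypothetical stand-in for FLIP-325 record attributes; only isBacklog is modeled."""
    is_backlog: bool


@dataclass(frozen=True)
class OperatorAttributes:
    """Hypothetical stand-in; only the HasInternalSorter flag is modeled."""
    has_internal_sorter: bool


class UnifiedKeyedSumOperator:
    """Sorts buffered input while any input is backlogged, then switches to streaming."""

    def __init__(self, num_inputs: int) -> None:
        self.backlog = [True] * num_inputs  # assumption: all inputs start backlogged
        self.buffer: List[Tuple[str, int]] = []
        self.sums: Dict[str, int] = {}
        self.output: List[Tuple[str, int]] = []

    def get_operator_attributes(self) -> OperatorAttributes:
        # This operator sorts its own buffer, so it reports an internal sorter.
        return OperatorAttributes(has_internal_sorter=True)

    def _any_backlog(self) -> bool:
        return any(self.backlog)

    def process_element(self, input_index: int, key: str, value: int) -> None:
        # input_index is unused by a plain sum; a join or cogroup would use it.
        if self._any_backlog():
            self.buffer.append((key, value))  # batch-like: defer until backlog ends
        else:
            self.sums[key] = self.sums.get(key, 0) + value
            self.output.append((key, self.sums[key]))  # stream-like: emit per record

    def process_record_attributes(self, input_index: int, attrs: RecordAttributes) -> None:
        was_backlog = self._any_backlog()
        self.backlog[input_index] = attrs.is_backlog
        if was_backlog and not self._any_backlog():
            self._flush_buffer()  # every input now has isBacklog=false

    def _flush_buffer(self) -> None:
        # Sort the buffered records by key and aggregate one key group at a time.
        self.buffer.sort(key=itemgetter(0))
        for key, group in groupby(self.buffer, key=itemgetter(0)):
            self.sums[key] = self.sums.get(key, 0) + sum(v for _, v in group)
            self.output.append((key, self.sums[key]))
        self.buffer.clear()
```

With two inputs, records ("b", 1), ("a", 2), ("a", 3) are only buffered while either input is backlogged. Switching input 0 alone to isBacklog=false emits nothing; switching input 1 as well flushes [("a", 5), ("b", 1)], and a later ("b", 4) is emitted immediately as ("b", 5). If an input later returns to isBacklog=true, the model simply resumes buffering.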

Note: we still need to keep the stream-only operator. The JM can pick the stream-batch unified operator only when execution.checkpointing.interval-during-backlog is 0, which means checkpointing is disabled while any of the operator's inputs is backlogged.
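The selection rule in the note can be written as a small predicate. This is a sketch under stated assumptions: `unified_operator_allowed` is a hypothetical helper, the configuration is modeled as a plain mapping from the key to a `datetime.timedelta`, and an unset key is assumed not to count as 0. The predicate only answers whether the unified operator is eligible; the text says the JM can pick it in that case, not that it always will.

```python
from datetime import timedelta
from typing import Mapping, Optional

INTERVAL_DURING_BACKLOG = "execution.checkpointing.interval-during-backlog"


def unified_operator_allowed(config: Mapping[str, timedelta]) -> bool:
    """Hypothetical check: may the JM pick the stream-batch unified operator?"""
    interval: Optional[timedelta] = config.get(INTERVAL_DURING_BACKLOG)
    # Only an explicit zero interval (no checkpoints while any input is backlogged)
    # makes the unified operator eligible; otherwise the stream-only operator is used.
    return interval == timedelta(0)
```

Keeping the stream-only operator means any job with a non-zero backlog checkpoint interval still has an operator it can run.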


Example Usages

1) A user wants to co-group records from two bounded streams and emit output after both inputs have ended.

...