...

The new processing rate is calculated from the current processing rate and the backlog size, as well as the estimated downtime during the rescale. In addition, users can configure a maximum time to fully process the backlog.
The current backlog size should come from the connector metrics (when available). The estimated time to rescale could either be provided by the user (in that case we need to create a config option for it), or the operator could keep track of past scaling actions and measure their duration to determine it automatically. However, in the first iteration it should be enough to scale based on the current backlog at the time of scaling; the spare capacity allocated should allow for catching up the lag growth during the scaling itself.
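A minimal sketch of how such a target rate could be derived from these inputs is shown below. The class and parameter names (TargetRateEstimator, maxCatchUpTime, restartTime) are hypothetical and only illustrate the calculation described above, not the operator's actual API.

```java
import java.time.Duration;

/** Hypothetical helper deriving a target processing rate from backlog and catch-up time. */
public class TargetRateEstimator {

    /**
     * @param currentInputRate observed records/sec arriving at the source
     * @param backlogSize      pending records reported by the connector (e.g. pendingRecords)
     * @param maxCatchUpTime   user-configured maximum time to fully process the backlog
     * @param restartTime      estimated downtime during the rescale operation
     * @return records/sec the pipeline must sustain after scaling
     */
    public static double targetRate(
            double currentInputRate,
            double backlogSize,
            Duration maxCatchUpTime,
            Duration restartTime) {

        // Extra backlog that accumulates while the job is down for the rescale.
        double lagGrowthDuringRestart = currentInputRate * restartTime.toSeconds();

        // Records that must be processed on top of the steady-state input,
        // spread over the configured catch-up window.
        double totalBacklog = backlogSize + lagGrowthDuringRestart;
        double catchUpRate = totalBacklog / Math.max(1, maxCatchUpTime.toSeconds());

        // The new processing rate must cover the live input plus the catch-up share.
        return currentInputRate + catchUpRate;
    }

    public static void main(String[] args) {
        // Example: 10k rec/s input, 1.2M records backlog, catch up within 10 minutes,
        // expecting ~60s of downtime during the rescale.
        double rate = targetRate(10_000, 1_200_000, Duration.ofMinutes(10), Duration.ofSeconds(60));
        System.out.printf("Required target processing rate: %.0f records/s%n", rate);
    }
}
```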

 

For most sources, like Kafka, there is an upper bound on the parallelism based on the number of partitions. If backlog information is not available, users can set a desired parallelism or target rate. We also aim to choose a parallelism which yields an equal distribution of the splits among the subtasks.
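The following sketch illustrates one way such a parallelism could be chosen under these two constraints; the class and method names are assumptions for illustration only.

```java
/** Hypothetical helper picking a source parallelism bounded by the partition count. */
public class SourceParallelismPicker {

    /**
     * @param desiredParallelism parallelism suggested by the scaling algorithm
     * @param numPartitions      number of partitions/splits of the source
     * @return a parallelism no larger than numPartitions that divides the partitions evenly if possible
     */
    public static int pick(int desiredParallelism, int numPartitions) {
        // Never run more subtasks than there are partitions; the extras would sit idle.
        int upperBound = Math.min(desiredParallelism, numPartitions);

        // Prefer the largest parallelism <= upperBound that divides the partition
        // count, so every subtask reads the same number of partitions.
        for (int p = upperBound; p > 0; p--) {
            if (numPartitions % p == 0) {
                return p;
            }
        }
        return 1;
    }

    public static void main(String[] args) {
        // Example: the algorithm asks for 7, but the topic has 12 partitions,
        // so we pick 6 and each subtask gets exactly 2 partitions.
        System.out.println(pick(7, 12)); // prints 6
    }
}
```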

Sidenote about scaling sources without backlog information: Backlog growth information is necessary to scale sources successfully under backpressure. However, nothing stops us from scaling the sources on a busyTime/CPU-time basis once backpressure has been eliminated by scaling up the downstream operators. This was not part of our initial prototype implementation, but it could be added easily, as it does not affect the algorithm at large.
Most regular streaming sources should in any case provide backlog / input rate information, even if it is not currently exposed under pendingRecords. It should be our goal to improve connector metrics, not only to provide the best autoscaling capability but also a good operational experience in general.
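As a rough illustration of the busy-time-based fallback mentioned above, the sketch below scales a source so its utilization lands near a target value. The busyTimeMsPerSecond metric exists in Flink; the targetUtilization parameter and the surrounding class are assumptions for illustration, not part of the proposed implementation.

```java
/** Hypothetical fallback scaler for sources without backlog metrics. */
public class BusyTimeBasedScaler {

    /**
     * @param currentParallelism   current number of source subtasks
     * @param busyTimeMsPerSecond  average busy time per second reported by the source subtasks
     * @param targetUtilization    desired post-rescale utilization, e.g. 0.6
     * @return new parallelism bringing utilization close to the target
     */
    public static int scaleOnBusyTime(
            int currentParallelism, double busyTimeMsPerSecond, double targetUtilization) {

        // Fraction of wall-clock time the source subtasks spend doing useful work.
        double busyRatio = busyTimeMsPerSecond / 1000.0;

        // Scale so that the same workload spread over the new parallelism
        // yields roughly the target utilization.
        int newParallelism = (int) Math.ceil(currentParallelism * busyRatio / targetUtilization);
        return Math.max(1, newParallelism);
    }

    public static void main(String[] args) {
        // Example: 4 subtasks busy 900 ms/s with a 0.6 utilization target -> 6 subtasks.
        System.out.println(scaleOnBusyTime(4, 900, 0.6));
    }
}
```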

...