...

  • Whenever the tasks belonging to a particular Stream Thread have together buffered more bytes than have been allocated to that thread, the underlying consumer would pause consumption from all partitions that already have buffered data. This also means that partitions with empty buffers would not be paused.
  • The paused partitions would be resumed once the total buffered size across all tasks in the thread falls below the allocated bytes.
  • Whenever the number of Stream Threads in the topology changes, the buffer bytes allocated to each Stream Thread would change accordingly. This behaviour is identical to that of cache.max.bytes.buffering, whose total is divided evenly among the threads.
  • As per the suggestions on the dev mailing list, we would also rename the config cache.max.bytes.buffering to statestore.cache.max.bytes as part of this KIP.
  • Adding a new metric at the partition-group class level to measure the total bytes buffered per task (input-buffer-bytes-total). This would represent the bytes aggregated across all partitions of the input task.
  • Adding a new metric at the partition-group class level to measure the cache size accumulated per task (cache-size-bytes-total). This would represent the cache bytes accumulated across all partitions of the input task.
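The pause/resume behaviour described above can be sketched as a small model. This is a minimal, hypothetical illustration, not the Kafka Streams implementation: the class InputBufferBudget, its methods, and the partition names are invented for the example, and the even split of input.buffer.max.bytes across threads is an assumption mirroring cache.max.bytes.buffering. In a real Stream Thread, the returned partition sets would be handed to the consumer's pause and resume calls.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the per-thread input-buffer policy; not Kafka Streams code.
class InputBufferBudget {
    private final long totalMaxBytes;   // assumed: input.buffer.max.bytes for the whole instance
    private int numThreads;             // current number of Stream Threads
    // Partition -> bytes currently buffered, summed over all tasks of ONE thread.
    private final Map<String, Long> bufferedBytes = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    InputBufferBudget(long totalMaxBytes, int numThreads) {
        this.totalMaxBytes = totalMaxBytes;
        setNumThreads(numThreads);
    }

    // Adding or removing a thread changes every thread's share of the budget.
    void setNumThreads(int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        this.numThreads = numThreads;
    }

    // Assumed equal share of the total budget for one thread.
    long perThreadBytes() {
        return totalMaxBytes / numThreads;
    }

    void setBufferedBytes(String partition, long bytes) {
        bufferedBytes.put(partition, bytes);
    }

    long totalBuffered() {
        long sum = 0;
        for (long b : bufferedBytes.values()) {
            sum += b;
        }
        return sum;
    }

    // Over budget: pause every partition that already has buffered data
    // (empty partitions stay unpaused). Below budget: resume everything.
    // Returns a copy of the currently paused set.
    Set<String> update() {
        long total = totalBuffered();
        if (total > perThreadBytes()) {
            for (Map.Entry<String, Long> e : bufferedBytes.entrySet()) {
                if (e.getValue() > 0) {
                    paused.add(e.getKey());
                }
            }
        } else if (total < perThreadBytes()) {
            paused.clear();
        }
        return new HashSet<>(paused);
    }

    public static void main(String[] args) {
        InputBufferBudget budget = new InputBufferBudget(400, 4); // 100 bytes per thread
        budget.setBufferedBytes("topic-0", 80);
        budget.setBufferedBytes("topic-1", 30);
        budget.setBufferedBytes("topic-2", 0);
        System.out.println("paused: " + budget.update().size()); // topic-0 and topic-1
        budget.setBufferedBytes("topic-0", 10);
        System.out.println("paused after draining: " + budget.update().size());
    }
}
```

When the buffered total equals the allocation exactly, the sketch leaves the paused set unchanged, since the text only defines behaviour for "more than" and "less than" the allocated bytes.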

...

  • Adding a new config input.buffer.max.bytes applicable at the topology level.
    • Importance: Medium
    • Default Value: 512 MB.
  • Adding new config statestore.cache.max.bytes. Its importance and default value would be the same as those of cache.max.bytes.buffering.
  • Adding new metric called input-buffer-bytes-total.
    • type = stream-task-metrics
    • thread-id = [thread ID]
    • task-id = [task ID]
    • The recording level for all metrics will be INFO

    • Description: The total number of bytes accumulated by this task.
  • Adding new metric called cache-size-bytes-total.
    • type = stream-task-metrics
    • thread-id = [thread ID]
    • task-id = [task ID]
    • The recording level for all metrics will be INFO

    • Description: The cache size in bytes accumulated by this task.
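As a rough illustration, the two configs above could be supplied to a Kafka Streams application through a java.util.Properties object. This is a hedged sketch: only the config keys come from this KIP; the class BufferConfigExample is hypothetical, and the 10 MB cache value is an arbitrary example rather than a recommendation. A real application would also need application.id, bootstrap.servers, and the rest of its settings before passing the properties to KafkaStreams.

```java
import java.util.Properties;

// Hypothetical helper showing the configs proposed in this KIP.
class BufferConfigExample {
    static Properties bufferProps() {
        Properties props = new Properties();
        // Total bytes that may be buffered from input partitions, divided among
        // the Stream Threads. 512 MB matches the proposed default.
        props.put("input.buffer.max.bytes", Long.toString(512L * 1024 * 1024));
        // New name for cache.max.bytes.buffering; 10 MB is only an example value.
        props.put("statestore.cache.max.bytes", Long.toString(10L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = bufferProps();
        for (String key : props.stringPropertyNames()) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

Values are stored as strings here because Properties is string-oriented; Kafka's config parsing accepts numeric strings for long-typed settings.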

Compatibility, Deprecation, and Migration Plan

...