...

  • The user-specified total amount of memory (Total) of a Kafka Streams instance is always divided evenly among its stream threads upon startup; the number of threads is static throughout the instance's lifetime.

  • Within a single stream thread, the reserved memory for its producer (denoted Producer) and consumer (denoted Consumer) client usage is first subtracted from Total / numThreads; these values are also static throughout the thread's lifetime.
     
  • The remaining usable memory, Total / numThreads - Producer - Consumer, is allocated dynamically upon each rebalance (see the sketch after this list):
    • Every time a rebalance completes and the assigned tasks are created, the thread first subtracts the memory needed for buffering (Buffer), calculated as buffered.bytes.per.partition * total.number.partitions.
    • It then subtracts the memory used by all of its persistent state stores (State), calculated by each store's own equation; for RocksDB, for example, this is block_cache_size + write_buffer_size * max_write_buffer_number.
    • If the remaining amount, Total / numThreads - Producer - Consumer - Buffer - State, is already negative, we should log a WARNING that there may not be enough memory for this instance.
    • Otherwise, the remaining amount is allocated for caching needs (Cache): multiple caches will dynamically allocate memory from this pool, possibly flushing when it is about to be exhausted, as mentioned above.
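
To make the budgeting arithmetic above concrete, here is a minimal Java sketch of the per-thread calculation. The class and method names (MemoryBudget, perThreadUsable, etc.) are hypothetical and not part of the Kafka Streams API; only the formulas themselves come from the bullets above.

/**
 * Hypothetical sketch of the per-thread memory budgeting described above.
 * None of these names exist in Kafka Streams; only the formulas come from
 * the proposal text.
 */
public class MemoryBudget {

    /** Usable memory per thread once the static client reservations are taken out. */
    public static long perThreadUsable(long totalBytes, int numThreads,
                                       long producerBytes, long consumerBytes) {
        return totalBytes / numThreads - producerBytes - consumerBytes;
    }

    /** Buffer = buffered.bytes.per.partition * total.number.partitions */
    public static long bufferBytes(long bufferedBytesPerPartition, int totalNumPartitions) {
        return bufferedBytesPerPartition * totalNumPartitions;
    }

    /** RocksDB example: State = block_cache_size + write_buffer_size * max_write_buffer_number */
    public static long rocksDbStateBytes(long blockCacheSize, long writeBufferSize,
                                         int maxWriteBufferNumber) {
        return blockCacheSize + writeBufferSize * maxWriteBufferNumber;
    }

    /** Remaining memory handed to the caches; a negative value means the instance is over-committed. */
    public static long cacheBytes(long perThreadUsable, long bufferBytes, long stateBytes) {
        long remaining = perThreadUsable - bufferBytes - stateBytes;
        if (remaining < 0) {
            // The proposal suggests logging a WARNING in this case.
            System.err.println("WARNING: there may not be enough memory for this instance");
        }
        return remaining;
    }
}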

...

NOTE that the caveat of this approach is that the amounts of Buffer and State grow with the number of tasks assigned to this instance's threads, and hence we may not be actively guarding against the cascading OOMs mentioned above. As Jay Kreps suggested in an offline discussion, one way to think about this issue is the following:

  • Among all the memory usage listed above, Producer and Consumer are "per-thread", and since #.threads is static throughout the Kafka Streams lifetime, their total memory usage is also static and hence is easily bounded.
     
  • State, Buffer and Cache are "per-task", and their amounts can change dynamically from rebalance to rebalance, since the #.tasks assigned to the threads of the Kafka Streams instance can change over time.
    • Therefore, calculating their usage in a "bottom-up" manner, by letting users specify an upper bound per-task / per-store / per-partition, cannot bound the total memory they use, and hence cascading OOMs could happen upon task migration.
    • Instead, we should bound their usage in a "top-down" manner, i.e. dynamically calculate the per-task / per-store / per-partition configs upon each rebalance from the total allocable memory (i.e. Total / numThreads - Producer - Consumer) and the #.tasks etc., for example buffered.bytes.per.partition and RocksDB's block_cache_size (see the sketch after this list).
       
  • Regarding Buffer specifically: it is currently based on a per-partition config (buffered.records.per.partition), but since its only purpose is reasoning about stream time, and the raw bytes it deserializes are already bounded by Consumer, we should consider configuring it at the global level as well, for example by replacing buffered.records.per.partition with buffered.records.bytes.
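
To illustrate the "top-down" direction, the sketch below derives the per-partition and per-store values from the per-thread allocable budget at rebalance time. The 50/50 split between buffering and state, and all class/method names, are assumptions made only for illustration; the Total / numThreads - Producer - Consumer budget and the config names buffered.bytes.per.partition and block_cache_size come from the text above.

/**
 * Hypothetical "top-down" derivation: instead of letting users set per-partition /
 * per-store limits directly, compute them from the per-thread allocable budget and
 * the task assignment produced by the current rebalance. The 50/50 split between
 * buffering and state is an arbitrary assumption for illustration.
 */
public class TopDownConfigs {

    public static long bufferedBytesPerPartition(long allocableBytes, int totalNumPartitions) {
        long bufferBudget = allocableBytes / 2;          // assumed share for Buffer
        return bufferBudget / Math.max(1, totalNumPartitions);
    }

    public static long rocksDbBlockCacheSize(long allocableBytes, int numStores) {
        long stateBudget = allocableBytes / 2;           // assumed share for State
        return stateBudget / Math.max(1, numStores);     // per-store block cache
    }

    public static void main(String[] args) {
        long allocable = 1_000L * 1024 * 1024;           // Total / numThreads - Producer - Consumer
        System.out.println("buffered.bytes.per.partition = "
                + bufferedBytesPerPartition(allocable, 32));
        System.out.println("block_cache_size = "
                + rocksDbBlockCacheSize(allocable, 4));
    }
}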

 

Open Questions

  1. Should we buffer records or bytes for Buffer? The pro of buffering records is avoiding deserialization overhead; the con is that estimating record sizes becomes expensive.
  2. Should we buffer records or bytes for Cache? The trade-offs are similar to the above, with one addition: if there is already a state store underneath with its own block caching in bytes, should we consider removing this cache entirely and relying only on the state store's (RocksDB's) block caching, keeping just the dirty map in bytes and paying the cost of get() calls for all dirty keys plus deserialization upon flushing? (A sketch of this idea follows below.)
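
As a rough illustration of the "dirty map only" idea in question 2, the sketch below keeps just a set of dirty keys on top of a Kafka Streams KeyValueStore and pays a get() per dirty key on flush. The class itself and the forwarding callback are hypothetical; only KeyValueStore and Bytes are existing Kafka APIs.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;

public class DirtyKeyTracker {
    private final Set<Bytes> dirtyKeys = new HashSet<>();
    private final KeyValueStore<Bytes, byte[]> store;

    public DirtyKeyTracker(KeyValueStore<Bytes, byte[]> store) {
        this.store = store;
    }

    public void put(Bytes key, byte[] value) {
        store.put(key, value);   // rely on the store's (RocksDB's) own block cache
        dirtyKeys.add(key);      // remember only that the key is dirty
    }

    /** On flush, pay get() plus downstream deserialization for every dirty key. */
    public void flush(BiConsumer<Bytes, byte[]> forward) {
        for (Bytes key : dirtyKeys) {
            forward.accept(key, store.get(key));
        }
        dirtyKeys.clear();
        store.flush();
    }
}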

 

The above questions can probably be better answered by conducting some benchmark experiments, for example comparing https://github.com/jbellis/jamm for estimating record sizes against the serialization / deserialization costs, and then just treating the specified value as an upper bound.
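
For reference, a minimal sketch of such a record-size measurement with jamm is shown below; it assumes the jamm 0.3.x API (MemoryMeter constructed directly) and a JVM started with -javaagent:jamm.jar, and the key/value strings are arbitrary placeholders.

// Minimal sketch of estimating a record's in-memory size with jamm
// (https://github.com/jbellis/jamm). Assumes the 0.3.x API and a JVM started
// with -javaagent:jamm.jar; the key/value strings below are placeholders.
import org.github.jamm.MemoryMeter;

import java.nio.charset.StandardCharsets;

public class RecordSizeBenchmark {

    public static void main(String[] args) {
        MemoryMeter meter = new MemoryMeter();

        String key = "some-key";
        String value = "some-value-payload";

        // Deep on-heap size of the deserialized objects, as estimated by jamm.
        long onHeapBytes = meter.measureDeep(key) + meter.measureDeep(value);

        // Size of the serialized form, i.e. what a bytes-based bound would count.
        long serializedBytes = key.getBytes(StandardCharsets.UTF_8).length
                + value.getBytes(StandardCharsets.UTF_8).length;

        System.out.println("on-heap estimate: " + onHeapBytes + " bytes");
        System.out.println("serialized size:  " + serializedBytes + " bytes");
    }
}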