...

In the ideal world, Kafka Streams should provide a very simple configuration for its memory management. More concretely, users should be able to just specify a couple of configs that set the memory limits on the Producer, Consumer and Streams client, something like:

...

single config value that bounds the total usage of 1) + 2) + 3) + 4) + 5) above

...

, for example:

streams.memory.bytes (denoted as Total)
 

 

With producer.memory.bytes and consumer.memory.bytes having some reasonable default values, most of the time users only need to specify Total above, keeping in mind that it needs to be at least larger than

producer.memory.bytes + consumer.memory.bytes

which represent cases 1) and 2) above; these two configs can also be specified by the user through the StreamsConfig, but in practice they may just use the default values.

The library is then guaranteed to use less memory than Total. Hence, if users start their Streams application in a container with bounded memory usage X, they know that their own application code can use up to the amount of memory allowed by the container minus the total allocable Streams library usage, i.e. X - Total. And even under task migration scenarios upon failures or rebalancing, the migrated tasks, which will then allocate memory for their own caching and state stores, etc., will not suddenly increase the library's memory usage, since its total is still bounded, and hence will not cause OOM (note that in the case of task migration due to one instance's failure, without memory bounding this may cause cascading OOMs, which is a really bad user experience).
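
To make the arithmetic concrete, here is a minimal sketch of that headroom reasoning in Java; streams.memory.bytes, producer.memory.bytes and consumer.memory.bytes are the configs proposed above (they do not exist in the current Streams API), and all the numbers are made up.

Code Block
import java.util.Properties;

public class MemoryBudgetExample {
    public static void main(String[] args) {
        // Proposed configs from this discussion (not part of the current Streams API).
        Properties props = new Properties();
        props.put("streams.memory.bytes", 1024L * 1024 * 1024);  // Total: 1 GB for the whole library
        props.put("producer.memory.bytes", 64L * 1024 * 1024);   // Producer: 64 MB (default-able)
        props.put("consumer.memory.bytes", 128L * 1024 * 1024);  // Consumer: 128 MB (default-able)

        long containerLimit = 4L * 1024 * 1024 * 1024;           // X: memory bound of the container
        long total = (Long) props.get("streams.memory.bytes");   // Total

        // User code may safely use up to X - Total, no matter how many tasks later
        // migrate onto this instance, because the library stays under Total.
        long applicationHeadroom = containerLimit - total;
        System.out.println("Application headroom: " + applicationHeadroom + " bytes");
    }
}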

...

With this end goal in mind, now let's see how we should bound the memory usage for the above cases, especially 3), 4) and 5). 

 

Triggering-based Caches (Cache)

The total memory used for this part (denoted as Cache) can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all caches created within this task's topology} (#.bytes in this cache)))
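
For illustration only, the nested sum above could be computed with bookkeeping like the following; the classes and fields here are hypothetical, not the actual Streams internals.

Code Block
import java.util.List;

// Hypothetical bookkeeping types, used only to spell out the formula above.
class NamedCache { long sizeInBytes; }
class CachingTask { List<NamedCache> caches; }
class CachingThread { List<CachingTask> tasks; }

class CacheAccounting {
    // Cache = SUM over threads ( SUM over tasks ( SUM over caches ( bytes ) ) )
    static long totalCacheBytes(List<CachingThread> threadsOfInstance) {
        long total = 0L;
        for (CachingThread thread : threadsOfInstance)
            for (CachingTask task : thread.tasks)
                for (NamedCache cache : task.caches)
                    total += cache.sizeInBytes;
        return total;
    }
}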

...

We know that for deserialized objects, their size in bytes is hard to estimate accurately without serializing them to bytes. Hence we should consider just caching these values as byte arrays and always paying the serialization / deserialization costs, for the sake of better memory management.

And although we should divide the memory among individual threads, instead of further dividing a thread's total allocable memory among the individual caches from different tasks, we could consider allocating memory globally for all caches, and, when that memory is exhausted, flushing the caches within the currently processing task that hold the most bytes (see the sketch after the list below). In the future we will allow users to configure which state stores they are going to use for their stateful operations: they can be persistent or in-memory, and then:

  • We only need in-memory caching if persistent stores are used for aggregates, which will introduce extra serde costs as mentioned above.
  • If the state stores used are already in-memory (and these should hold deserialized objects), we do not need the caching in bytes any more, while we still keep the dirty map for triggered flushing.
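
Below is a minimal sketch of the global-pool idea from the paragraph above, assuming one shared byte budget per thread and a simple eviction policy that flushes the task currently holding the most cached bytes; the class and its methods are hypothetical, not the actual Streams cache implementation.

Code Block
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-thread cache pool shared by the caches of all assigned tasks.
class GlobalCachePool {
    private final long maxBytes;                                    // shared budget for all caches
    private long usedBytes = 0L;
    private final Map<String, Long> bytesPerTask = new HashMap<>(); // taskId -> cached bytes

    GlobalCachePool(long maxBytes) { this.maxBytes = maxBytes; }

    // Called whenever a task caches a serialized (byte[]) record.
    void add(String taskId, int recordBytes) {
        while (usedBytes + recordBytes > maxBytes && !bytesPerTask.isEmpty()) {
            flushLargestTask();   // free memory where it helps the most
        }
        usedBytes += recordBytes;
        bytesPerTask.merge(taskId, (long) recordBytes, Long::sum);
    }

    // One possible eviction policy: flush all caches of the task holding the most bytes.
    // In the design sketched in this section, flushing would also write the dirty
    // entries to the underlying state stores before dropping them from the cache.
    private void flushLargestTask() {
        String largest = null;
        long largestBytes = -1L;
        for (Map.Entry<String, Long> e : bytesPerTask.entrySet()) {
            if (e.getValue() > largestBytes) {
                largest = e.getKey();
                largestBytes = e.getValue();
            }
        }
        usedBytes -= largestBytes;
        bytesPerTask.remove(largest);
    }
}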

 

Deserialized Objects Buffering (Buffer)

The total memory used for this part (denoted as Buffer) can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{partitions assigned to the task} (#.bytes buffered for that partition)))

...

Assuming that in the future most users will define record timestamps to be the timestamp on the message metadata field, that we use a similar high-watermark approach for determining the stream time boundary, and that for the rare case where users specify a different timestamp extractor we are willing to pay the deserialization cost twice just for getting the source timestamp, then we can keep this buffer in raw bytes as well: i.e. if the default record timestamp extractor is used, we just get the raw-byte records from consumer.poll and extract their timestamps; if another timestamp extractor is used, we deserialize the record to get the timestamp, and throw away the deserialized record but still keep the raw bytes in the buffer. In this case, we can change the config to:

buffered.bytes.per.partition
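
A sketch of the two code paths described above, assuming records are kept as raw byte arrays; the wrapper class and the extractor wiring are simplified illustrations, not the actual Streams TimestampExtractor plumbing.

Code Block
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.function.ToLongFunction;

// Illustrative wrapper: keep only the raw bytes plus the extracted timestamp in the buffer.
class TimestampedRawRecord {
    final ConsumerRecord<byte[], byte[]> raw;   // raw bytes from consumer.poll()
    final long timestamp;                       // the only field we materialize eagerly

    TimestampedRawRecord(ConsumerRecord<byte[], byte[]> raw, long timestamp) {
        this.raw = raw;
        this.timestamp = timestamp;
    }

    // Default path: take the timestamp from the message metadata, no deserialization at all.
    static TimestampedRawRecord fromMetadata(ConsumerRecord<byte[], byte[]> raw) {
        return new TimestampedRawRecord(raw, raw.timestamp());
    }

    // Custom-extractor path: deserialize once just to compute the timestamp, then
    // discard the object and keep only the raw bytes in the buffer.
    static <V> TimestampedRawRecord fromCustomExtractor(ConsumerRecord<byte[], byte[]> raw,
                                                        Deserializer<V> valueDeserializer,
                                                        ToLongFunction<V> timestampExtractor) {
        V value = valueDeserializer.deserialize(raw.topic(), raw.value());
        return new TimestampedRawRecord(raw, timestampExtractor.applyAsLong(value));
    }
}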

Persistent State Store Buffering (State)

The total memory used for this part (denoted as State) can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all RocksDB stores in the task} (total #.bytes allocated for this RocksDB)))

 

Future Plan Proposal

Advanced users who have a good understanding of RocksDB configs should still be able to specify these config values, such as 1) block cache size, 2) Memtable buffer size, 3) number of Memtables, etc., for a single KafkaStreams instance, and be aware that the amount of memory left for caching will be the total specified amount of memory minus the total amount of memory used by these state stores (thus, they need to be aware of the equation that calculates the total amount of memory used from these config values); and when users do not specify these configs, we should dynamically set some default values based on the total specified amount of memory to make sure we still have some amount left for caching.

 

Note that some of these configs, like block cache size, should be per Kafka Streams instance configs rather than per RocksDB configs, and hence the Streams library should divide their values among the threads / tasks / RocksDB instances dynamically.
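
As a rough illustration of that accounting, using the same per-store approximation as in the summary below (block_cache_size + write_buffer_size * max_write_buffer_number); the helper class and the numbers are hypothetical.

Code Block
// Hypothetical helper estimating how much of the per-thread budget RocksDB stores consume,
// using the approximation block_cache_size + write_buffer_size * max_write_buffer_number.
class RocksDBMemoryEstimate {
    static long perStoreBytes(long blockCacheSize, long writeBufferSize, int maxWriteBufferNumber) {
        return blockCacheSize + writeBufferSize * maxWriteBufferNumber;
    }

    public static void main(String[] args) {
        long blockCacheSize = 16L * 1024 * 1024;  // e.g. a slice of the per-instance block cache
        long writeBufferSize = 8L * 1024 * 1024;  // Memtable size
        int maxWriteBufferNumber = 2;             // number of Memtables
        int storesInThread = 4;

        long stateBytes = storesInThread
                * perStoreBytes(blockCacheSize, writeBufferSize, maxWriteBufferNumber);
        System.out.println("State bytes reserved by RocksDB stores in this thread: " + stateBytes);
    }
}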

As a side note, if we are using bytes in our own caching layer as proposed above, then we should try to reduce the usage of RocksDB's own Memtable by default, as it effectively provides fewer additional benefits.

Summary

So putting it all together, here is the proposal for how Kafka Streams should reason about its memory usage (a sketch of the resulting per-thread arithmetic follows the list):

  • The user-specified total amount of memory Total of a Kafka Streams instance is always divided evenly among its threads upon starting up the instance; the number of threads is static throughout the instance's lifetime.

  • Within a single stream thread, the reserved memory for its producer (denoted as Producer) and consumer (denoted as Consumer) client usage will first be subtracted from its share Total / numThreads; these values are also static throughout the thread's lifetime.
     
  • The rest of the usable memory, Total / numThreads - Producer - Consumer, is dynamically allocated upon each rebalance:
    • Upon each rebalance, when the assigned tasks are created, the thread will first set aside the memory needed for buffering (Buffer), calculated as buffered.bytes.per.partition * total.number.partitions.
    • Then it will set aside the amount of memory used by all of its persistent state stores (State), calculated by each store type's specific equation; for example, for RocksDB it is calculated as block_cache_size + write_buffer_size * max_write_buffer_number.
    • If the remaining amount of memory, Total / numThreads - Producer - Consumer - Buffer - State, is already negative, then we should log a WARNING that there may not be enough memory for this instance.
    • Otherwise, the remaining memory is allocated for caching needs, and multiple caches will dynamically allocate memory from this pool, possibly flushing when it is about to be exhausted, as mentioned above.
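
Here is a minimal sketch of that per-thread allocation arithmetic; the method and its parameters are illustrative only, not the actual Streams implementation.

Code Block
// Illustrative per-thread allocation arithmetic; not the actual Streams implementation.
class PerThreadMemoryPlan {
    // Returns the bytes left for caching in one thread, negative if over-committed.
    static long cacheBudget(long total, int numThreads,
                            long producer, long consumer,
                            long bufferedBytesPerPartition, int numAssignedPartitions,
                            long stateBytes) {
        long threadShare = total / numThreads;                           // Total / numThreads
        long buffer = bufferedBytesPerPartition * numAssignedPartitions; // Buffer
        long remaining = threadShare - producer - consumer - buffer - stateBytes;
        if (remaining < 0) {
            System.out.println("WARNING: there may not be enough memory for this instance");
        }
        return remaining;                                                // budget for the cache pool
    }
}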

 

NOTE that the caveat of this approach is that the amounts of Buffer and State can increase with the number of tasks assigned to this instance's threads, and hence we may not be actively guarding against the cascading OOMs mentioned above. A further optimization is to pro-actively shrink the config values controlling these amounts, for example buffered.bytes.per.partition and RocksDB's block_cache_size, and treat the specified values only as upper bounds.