...

With P and C having reasonable default values, most of the time users only need to specify S, keeping in mind that it must be at least P + C; the library then guarantees to use no more than S bytes of memory. Hence, if users start their Streams application in a container with a memory bound of X, they know that their own application code can use up to the container's limit minus the library's allowance, i.e. X - S. Even under task migration scenarios upon failures or rebalances, the newly assigned tasks, which will allocate memory for their own caching and state stores, etc., will not suddenly increase the library's memory usage, since its total remains bounded, and hence will not cause OOM. (Note that without memory bounding, task migration due to one instance's failure may cause cascading OOMs, which is a really bad user experience.)

 

With this end goal in mind, let's now see how we should bound the memory usage for the above cases, especially 3), 4), and 5).

 

Triggering based Caches

The total memory used for this part can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all caches created within this task's topology} (#.bytes in this cache)))
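The nested sum above can be sketched as follows; the thread → task → cache layout and the class name are illustrative, not Kafka Streams internals:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch: walk the thread -> task -> cache hierarchy of one
// KafkaStreams instance and sum up the bytes held by every cache.
public class CacheMemoryUsage {

    // threads: thread name -> (task id -> list of per-cache byte counts)
    public static long totalCacheBytes(Map<String, Map<String, List<Long>>> threads) {
        long total = 0L;
        // SUM over all threads within the KafkaStreams instance
        for (Map<String, List<Long>> tasks : threads.values()) {
            // SUM over all tasks allocated to the thread
            for (List<Long> cacheSizes : tasks.values()) {
                // SUM over all caches created within this task's topology
                for (long bytes : cacheSizes) {
                    total += bytes;
                }
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Map<String, List<Long>>> threads = Map.of(
            "thread-1", Map.of("task-0_0", List.of(1024L, 2048L)),
            "thread-2", Map.of("task-0_1", List.of(4096L)));
        System.out.println(totalCacheBytes(threads)); // prints 7168
    }
}
```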

 

NOTE that:

  • This total is dynamic from rebalance to rebalance, since the tasks assigned to each thread, and hence the number of caches in each task's sub-topology, can change.
  • With the triggering based caches sitting on top of the RocksDB instances for aggregations, we are effectively double caching the records (once in the cache on top of RocksDB, and once inside RocksDB as Memtables). This extra caching of deserialized objects originally existed only to reduce serialization costs.

Future Plan Proposal

This should be considered as part of KIP-63.

We know that for deserialized objects, their size in bytes is hard to estimate accurately without serializing them. Hence we should consider caching these values as raw bytes and always paying the serialization costs.

And although we should still divide the memory among individual threads, within a thread we could, instead of further dividing the thread's allocable memory among the individual caches from different tasks, allocate memory globally for all caches, and flush the caches of the processing task that has used the most bytes when the memory is exhausted.
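A minimal sketch of that global allocation idea follows; the class and method names are hypothetical, and "flush" is simplified to dropping the bytes, which is not what the eventual KIP-63 implementation necessarily does:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: all caches draw from one global byte budget; when the
// budget would be exceeded, flush the cache of whichever task currently holds
// the most bytes until the new entry fits.
public class GlobalCacheBudget {
    private final long maxBytes;
    private final Map<String, Long> bytesPerTaskCache = new HashMap<>();
    private long usedBytes = 0L;

    public GlobalCacheBudget(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Record that `bytes` were added to `taskId`'s cache, flushing first if needed.
    public void add(String taskId, long bytes) {
        while (usedBytes + bytes > maxBytes && !bytesPerTaskCache.isEmpty()) {
            flushLargest();
        }
        usedBytes += bytes;
        bytesPerTaskCache.merge(taskId, bytes, Long::sum);
    }

    // Flush (here: simply forget) the cache of the task using the most bytes.
    private void flushLargest() {
        String largest = null;
        long largestBytes = -1L;
        for (Map.Entry<String, Long> e : bytesPerTaskCache.entrySet()) {
            if (e.getValue() > largestBytes) {
                largest = e.getKey();
                largestBytes = e.getValue();
            }
        }
        usedBytes -= bytesPerTaskCache.remove(largest);
    }

    public long used() {
        return usedBytes;
    }
}
```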

 

Deserialized Objects Buffering

The total memory used for this part can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{partitions assigned to the task} (#.bytes buffered for that partition)))

 

Today we have a config 

buffered.records.per.partition

that controls how many records we buffer before pausing the fetching on that partition; however, it 1) does not strictly enforce the upper limit on the number of records, and 2) a count of deserialized records does not translate into a number of bytes.
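For reference, today's record-count-based knob is set like any other Streams config; the sketch below uses a plain Properties object with the config's string key and its default value of 1000:

```java
import java.util.Properties;

// Sketch of today's count-based buffering knob. "buffered.records.per.partition"
// caps the buffered records per partition before fetching is paused, but it is
// a soft limit and says nothing about bytes, since record sizes vary.
public class BufferingConfigExample {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Buffer up to 1000 records per partition before pausing the fetch (the default).
        props.put("buffered.records.per.partition", 1000);
        return props;
    }
}
```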

Future Plan Proposal

Assuming that in the future most users let record timestamps be the timestamp in the message metadata field, that we use a similar high-watermark approach for determining the stream time boundary, and that for the rare case where users specify a different timestamp extractor we are willing to pay the deserialization cost twice just to get the source timestamp, THEN we no longer need to deserialize the records up front and can rely completely on the consumer's own buffering of raw bytes. This deserialized objects buffering can then be removed completely.
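The high-watermark approach mentioned above can be sketched in a few lines; the class name is hypothetical, and real stream-time tracking has more subtleties (e.g. per-partition tracking) than this:

```java
// Illustrative sketch of a high-watermark stream time: time only advances,
// taken as the maximum record (metadata) timestamp observed so far.
public class StreamTimeTracker {
    private long streamTime = Long.MIN_VALUE;

    // Advance stream time using the record's metadata timestamp; never go backwards.
    public long advance(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        return streamTime;
    }
}
```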

 

Persistent State Store Buffering (RocksDB)

The total memory used for this part can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all RocksDB stores in the task} (total #.bytes allocated for this RocksDB)))
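The per-store term in the formula can be approximated from the RocksDB config values; the sketch below assumes the dominant allocations are the block cache plus the Memtables and ignores other allocations (e.g. index and filter blocks), so it is a lower-bound estimate only:

```java
// Back-of-the-envelope estimate of one RocksDB instance's allocated bytes,
// counting only the block cache and the Memtables (write buffers).
public class RocksDbMemoryEstimate {
    public static long bytesPerStore(long blockCacheBytes, long memtableBytes, int numMemtables) {
        // block cache + (Memtable buffer size * number of Memtables)
        return blockCacheBytes + memtableBytes * (long) numMemtables;
    }
}
```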

 

Future Plan Proposal

We can still let advanced users specify different configs, e.g. 1) block cache size, 2) Memtable buffer size, 3) number of Memtables, 4) etc., for a single KafkaStreams instance. They should then be aware that the amount of memory available for caching will be the total specified amount of memory minus the total amount of memory used by these state stores (thus, they need to know the equation that calculates the TOTAL amount of memory used from these config values). When users do not specify these configs, we should dynamically derive default values from the total specified amount of memory, to make sure some amount is still left for caching.
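The budgeting rule described above reduces to a simple subtraction; the class and method names below are hypothetical, and the store total would come from an equation over the per-store config values such as the one sketched earlier:

```java
// Sketch of the budgeting rule: caching gets whatever is left of the total
// budget after the state stores' configured memory is subtracted.
public class MemoryBudget {
    // Returns the bytes left for caching, or throws if the stores alone exceed the total.
    public static long cacheBudget(long totalBytes, long totalStateStoreBytes) {
        if (totalStateStoreBytes > totalBytes) {
            throw new IllegalArgumentException("state stores alone exceed the total memory budget");
        }
        return totalBytes - totalStateStoreBytes;
    }
}
```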