...

With this end goal in mind, let's now see how we should bound the memory usage for the above cases, especially 3), 4), and 5).

 

Triggering based Caches

The total memory used for this part (denoted as Cache) can be calculated as:

...

  • This total is dynamic from rebalance to rebalance, since the tasks assigned to each thread, and hence the number of caches in their corresponding sub-topologies, can change.
  • With the triggering based caches sitting on top of the RocksDB instances for aggregations, we are effectively double-caching the records (one cache on top of RocksDB, and one cache inside RocksDB as Memtables). This extra caching of deserialized objects was originally introduced only to reduce serialization costs.

Future Plan Proposal

This should be considered as part of KIP-63.

...

  • We only need in-memory caching if persistent stores are used for aggregates, since these introduce the extra serde costs mentioned above.
  • If the state stores used are already in-memory (and hence already hold deserialized objects), we do not need the caching in bytes any more, but we still keep the dirty map for triggered flushing (a sketch of such a cache follows this list).
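
To make the second bullet concrete, below is a minimal sketch (all class and method names are hypothetical illustrations, not actual Streams internals) of a triggering-based cache that keeps serialized values in bytes together with a dirty-key set that is drained whenever a flush is triggered:

Code Block
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical sketch of a triggering-based, byte-sized cache with a dirty-key set.
public class TriggeringBytesCache {
    private final Map<String, byte[]> cache = new HashMap<>();
    private final Set<String> dirtyKeys = new HashSet<>();
    private final long maxBytes;     // flush-trigger threshold, in bytes
    private long currentBytes = 0L;

    public TriggeringBytesCache(final long maxBytes) {
        this.maxBytes = maxBytes;
    }

    // Put a serialized value and mark the key dirty; trigger a flush once the byte budget is exceeded.
    public void put(final String key, final byte[] value, final BiConsumer<String, byte[]> flushListener) {
        final byte[] old = cache.put(key, value);
        currentBytes += value.length - (old == null ? 0 : old.length);
        dirtyKeys.add(key);
        if (currentBytes > maxBytes) {
            flush(flushListener);
        }
    }

    // Forward only the dirty entries downstream (e.g. to the persistent store and the changelog).
    public void flush(final BiConsumer<String, byte[]> flushListener) {
        for (final String key : dirtyKeys) {
            flushListener.accept(key, cache.get(key));
        }
        dirtyKeys.clear();
    }
}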

 

Deserialized Objects Buffering

The total memory used for this part (denoted as Buffer) can be calculated as:

...

that controls how many records we would buffer before pausing the fetching on that partition; however, 1) it does not strictly enforce an upper limit on the number of records, and 2) the number of deserialized records does not directly translate into a number of bytes.

Future Plan Proposal

Assuming that in the future most users will define record timestamps to be the timestamp in the message metadata field, and that for the rare case where users specify a different timestamp extractor we are willing to pay the deserialization cost twice just to get the source timestamp, then we can keep this buffer in raw bytes as well: i.e. if the default record timestamp extractor is used, we just keep the raw byte records returned from consumer.poll and extract their timestamps; if another timestamp extractor is used, we deserialize each record to get its timestamp, then throw away the deserialized record but still keep the raw bytes in the buffer (a sketch of this logic is shown after the config name below). In this case, we can change the config to:

buffered.bytes.per.partition
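
As a hedged illustration of this raw-bytes buffering (names like RawBytesPartitionBuffer and the extractor/deserializer interfaces below are illustrative, not the actual Streams code), the per-partition buffer would only ever retain raw bytes plus the extracted timestamps:

Code Block
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of keeping the partition buffer in raw bytes only.
public class RawBytesPartitionBuffer {
    // Each buffered entry is the raw record bytes plus the extracted timestamp.
    static class Entry {
        final byte[] rawValue;
        final long timestamp;
        Entry(final byte[] rawValue, final long timestamp) { this.rawValue = rawValue; this.timestamp = timestamp; }
    }

    private final Deque<Entry> buffer = new ArrayDeque<>();
    private long bufferedBytes = 0L;

    // recordTimestamp: the timestamp carried in the message metadata field.
    // customExtractor: null if the default timestamp extractor is used.
    public void add(final byte[] rawValue, final long recordTimestamp,
                    final CustomTimestampExtractor customExtractor, final Deserializer deserializer) {
        final long ts;
        if (customExtractor == null) {
            // Default extractor: the metadata timestamp is enough, no deserialization needed.
            ts = recordTimestamp;
        } else {
            // Custom extractor: deserialize once just to extract the timestamp,
            // then discard the deserialized object and keep only the raw bytes.
            final Object value = deserializer.deserialize(rawValue);
            ts = customExtractor.extract(value);
        }
        buffer.addLast(new Entry(rawValue, ts));
        bufferedBytes += rawValue.length;  // this is what buffered.bytes.per.partition would bound
    }

    interface CustomTimestampExtractor { long extract(Object value); }
    interface Deserializer { Object deserialize(byte[] raw); }
}

The only deserialized object ever created is the temporary one used for timestamp extraction, which is immediately discarded, so the buffer's memory footprint stays bounded by buffered.bytes.per.partition.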

Persistent State Store Buffering

The total memory used for this part (denoted as Store) can be calculated as:

Code Block
SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all RocksDB stores in the task} (total #.bytes allocated for this RocksDB)))
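
As a hedged worked example of this sum (all numbers purely illustrative): an instance running 4 threads, each assigned 2 tasks whose sub-topologies each contain 3 RocksDB stores, with each store allocating roughly 50 MB for Memtables plus block cache, would account for about 4 × 2 × 3 × 50 MB = 1.2 GB under this part alone.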

 

Future Plan Proposal

For advanced users who have a good understanding of RocksDB configs, they should still be able to specify config values such as 1) block cache size, 2) Memtable buffer size, 3) number of Memtables, etc. for a single KafkaStreams instance; if no user-specified values are provided, some default values will be used. BUT for some of these configs, such as the block cache size, the value should be a per-Kafka-Streams-instance config instead of a per-RocksDB config, and hence the Streams library should divide it among the threads / tasks / RocksDB instances dynamically (see the sketch below).
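
As a rough sketch of how such per-store overrides could be expressed, below is a hedged example based on the RocksDBConfigSetter hook; the per-instance budget constants (TOTAL_BLOCK_CACHE_BYTES, EXPECTED_NUM_STORES) are hypothetical placeholders used to illustrate dividing an instance-wide value among stores, not existing Streams configs:

Code Block
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hedged sketch: divide a per-instance memory budget equally among RocksDB stores.
public class DividedBlockCacheConfigSetter implements RocksDBConfigSetter {
    private static final long TOTAL_BLOCK_CACHE_BYTES = 512L * 1024 * 1024; // hypothetical per-instance budget
    private static final int EXPECTED_NUM_STORES = 8;                       // hypothetical: threads * tasks * stores

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        // Give each store an equal share of the instance-wide block cache budget.
        tableConfig.setBlockCacheSize(TOTAL_BLOCK_CACHE_BYTES / EXPECTED_NUM_STORES);
        options.setTableFormatConfig(tableConfig);
        // Keep Memtable usage modest since the Streams-level cache already buffers writes.
        options.setWriteBufferSize(8L * 1024 * 1024); // 8 MB per Memtable
        options.setMaxWriteBufferNumber(2);
    }
}

Such a setter would typically be registered via the rocksdb.config.setter config so that every RocksDB store created by the instance receives its share of the budget.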

As a side note, if we are using bytes in our own caching layer as proposed above, then we should reduce the usage of RocksDB's own Memtables by default, since they would provide little additional benefit.

 

Summary

Putting it all together, here is the proposal for how Kafka Streams should reason about its memory usage:

...