
There have been some questions and discussions about how to efficiently let users configure their memory usage in Kafka Streams since the 0.10.0 release, and how that will affect our current development plans regarding caching, buffering, state store management, etc. In this page we summarize the memory usage background in Kafka Streams as of 0.10.0, and discuss what the "end goal" for Kafka Streams' memory management should be. This is not an implementation design or development plan for memory management, but rather guidance for related feature development that may affect memory usage.

 

Background

There are a few modules inside Kafka Streams that allocate memory at runtime:

  1. Kafka Producer: each thread of a Kafka Streams instance maintains a producer client. The client itself maintains a buffer for batching records that are going to be sent to Kafka. This is completely controllable by the producer's buffer.memory config.
     
  2. Kafka Consumer: each thread of a Kafka Streams instance maintains two consumer clients, one for normal data fetching and one for state store replication and restoration only. Each client buffers fetched messages before they are returned to the user from the poll call. Today this is not yet configurable, but in the near future we are going to add memory bound controls similar to the producer's: KAFKA-2045.

    * Both producer and consumer also have separate TCP send / receive buffers that are not counted as the buffering memory, which are controlled by the send.buffer.bytes / receive.buffer.bytes configs; these are usually small (100K) and hence neglected most of the time.

  3. Triggering based Caches: as summarized in KIP-63, we will be adding a cache for each of the aggregation and KTable.to operators, and we are adding a StreamsConfig option to bound the total number of bytes used for all caches. Note, however, that we cache records as deserialized objects in order to avoid serialization costs.

  4. Deserialized Objects Buffering: within each thread's running loop, after the records are returned in raw bytes from consumer.poll, the thread deserializes each one of them into typed objects, buffers them, and processes them one record at a time. This buffering is mainly used for extracting the timestamps (which may be in the message's value payload) and reasoning about stream time to determine which stream to process next (i.e. synchronizing streams based on their current timestamps; see this for details).
     
  5. Persistent State Store Buffering: currently we use RocksDB by default as the persistent state store for stateful operations such as aggregations / joins, and RocksDB has its own buffering and caching mechanisms which allocate memory both off-heap and on-heap. RocksDB also has its own configs that control their sizes (we plan to expose these configs separately from StreamsConfig: KAFKA-3740), to name a few:
    • block_cache_size: amount of cache in bytes that will be used by RocksDB. NOTE this is off-heap.
    • write_buffer_size: the size of a single memtable in RocksDB.
    • max_write_buffer_number: the maximum number of memtables, both active and immutable.

      So a rough calculation of the amount of memory per store (see the sketch below): block_cache_size + write_buffer_size * max_write_buffer_number.
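For illustration, a back-of-the-envelope version of that per-store calculation (the values below are made up for the example and are not Kafka Streams defaults):

    public class RocksDBMemoryEstimate {
        public static void main(String[] args) {
            // Illustrative values only; actual sizes depend on the RocksDB configuration in use.
            final long blockCacheSize = 16L * 1024 * 1024;   // block_cache_size (off-heap)
            final long writeBufferSize = 4L * 1024 * 1024;   // write_buffer_size (one memtable)
            final int maxWriteBufferNumber = 2;              // max_write_buffer_number

            // Rough per-store footprint: block_cache_size + write_buffer_size * max_write_buffer_number
            final long bytesPerStore = blockCacheSize + writeBufferSize * maxWriteBufferNumber;
            System.out.println("rough memory per RocksDB store = " + bytesPerStore + " bytes"); // ~24 MB
        }
    }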

 

Memory Management: The End Goal

In an ideal world, Kafka Streams should provide a very simple configuration for its memory management. More concretely, users should be able to just specify a couple of configs bounding the memory usage of the Producer, Consumer, and Streams client, something like the following (a configuration sketch is given after the list):

  • producer.memory.bytes (P), covering 1) above.
  • consumer.memory.bytes (C), covering 2) above.
  • streams.memory.bytes (S), covering 1) + 2) + 3) + 4) + 5) above. Hence the streams library's pure usage (cases 3, 4, and 5 above) will be S - P - C.
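A minimal sketch of what such a configuration could look like; note that these config names are only the proposal above and do not exist in StreamsConfig today:

    import java.util.Properties;

    public class MemoryConfigExample {
        public static void main(String[] args) {
            // Hypothetical config names from the proposal above; none of them exist today.
            final long producerBytes = 32L * 1024 * 1024;    // P
            final long consumerBytes = 64L * 1024 * 1024;    // C
            final long streamsBytes  = 512L * 1024 * 1024;   // S, must be at least P + C

            final Properties props = new Properties();
            props.put("producer.memory.bytes", producerBytes);
            props.put("consumer.memory.bytes", consumerBytes);
            props.put("streams.memory.bytes", streamsBytes);

            // The library's own usage (cases 3, 4 and 5) is then bounded by S - P - C.
            System.out.println("library budget = " + (streamsBytes - producerBytes - consumerBytes) + " bytes");
        }
    }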

 

With P and C having reasonable default values, most of the time users only need to specify S, keeping in mind that it needs to be at least P + C; the library is then guaranteed to use no more memory than S. Hence, if users start their Streams application in a container with a memory bound of X, they know that their own application code can use up to the container's allowance minus the library's allowance, i.e. X - S (for example, with S = 512 MB inside a 1 GB container, user code can safely use up to 512 MB). And even under task migration scenarios upon failures or rebalances, the migrated tasks, which will then allocate memory for their own caching and state stores, etc., will not suddenly increase the library's memory usage, since its total is still bounded, and hence will not cause OOM errors (note that in the case of task migration due to an instance failure, without memory bounding this may cause cascading OOMs, which is a really bad user experience).

 

With this end goal in mind, now let's see how we should bound the memory usage for the above cases, especially 3), 4) and 5). 

 

Triggering based Caches

The total memory used for this part can be calculated as:

SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all caches created within this task's topology} (#.bytes in this cache)))

 

NOTE that:

  • This total is dynamic from rebalance to rebalance, since the tasks assigned to each thread, and hence the number of caches in its corresponding sub-topologies, can change.
  • With the triggering based caches sitting on top of the RocksDB instances for aggregations, we are effectively double-caching the records (one cache on top of RocksDB, and one inside RocksDB as memtables). We originally added this extra object-level caching only to reduce serialization costs.

Future Plan Proposal

This should be considered as part of KIP-63.

We know that for deserialized objects, their sizes in bytes are hard to estimate accurately without serializing them. Hence we should consider caching these values as serialized bytes and always paying the serialization cost.

And although we divide the memory among individual threads, instead of further dividing a thread's allocable memory among the individual caches from different tasks, we could allocate memory globally for all caches and, when that memory is exhausted, flush the caches within the currently processing task that used the most bytes.
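A minimal sketch of that idea, assuming a single shared byte budget per instance (all class and method names here are illustrative, not the actual KIP-63 implementation):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch: all caches share one byte budget; when it is exceeded,
    // the cache currently holding the most bytes is flushed downstream and cleared.
    public class GlobalCacheBudget {
        private final long maxBytes;
        private final Map<String, Long> bytesPerCache = new HashMap<>();
        private long totalBytes = 0;

        public GlobalCacheBudget(final long maxBytes) {
            this.maxBytes = maxBytes;
        }

        // Called by a cache after it buffers deltaBytes of serialized key/value bytes.
        public void record(final String cacheName, final long deltaBytes) {
            bytesPerCache.merge(cacheName, deltaBytes, Long::sum);
            totalBytes += deltaBytes;
            while (totalBytes > maxBytes && !bytesPerCache.isEmpty()) {
                final String largest = largestCache();
                totalBytes -= bytesPerCache.remove(largest);
                flush(largest);
            }
        }

        private String largestCache() {
            String largest = null;
            long largestBytes = -1;
            for (final Map.Entry<String, Long> entry : bytesPerCache.entrySet()) {
                if (entry.getValue() > largestBytes) {
                    largest = entry.getKey();
                    largestBytes = entry.getValue();
                }
            }
            return largest;
        }

        private void flush(final String cacheName) {
            // In a real implementation this would forward the cache's buffered records
            // to the downstream processors; here it is only a placeholder.
        }
    }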

 

Deserialized Objects Buffering

The total memory used for this part can be calculated as:

SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{partitions assigned to the task} (#.bytes buffered for that partition)))

 

Today we have a config, buffered.records.per.partition, that controls how many records we buffer before pausing fetching on that partition; but 1) it does not strictly enforce an upper limit on the number of records, and 2) the number of deserialized records does not translate directly into a number of bytes.
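The mechanism is roughly the following (an illustrative sketch, not the actual StreamThread code; class and method names here are made up):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative: pause fetching for a partition once its deserialized buffer grows past
    // buffered.records.per.partition, and resume once the buffer has been drained.
    public class PartitionBufferGuard {
        private final Consumer<byte[], byte[]> consumer;
        private final int maxBufferedRecords;   // buffered.records.per.partition

        public PartitionBufferGuard(final Consumer<byte[], byte[]> consumer, final int maxBufferedRecords) {
            this.consumer = consumer;
            this.maxBufferedRecords = maxBufferedRecords;
        }

        public void onRecordBuffered(final TopicPartition partition, final int bufferedCount) {
            if (bufferedCount > maxBufferedRecords) {
                consumer.pause(Collections.singleton(partition));
            }
        }

        public void onBufferDrained(final TopicPartition partition) {
            consumer.resume(Collections.singleton(partition));
        }
    }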

Future Plan Proposal

Assuming that in the future most users define record timestamps to be the timestamp in the message metadata field, that we use a similar high-watermark approach for determining the stream time boundary, and that for the rare case where users specify a different timestamp extractor we are willing to pay the deserialization cost twice just to get the source timestamp, THEN we would no longer need to deserialize the records up front and could rely completely on the consumer's own buffering of raw bytes. Hence this deserialized objects buffering could be removed completely.
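For reference, extracting the timestamp from the message metadata needs no deserialization at all; a sketch against the 0.10.x TimestampExtractor interface (the built-in metadata-based extractor already behaves like this):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Reads only the metadata timestamp, so the record's key/value bytes never have to be
    // deserialized just to determine stream time.
    public class MetadataTimestampExtractor implements TimestampExtractor {
        @Override
        public long extract(final ConsumerRecord<Object, Object> record) {
            return record.timestamp();
        }
    }

Only when users plug in an extractor that digs the timestamp out of the value payload does the library have to deserialize the record (possibly twice, as noted above).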

 

Persistent State Store Buffering (RocksDB)

The total memory used for this part can be calculated as:

SUM_{all threads within a KafkaStreams instance} (SUM_{all tasks allocated to the thread} (SUM_{all RocksDB stores in the task} (total #.bytes allocated for this RocksDB)))

 

Future Plan Proposal

We can still let advanced users specify different configs for 1) block cache size, 2) memtable buffer size, 3) number of memtables, 4) etc. for a single KafkaStreams instance, with the caveat that the amount of memory left for caching will be the total specified amount of memory minus the total amount of memory used by these state stores (thus, users need to be aware of the equation for calculating the TOTAL amount of memory used from these config values); and when users do not specify these configs we should dynamically set default values based on the total specified amount of memory, to make sure some amount is still left for caching.
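As an illustration of such an advanced override, assuming KAFKA-3740 exposes a RocksDBConfigSetter-style callback (the hook's name and signature below are assumptions, not an existing API as of 0.10.0):

    import java.util.Map;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    // Assumed shape of a per-store RocksDB tuning hook; the three values map directly to
    // block_cache_size, write_buffer_size and max_write_buffer_number from the Background section.
    public class BoundedRocksDBConfigSetter {
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(16L * 1024 * 1024);   // block_cache_size (off-heap)
            options.setTableFormatConfig(tableConfig);
            options.setWriteBufferSize(4L * 1024 * 1024);       // write_buffer_size (one memtable)
            options.setMaxWriteBufferNumber(2);                 // max_write_buffer_number
        }
    }

With these illustrative values the rough per-store footprint from the Background section is about 24 MB, which would then be subtracted from the instance's total budget before sizing the triggering based caches.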

 

And as a side note, if we cache serialized bytes in our own caching layer as proposed above, then we should try to reduce the use of RocksDB's own memtables, since they would then provide little additional benefit.

 

 

 

 
