...

  1. Kafka Producer: each thread of a Kafka Streams instance maintains a producer client. The client itself maintains a buffer for batching records that are going to be sent to Kafka. This buffer is fully controllable via the producer's buffer.memory config (see the config sketch after this list).
     
  2. Kafka Consumer: each thread of a Kafka Streams instance maintains two consumer clients, one for normal data fetching and another for state store replication and restoration only. Each client buffers fetched messages before they are returned to the user from the poll call. Today this buffer is not yet controllable, but in the near future we plan to add memory-bound controls similar to the producer's: KAFKA-2045.

  3. Both the producer and the consumer also have separate TCP send / receive buffers that are not counted as buffering memory; these are controlled by the send.buffer.bytes / receive.buffer.bytes configs and are usually small (~100 KB), hence negligible most of the time.

  4. Triggering-based Caches: as summarized in KIP-63, we will be adding a cache for each of the aggregation and KTable.to operators, plus a StreamsConfig option to bound the total number of bytes used across all caches (see the config sketch after this list). Note that we cache records as deserialized objects in order to avoid serialization costs.

  5. Deserialized Objects Buffering: within each thread's running loop, after records are returned in raw bytes from consumer.poll, the thread deserializes each of them into typed objects, buffers them, and processes them one record at a time. This buffering is mainly used for extracting the timestamps (which may be in the message's value payload) and for reasoning about the stream time to determine which stream to process next (i.e. synchronizing streams based on their current timestamps; see this for details, and the conceptual sketch after this list).
     
  6. Persistent State Store Buffering: this is related to KIP-63. Currently we use RocksDB by default as the persistent state store for stateful operations such as aggregations / joins. RocksDB has its own buffering and caching mechanisms, which allocate memory both off-heap and on-heap, and it has its own configs that control their sizes (we plan to expose these configs separately from StreamsConfig: KAFKA-3740; see the RocksDB sketch after this list). To name a few:
    • block_cache_size: amount of cache in bytes that will be used by RocksDB. NOTE this is off-heap.
    • write_buffer_size: the size of a single memtable in RocksDB.
    • max_write_buffer_number: the maximum number of memtables, both active and immutable. 

      So a rough calculation of the amount of memory used: block_cache_size + write_buffer_size * max_write_buffer_number.
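
As a rough illustration of the bounds in (1) and (4), below is a minimal Java sketch of a Streams configuration. The producerPrefix helper and the cache.max.bytes.buffering key are assumptions based on the KIP-63-era APIs and should be treated as illustrative rather than final.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class MemoryBoundsConfig {
        public static Properties props() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "memory-bounds-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            // (1) Producer batching buffer: 32 MB per producer client (one per stream thread).
            props.put(StreamsConfig.producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 32 * 1024 * 1024);

            // (4) KIP-63 record caches: total bytes across all caches of this instance
            //     (assumed config name; see KIP-63 for the final one).
            props.put("cache.max.bytes.buffering", 10 * 1024 * 1024);

            // (3) TCP socket buffers (send.buffer.bytes / receive.buffer.bytes) are left
            //     at their small defaults and not counted here.
            return props;
        }
    }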
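
The per-partition buffering and stream-time synchronization in (5) can be pictured with the following conceptual sketch; it is not the actual Kafka Streams internal code, and the StampedRecord type and nextRecord helper are made up for illustration.

    import java.util.Comparator;
    import java.util.Deque;
    import java.util.Map;

    public class StreamTimeSync {
        // A deserialized record together with its extracted timestamp (illustrative type).
        record StampedRecord(long timestamp, Object key, Object value) {}

        // Pick the partition whose head record has the smallest timestamp and pop it,
        // mimicking how streams are synchronized on their current timestamps.
        static StampedRecord nextRecord(Map<String, Deque<StampedRecord>> buffersByPartition) {
            return buffersByPartition.values().stream()
                    .filter(queue -> !queue.isEmpty())
                    .min(Comparator.comparingLong(queue -> queue.peekFirst().timestamp()))
                    .map(Deque::pollFirst)
                    .orElse(null);
        }
    }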
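
For (6), the sketch below shows how the RocksDB sizes could be bounded once KAFKA-3740 exposes them, assuming a RocksDBConfigSetter-style callback; the class name, the chosen sizes, and the registration property are illustrative only.

    import java.util.Map;
    import org.apache.kafka.streams.state.RocksDBConfigSetter;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.Options;

    public class BoundedRocksDBConfig implements RocksDBConfigSetter {
        private static final long BLOCK_CACHE_SIZE = 50 * 1024 * 1024L;   // off-heap block cache
        private static final long WRITE_BUFFER_SIZE = 16 * 1024 * 1024L;  // size of a single memtable
        private static final int MAX_WRITE_BUFFER_NUMBER = 3;             // active + immutable memtables

        @Override
        public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
            options.setTableFormatConfig(tableConfig);
            options.setWriteBufferSize(WRITE_BUFFER_SIZE);
            options.setMaxWriteBufferNumber(MAX_WRITE_BUFFER_NUMBER);
            // Rough per-store bound: block_cache_size + write_buffer_size * max_write_buffer_number
            //                      = 50 MB + 16 MB * 3 = 98 MB.
        }
    }

Such a setter would then be registered in the Streams configuration (e.g. via a rocksdb.config.setter property, name assumed) so that every RocksDB-backed store picks up the same bounds.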

...