Versions Compared

Comment: Cache size is now global.

...

  1. cache.max.bytes.buffering: This parameter controls the global cache size. Specifically, for a streams instance with T threads and cache size C, each thread gets an even share of C/T bytes of cache, to use as it sees fit among its tasks. Caches are not shared across threads. Note that the cache serves both reads and writes. Records are evicted using a simple LRU scheme once the cache size is reached. The first time a keyed record R1 = <K1, V1> finishes processing at a node, it is marked as dirty in the cache. Any other keyed record R2 = <K1, V2> with the same key K1 that is processed on that node before the entry is flushed will overwrite <K1, V1>. Upon flushing, R2 is i) forwarded to the next processing node and ii) written to RocksDB (one write is local, one is to a backing Kafka topic). Note that if cache.max.bytes.buffering is set to X, and if users have A aggregation operators and T KTable.to() operators, then X*(A + T) bytes will be allocated for caching.
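The per-thread split and the overwrite-then-flush behaviour described above can be illustrated with a small, self-contained sketch. This is not the Kafka Streams implementation: the class name `ThreadCacheSketch`, the `flushed` list standing in for "forward downstream and write to the store", and the size estimate (key length plus value length) are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical per-thread cache sketch; names and sizing are illustrative only. */
final class ThreadCacheSketch {
    private final long maxBytes;   // this thread's share: C / T
    private long usedBytes = 0;

    // accessOrder = true keeps the least recently used entry at the head.
    private final LinkedHashMap<String, String> dirty =
            new LinkedHashMap<>(16, 0.75f, true);

    // Stand-in for "forwarded to the next node and written to the store".
    final List<String> flushed = new ArrayList<>();

    ThreadCacheSketch(long totalCacheBytes, int numThreads) {
        this.maxBytes = totalCacheBytes / numThreads;
    }

    long maxBytes() {
        return maxBytes;
    }

    /** Caches a record; a later value for the same key overwrites the earlier one. */
    void put(String key, String value) {
        String old = dirty.put(key, value);
        if (old != null) {
            usedBytes -= sizeOf(key, old);
        }
        usedBytes += sizeOf(key, value);
        // Evict least recently used entries while the thread's share is exceeded.
        while (usedBytes > maxBytes && !dirty.isEmpty()) {
            evictEldest();
        }
    }

    /** Flushes every dirty entry, least recently used first. */
    void flush() {
        while (!dirty.isEmpty()) {
            evictEldest();
        }
    }

    private void evictEldest() {
        Iterator<Map.Entry<String, String>> it = dirty.entrySet().iterator();
        Map.Entry<String, String> eldest = it.next();
        flushed.add(eldest.getKey() + "=" + eldest.getValue());
        usedBytes -= sizeOf(eldest.getKey(), eldest.getValue());
        it.remove();
    }

    // Crude size estimate (assumption): character counts of key and value.
    private static long sizeOf(String key, String value) {
        return key.length() + value.length();
    }
}
```

With a total cache of 400 bytes and 4 threads, each `ThreadCacheSketch` is limited to 100 bytes. Putting <K1, V1> and then <K1, V2> followed by `flush()` emits only `K1=V2`, matching the deduplication described in the text.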

The semantics of this parameter are that data is forwarded and flushed at whichever comes first: the commit.interval.ms interval elapsing (note this parameter already exists and specifies the frequency with which a processor flushes its state) or cache pressure. Both parameters specify per-processor buffering time and buffering size. They are global parameters in the sense that they apply to all processor nodes in the topology, i.e., it will not be possible to specify different parameters for each node. Such fine-grained control is probably not needed.
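As a rough illustration of the "whichever comes first" rule, the sketch below sets the two parameters on a plain `java.util.Properties` object and expresses the flush condition as a predicate. The class `FlushPolicySketch`, the method names, and the numeric values are hypothetical; only the two configuration keys come from the text, and "cache pressure" is assumed here to mean that the cached bytes exceed the thread's share.

```java
import java.util.Properties;

/** Hypothetical sketch of the flush trigger; not the actual Kafka Streams logic. */
final class FlushPolicySketch {

    /** Example settings; the values are illustrative, not recommendations. */
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.setProperty("cache.max.bytes.buffering", "10485760"); // 10 MiB in total
        props.setProperty("commit.interval.ms", "30000");           // 30 seconds
        return props;
    }

    /**
     * Returns true when either trigger has fired:
     * the commit interval has elapsed, or the cache is under pressure.
     */
    static boolean shouldFlush(long nowMs, long lastCommitMs, long commitIntervalMs,
                               long cachedBytes, long threadCacheBytes) {
        boolean commitDue = nowMs - lastCommitMs >= commitIntervalMs;
        boolean cachePressure = cachedBytes > threadCacheBytes; // assumed definition
        return commitDue || cachePressure;
    }
}
```

Because both settings live in the application-wide configuration, the same values apply to every processor node in the topology, which is the sense in which the text calls them global.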

...