
Status

Current state: Accepted

Discussion thread: here 

JIRA: here

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...




The cache has two functions. First, it continues to serve as a read buffer for data that is sent to the state store, just like today. Second, it serves as a write deduplicator for the state store (just like today) as well as for the downstream processor node(s). The tradeoff is thus "getting every update, i.e., a low update delay, but a large CPU and storage load" versus "skipping some updates, i.e., a larger update delay, but a reduced resource load". Note that this optimization does not affect correctness. The optimization is applicable to aggregations and KTable.to() operations. It is not applicable to other operators like joins.

The proposal calls for one parameter to be added to the streams configuration options:

  1. cache.max.bytes.buffering: This parameter controls the global cache size. Specifically, for a streams instance with T threads and cache size C, each thread gets an even split of C/T bytes of cache, to use as it sees fit among its tasks. Caches are not shared across threads. Note that the cache serves both reads and writes. Records are evicted using a simple LRU scheme once the cache size is reached. The first time a keyed record R1 = <K1, V1> finishes processing at a node, it is marked as dirty in the cache. Any other keyed record R2 = <K1, V2> with the same key K1 that is processed on that node while R1 is still dirty in the cache will overwrite <K1, V1>. Upon flushing, R2 is (i) forwarded to the next processing node and (ii) written to RocksDB (one write is local, one is to a backing Kafka topic). Note that if cache.max.bytes.buffering is set to X, and users have A aggregation operators and T KTable.to() operators, then X*(A + T) bytes will be allocated for caching.

The semantics of this parameter are that data is forwarded and flushed whenever the earlier of two triggers hits: commit.interval.ms elapses (note that this parameter already exists and specifies the frequency with which a processor flushes its state), or cache pressure forces an eviction. The two parameters thus bound per-processor buffering time and buffering size, respectively. These are global parameters in the sense that they apply to all processor nodes in the topology, i.e., it will not be possible to specify different parameters for each node. Such fine-grained control is probably not needed from the DSL.
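For illustration, both knobs would be set through the streams configuration; the snippet below is a sketch with arbitrary values, assuming the config constants are named CACHE_MAX_BYTES_BUFFERING_CONFIG and COMMIT_INTERVAL_MS_CONFIG in StreamsConfig:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cache-example-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 10 MB of cache for this instance, split evenly across its T threads
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// flush and forward at least every 30 seconds, even without cache pressure
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);
StreamsConfig config = new StreamsConfig(props);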

Continuing with the motivating example above, what is forwarded for K1, assuming all three operations hit in cache, is a final Change value <V1 + V10 + V100, null>.
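As a sketch of how this looks from the DSL (topic names, the "Sums" store name, and serdes are made up for illustration), here is an aggregation whose three cached updates to K1 collapse into a single forwarded record:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();
// suppose <K1, V1>, <K1, V10>, <K1, V100> all arrive between two flushes
KTable<String, Long> sums = builder
    .stream(Serdes.String(), Serdes.Long(), "input-topic")
    .groupByKey(Serdes.String(), Serdes.Long())
    .reduce((v1, v2) -> v1 + v2, "Sums");
// with caching enabled, downstream sees one Change <V1 + V10 + V100, null> per flush
sums.to(Serdes.String(), Serdes.Long(), "sums-output");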

For the low-level Processor API, we propose no changes, i.e., this KIP does not apply to the Processor API, just the DSL.

Public Interfaces

This KIP adds the cache.max.bytes.buffering configuration to the streams configuration as described above. 

...

  1. Change the RocksDB implementation for KStream windowed aggregations to merge range queries from the cache with range queries from RocksDB.

  2. Extract the LRU cache out of RocksDBStore, as a separate store for KGroupedStream.aggregate() / reduce(). Add the LRU cache for the KTable.to() operator. Forward entries downstream on flush or eviction (a sketch follows this list).

  3. Adjust context forwarding logic so that forwarding happens on cache flush

  4. Add the above config into StreamsConfig
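The following is a minimal, illustrative sketch of the dedup-and-forward behavior described in steps 2 and 3; the class and its structure are made up, not the actual implementation (in particular, the real cache is bounded by bytes, not entry count):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative only: the latest value per key wins; entries are forwarded
// downstream (and written to the store) on eviction or on flush.
class DedupCache<K, V> {
    private final LinkedHashMap<K, V> entries;
    private final BiConsumer<K, V> forward;

    DedupCache(final int maxEntries, final BiConsumer<K, V> forward) {
        this.forward = forward;
        // access-order LinkedHashMap gives a simple LRU eviction policy
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
                if (size() > maxEntries) {
                    forward.accept(eldest.getKey(), eldest.getValue()); // forward on eviction
                    return true;
                }
                return false;
            }
        };
    }

    void put(final K key, final V value) {
        entries.put(key, value); // a later update to the same key overwrites the dirty entry
    }

    void flush() { // called at commit.interval.ms boundaries
        for (final Map.Entry<K, V> e : entries.entrySet()) {
            forward.accept(e.getKey(), e.getValue()); // forward only the latest value per key
        }
        entries.clear();
    }
}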

     

...

The advantage of this parameter is that it could be smaller than the existing commit.interval.ms, and thus allow for shorter buffering intervals. However, if desired, commit.interval.ms could also be set to a smaller value, so it is unclear what the benefit of yet another timeout would be.

- Expose this caching to the Processor API: 

For the low-level Processor API, we could allow users to enable/disable caching on a store-by-store basis using a new .enableCaching() call. For example:

 

TopologyBuilder builder = new TopologyBuilder();

 

builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().persistent().enableCaching().build(), "Process");

 

The implication of adding caching to the Processor API is that the caching layer will also do the forwarding. Hence, if a store's cache is enabled, calls to context.forward would no longer be needed. However, this poses the problem of a user having to change their code (adding or removing context.forward calls) each time they want to enable or disable caching. As such, more thought is needed for this approach. Furthermore, there are several other ways of doing caching in the Processor API, and perhaps we do not want to prescribe this one as the only way.
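For illustration only, a processor using the cached "Counts" store from the example above might then look like the following hypothetical sketch, with no explicit forwarding (the class and its logic are made up):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical: with caching enabled on the "Counts" store, the processor only
// updates the store; the cache forwards the latest value per key on flush/evict.
public class CountProcessor implements Processor<String, Integer> {
    private KeyValueStore<String, Integer> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
    }

    @Override
    public void process(final String key, final Integer value) {
        final Integer oldValue = store.get(key);
        store.put(key, oldValue == null ? value : oldValue + value);
        // note: no context.forward(...) here; forwarding is done by the caching layer
    }

    @Override
    public void punctuate(final long timestamp) { }

    @Override
    public void close() { }
}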