Status

Current state: Accepted

Discussion thread: here 

JIRA: here

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...




The cache has two functions. First, it continues to serve as a read buffer for data that is sent to the state store, just like today. Second, it serves as a write deduplicator for the state store (just like today) as well as for the downstream processor node(s). So the tradeoff is "getting each update, i.e., a low update delay -- but a large CPU and storage load" vs. "skipping some updates, i.e., a larger update delay -- but a reduced resource load". Note that this optimization does not affect correctness. The optimization is applicable to aggregations and to the KTable.to() operator. It is not applicable to other operators like joins.
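The write-deduplication behavior described above can be sketched in a few lines. This is a simplified illustration, not the actual Streams implementation: repeated updates to the same key overwrite the cached entry in place, so only the latest value per key is forwarded downstream on flush.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Simplified sketch of a deduplicating write cache: puts to the same
// key collapse into one entry; flush() forwards one record per key.
class DedupCache<K, V> {
    private final Map<K, V> dirty = new LinkedHashMap<>();
    private final BiConsumer<K, V> downstream; // stands in for context.forward

    DedupCache(BiConsumer<K, V> downstream) {
        this.downstream = downstream;
    }

    void put(K key, V value) {
        dirty.put(key, value); // overwrites an earlier update to the same key
    }

    void flush() {
        dirty.forEach(downstream); // only the latest value per key survives
        dirty.clear();
    }
}
```

The larger the cache and the longer the flush interval, the more intermediate updates are skipped, which is exactly the CPU/storage-versus-latency tradeoff described above.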

The proposal calls for one parameter to be added to the streams configuration options:

...

For the low-level Processor API, we propose to allow users to enable caching on a store-by-store basis using a new .enableCaching() call. For example:

TopologyBuilder builder = new TopologyBuilder();

...

no changes, i.e., this KIP does not apply to the Processor API, just the DSL.

Public Interfaces

This KIP adds the cache.max.bytes.buffering configuration to the streams configuration as described above. This KIP adds the .enableCaching() call to the StoreFactory class for the Processor API only (this does not lead to changes in the DSL).
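To make the configuration change concrete, here is a sketch of how the new cache.max.bytes.buffering setting would sit alongside standard Streams properties. The application id, bootstrap server, and the 10 MB size are arbitrary example values, not part of the KIP.

```java
import java.util.Properties;

// Example of setting the new cache size (in bytes) together with
// standard Streams settings; all concrete values are illustrative.
class StreamsCacheConfigExample {
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "wordcount-example");   // example app id
        props.put("bootstrap.servers", "localhost:9092");   // example broker
        props.put("cache.max.bytes.buffering", 10 * 1024 * 1024L); // 10 MB
        return props;
    }
}
```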

Proposed implementation outline

  1. Change the RocksDB implementation for KStream windowed aggregations, to merge range queries from the cache with range queries from RocksDB.

  2. Extract the LRU cache out of RocksDBStore, as a separate store for KGroupedStream.aggregate() / reduce(), and KStream.aggregateByKey() / reduceByKey(). Forward entries downstream on flush or evict. Add the LRU cache for the KTable.to() operator.

  3. Adjust context forwarding logic so that forwarding happens on cache flush.

  4. Add the above config into StreamsConfig.
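Step 1 above amounts to merging two sorted scans. The following is a simplified sketch (an assumption, not the actual implementation) of answering a range query by combining the cache's range scan with the RocksDB range scan, letting cache entries shadow store entries for the same key because they are newer.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of merging a range query over the in-memory cache with a
// range query over the persistent store; on a key collision the
// cached (newer) value wins.
class RangeMerge {
    static <V> SortedMap<String, V> mergedRange(SortedMap<String, V> store,
                                                SortedMap<String, V> cache,
                                                String from, String to) {
        SortedMap<String, V> result = new TreeMap<>(store.subMap(from, to));
        result.putAll(cache.subMap(from, to)); // cache shadows the store
        return result;
    }
}
```

The real implementation would merge iterators lazily rather than materializing a map, but the shadowing rule is the same.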

...

The advantage of this parameter is that it could be smaller than the existing commit.interval.ms, and thus allow for smaller time intervals spent buffering. However, if desired, commit.interval.ms could also be set to a smaller value, so it is unclear what the benefit of yet another additional timeout would be.

- Expose this caching to the Processor API: 

For the low-level Processor API, we could allow users to enable or disable caching on a store-by-store basis using a new .enableCaching() call. For example:

 

TopologyBuilder builder = new TopologyBuilder();

 

builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().persistent().enableCaching().build(), "Process");

 

The implication of adding caching to the Processor API is that the caching layer will also do forwarding. Hence, if a store's cache is enabled, explicit context.forward calls would no longer be needed. However, this poses the problem of a user having to change their code (adding or removing context.forward calls) each time they want to enable or disable caching. As such, more thought is needed for this approach. Furthermore, there are several other ways of doing caching in the Processor API, and perhaps we do not want to prescribe this one as the only way.
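The code-change problem described above can be illustrated with a toy sketch. The Forwarder interface and both processor classes here are hypothetical stand-ins, not Kafka Streams types: with caching disabled the processor must forward every update explicitly, while with a caching store the forward call must be removed because the cache forwards on flush or evict.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProcessorContext.forward.
interface Forwarder {
    void forward(String key, long value);
}

// Caching disabled: every update is forwarded explicitly.
class CountingProcessorNoCache {
    private final Map<String, Long> store = new HashMap<>();
    private final Forwarder context;

    CountingProcessorNoCache(Forwarder context) {
        this.context = context;
    }

    void process(String key) {
        long count = store.merge(key, 1L, Long::sum);
        context.forward(key, count); // explicit forward on every update
    }
}

// Caching enabled: no explicit forward; the caching store would
// forward the latest value per key on flush or evict instead.
class CountingProcessorWithCache {
    private final Map<String, Long> cachingStore = new HashMap<>();

    void process(String key) {
        cachingStore.merge(key, 1L, Long::sum);
        // no context.forward here -- toggling caching forces a code change
    }
}
```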