...

We have an existing table() API in the StreamsBuilder which materializes a Kafka topic into a KTable backed by a local state store. This interface is very useful when we want to back up a Kafka topic to a local store. Sometimes we need to materialize a windowed topic (or changelog topic) created by another Streams application into a local store. The current interface only accepts a key-value store, which is not ideal. In this KIP, we would like to address this problem by creating new APIs to support the materialization of a windowed KTable, stored as either a window store or a session store.

The tricky part is that, from the source processor's point of view, the windowed topic input should be (Windowed<K> key, V value). Note that this is different from a normal topic, because the serdes required here must be windowed serdes. Let's clarify the four different cases involved in the discussion:

  1. Non-windowed topic materialized to key-value store. This is the most common case and is already covered by the table() API.
  2. Non-windowed topic materialized to window store. This is not a real requirement, because we could easily use windowedBy() followed by aggregate() to generate a window store from a non-windowed topic.
  3. Windowed topic (KStream changelog) materialized to key-value store. This is also a rare requirement, because the natural difference between a key-value store and a window store is that a window store enforces a retention period on its data. By materializing a windowed topic into a key-value store we lose control over the TTL, which leads to an incorrect representation of the changelog data.
  4. Windowed topic (KStream changelog) materialized to window store. This is the missing requirement that our new API needs to address. Currently it's very hard to share a changelog between Streams applications, and this API would make it possible to share the same state store across applications.
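To illustrate the retention argument behind cases 3 and 4, here is a minimal in-memory sketch. It is not Kafka code: `RetentionDemo`, `KeyValueModel` and `WindowStoreModel` are hypothetical names, and the expiry rule (drop windows whose start is at or before the maximum observed window start minus the retention period) is an assumed simplification of how a real window store expires old data.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical models contrasting case 3 (key-value store) with case 4 (window store).
final class RetentionDemo {

    // Hypothetical stand-in for Windowed<String>: a key plus its window start.
    record WindowedKey(String key, long windowStartMs) {}

    // Case 3: windowed keys stored in a key-value store. Entries never expire.
    static final class KeyValueModel {
        private final Map<WindowedKey, Long> entries = new HashMap<>();

        void put(WindowedKey k, long value) { entries.put(k, value); }

        int size() { return entries.size(); }
    }

    // Case 4: a window store with retention. Simplified rule: windows starting at or
    // before (max observed window start - retention) are dropped.
    static final class WindowStoreModel {
        private final long retentionMs;
        private long streamTimeMs = Long.MIN_VALUE;
        private final TreeMap<Long, Map<String, Long>> byWindowStart = new TreeMap<>();

        WindowStoreModel(long retentionMs) { this.retentionMs = retentionMs; }

        void put(WindowedKey k, long value) {
            streamTimeMs = Math.max(streamTimeMs, k.windowStartMs());
            long cutoff = streamTimeMs - retentionMs;
            byWindowStart.headMap(cutoff, true).clear(); // expire old windows
            if (k.windowStartMs() <= cutoff) {
                return; // record is already outside the retention period
            }
            byWindowStart.computeIfAbsent(k.windowStartMs(), s -> new HashMap<>())
                    .put(k.key(), value);
        }

        int size() {
            return byWindowStart.values().stream().mapToInt(Map::size).sum();
        }
    }

    public static void main(String[] args) {
        KeyValueModel kv = new KeyValueModel();
        WindowStoreModel ws = new WindowStoreModel(10_000L);
        // The last record (start 1000) arrives late, after stream time reached 20000.
        for (long start : new long[] {0L, 5_000L, 20_000L, 1_000L}) {
            WindowedKey k = new WindowedKey("a", start);
            kv.put(k, start);
            ws.put(k, start);
        }
        System.out.println("key-value entries: " + kv.size());
        System.out.println("window store entries: " + ws.size());
    }
}
```

Feeding the same windowed records into both models, the key-value model keeps every window forever, while the window-store model keeps only the windows still inside the retention period once stream time has advanced.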

...

```java
// StreamsBuilder.java
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```
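The existing table() overloads read the record key with a plain key serde. The sketch below shows why that breaks for a windowed topic. It assumes a simplified wire layout of [UTF-8 key bytes][8-byte window start] for a windowed key; `WindowedKeyCodec` and `WindowedKey` are hypothetical names, and the layout is an illustration rather than a description of Kafka's exact serialization format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: a hypothetical windowed-key layout of
// [UTF-8 key bytes][8-byte window start], not Kafka's exact format.
final class WindowedKeyCodec {

    // Hypothetical stand-in for Windowed<String>: a key plus its window start.
    record WindowedKey(String key, long windowStartMs) {}

    // Serialize as key bytes followed by the window start timestamp.
    static byte[] encode(WindowedKey wk) {
        byte[] keyBytes = wk.key().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + Long.BYTES)
                .put(keyBytes)
                .putLong(wk.windowStartMs())
                .array();
    }

    // A "windowed" deserializer knows the last 8 bytes are the window start.
    static WindowedKey decode(byte[] bytes) {
        int keyLength = bytes.length - Long.BYTES;
        String key = new String(bytes, 0, keyLength, StandardCharsets.UTF_8);
        long windowStart = ByteBuffer.wrap(bytes, keyLength, Long.BYTES).getLong();
        return new WindowedKey(key, windowStart);
    }

    // A plain String deserializer treats every byte as key text, so the
    // timestamp bytes leak into the key and the original key is not recovered.
    static String decodeAsPlainKey(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode(new WindowedKey("user-42", 60_000L));
        System.out.println("windowed serde: " + decode(wire));
        System.out.println("plain serde matches original key: "
                + decodeAsPlainKey(wire).equals("user-42"));
    }
}
```

Under this assumed layout, the same logical key in two different windows also produces two different byte strings, so a plain-key KTable would treat them as unrelated keys.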

...

Proposed Changes

We would like to add eight new APIs to support window store and session store as the underlying storage options for windowed topics.

```java
// StreamsBuilder.java

// Window store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// Session store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
```
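The following sketch suggests, under stated assumptions, what the source processor behind windowedTable() and sessionTable() might do with each (Windowed<K>, V) changelog record. All types here are hypothetical stand-ins rather than Kafka classes: the window-store model indexes values by (key, window start), while the session-store model must use (key, start, end), since session boundaries vary per record. A null value is treated as a tombstone, following the usual changelog convention.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of applying windowed changelog records to local stores.
// None of these types are Kafka classes.
final class WindowedMaterializationSketch {

    // Stand-in for Windowed<K>: a key plus its window [start, end].
    record WindowedKey(String key, long start, long end) {}

    // windowedTable(): values indexed by key, then by window start.
    static final class WindowStoreSketch {
        private final Map<String, TreeMap<Long, String>> data = new TreeMap<>();

        void apply(WindowedKey wk, String value) {
            TreeMap<Long, String> windows = data.computeIfAbsent(wk.key(), k -> new TreeMap<>());
            if (value == null) {
                windows.remove(wk.start()); // tombstone: delete this window
            } else {
                windows.put(wk.start(), value); // insert or update this window
            }
        }

        // Values for `key` whose window start lies in [from, to], ordered by start.
        List<String> fetch(String key, long from, long to) {
            TreeMap<Long, String> windows = data.get(key);
            if (windows == null) {
                return List.of();
            }
            return new ArrayList<>(windows.subMap(from, true, to, true).values());
        }
    }

    // sessionTable(): a session is identified by key, start AND end.
    static final class SessionStoreSketch {
        private final Map<WindowedKey, String> sessions = new LinkedHashMap<>();

        void apply(WindowedKey session, String value) {
            if (value == null) {
                sessions.remove(session); // tombstone must match the exact session
            } else {
                sessions.put(session, value);
            }
        }

        // Values of sessions for `key` overlapping [from, to], in insertion order.
        List<String> find(String key, long from, long to) {
            List<String> out = new ArrayList<>();
            for (Map.Entry<WindowedKey, String> e : sessions.entrySet()) {
                WindowedKey s = e.getKey();
                if (s.key().equals(key) && s.end() >= from && s.start() <= to) {
                    out.add(e.getValue());
                }
            }
            return out;
        }
    }

    public static void main(String[] args) {
        WindowStoreSketch windows = new WindowStoreSketch();
        windows.apply(new WindowedKey("a", 0L, 60_000L), "1");
        windows.apply(new WindowedKey("a", 60_000L, 120_000L), "2");
        windows.apply(new WindowedKey("a", 0L, 60_000L), "3"); // update first window
        System.out.println(windows.fetch("a", 0L, 120_000L));

        SessionStoreSketch sessions = new SessionStoreSketch();
        sessions.apply(new WindowedKey("u", 0L, 10L), "s1");
        sessions.apply(new WindowedKey("u", 20L, 30L), "s2");
        // A merge: tombstones for the old sessions, then the merged session.
        sessions.apply(new WindowedKey("u", 0L, 10L), null);
        sessions.apply(new WindowedKey("u", 20L, 30L), null);
        sessions.apply(new WindowedKey("u", 0L, 30L), "merged");
        System.out.println(sessions.find("u", 0L, 30L));
    }
}
```

For session windows, a merge typically appears in the changelog as tombstones for the old sessions followed by a record for the merged session, which is why this sketch deletes sessions only by the exact (key, start, end) triple.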

...