...
We have an existing table() API in StreamsBuilder that materializes a Kafka topic into a KTable backed by a local state store. This interface is very useful when we want to back up a Kafka topic in a local store. Sometimes, however, we need to materialize a windowed topic (or changelog topic) created by another Streams application into a local store. The current interface only accepts a key-value store, which is not ideal for this case. In this KIP, we would like to address this problem by adding new APIs, windowedTable() and sessionTable(), that materialize a windowed KTable backed by either a window store or a session store.
The tricky part is that, from the source processor's point of view, the input from a windowed topic should be a (Windowed<K> key, V value) pair. This differs from a normal topic because the serdes required here are windowed serdes. Let's clarify the four different cases involved in the discussion:
- Non-windowed topic materialized to a key-value store. This is the most common case and is already covered by the table() API.
- Non-windowed topic materialized to a window store. This is not a valid requirement, because we can easily use the aggregate() API (after groupByKey() and windowedBy()) to generate a window store from a non-windowed topic.
- Windowed topic (KStream changelog) materialized to a key-value store. This is also a rare requirement, because the essential difference between a key-value store and a window store is that a window store enforces a retention period on its data. By materializing a windowed topic into a key-value store we lose control over the TTL, which leads to an incorrect representation of the changelog data.
- Windowed topic (KStream changelog) materialized to a window store. This is the missing case that our new API needs to address. Currently it is very hard to share a changelog between Streams applications, and this API would make it possible to share the same state store across applications.
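The reasoning in the second and third cases relies on two properties of a window store: the window a record belongs to can be computed from its timestamp (which is why aggregate() is enough for non-windowed input), and old windows expire after a retention period, which a plain key-value store does not do. The plain-Java sketch below models both. `ToyWindowStore`, its methods, and its expiry rule (drop every window that starts earlier than the largest window start seen minus the retention period) are hypothetical simplifications; Kafka Streams' own window stores are segmented and expire data in coarser steps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy window store keyed by (key, window start); not a Kafka Streams class.
// Expiry is simplified: stream time is approximated by the largest window start seen so far.
class ToyWindowStore {
    private final long retentionMs;
    private long observedStreamTime = Long.MIN_VALUE;
    // key -> (window start -> aggregated value), windows sorted by start time
    private final Map<String, TreeMap<Long, Long>> windows = new HashMap<>();

    ToyWindowStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    // Start of the tumbling window of size windowSizeMs that contains timestampMs
    // (timestamps are assumed to be non-negative).
    static long windowStartFor(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    void put(String key, long windowStart, long value) {
        observedStreamTime = Math.max(observedStreamTime, windowStart);
        windows.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
        expire();
    }

    Long fetch(String key, long windowStart) {
        TreeMap<Long, Long> perKey = windows.get(key);
        return perKey == null ? null : perKey.get(windowStart);
    }

    int windowCount() {
        int count = 0;
        for (TreeMap<Long, Long> perKey : windows.values()) {
            count += perKey.size();
        }
        return count;
    }

    // Drop every window whose start is earlier than (observed stream time - retention).
    private void expire() {
        long cutoff = observedStreamTime - retentionMs;
        for (TreeMap<Long, Long> perKey : windows.values()) {
            perKey.headMap(cutoff).clear();
        }
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;                          // 1-minute tumbling windows
        ToyWindowStore store = new ToyWindowStore(120_000L);  // keep about 2 minutes of windows
        long[] eventTimestamps = {5_000L, 30_000L, 65_000L, 250_000L};
        // Count events per window for one key, as a windowed aggregate() would.
        for (long ts : eventTimestamps) {
            long start = windowStartFor(ts, windowSizeMs);
            Long current = store.fetch("page-a", start);
            store.put("page-a", start, current == null ? 1L : current + 1L);
        }
        System.out.println(store.fetch("page-a", 0L) + " " + store.fetch("page-a", 240_000L)
                + " " + store.windowCount()); // prints "null 1 1"
    }
}
```

In the `main` example, the windows starting at 0 and 60000 are dropped once the window starting at 240000 arrives. A key-value store keyed by the windowed key would keep them indefinitely, which is the loss of TTL control described in the third case.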
...
```java
// Existing table() overloads in StreamsBuilder
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```
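As a reference point for the new APIs, the following plain-Java sketch models what the table() overloads above do with each record: a non-null value upserts the key, and a null value (a tombstone) deletes it. `ToyKeyValueTable` is a hypothetical class written for illustration and does not depend on the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of table() materialization; not a Kafka Streams class.
class ToyKeyValueTable<K, V> {
    private final Map<K, V> store = new HashMap<>();

    // Apply one changelog record: a non-null value upserts, a null value (tombstone) deletes.
    void apply(K key, V value) {
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
    }

    V get(K key) {
        return store.get(key);
    }

    int size() {
        return store.size();
    }

    public static void main(String[] args) {
        ToyKeyValueTable<String, Long> table = new ToyKeyValueTable<>();
        table.apply("alice", 1L);
        table.apply("bob", 5L);
        table.apply("alice", 3L);  // a later record for the same key overwrites the earlier one
        table.apply("bob", null);  // tombstone removes "bob"
        System.out.println(table.get("alice") + " " + table.size()); // prints "3 1"
    }
}
```

Note that nothing in this model ever expires a key on its own, which is why a key-value store is a poor fit for windowed data.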
...
Proposed Changes
We would like to add eight new APIs to support window stores and session stores as the underlying storage option for windowed topics.
```java
// Window store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// Session store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
```
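Both families of methods read keys that were written with windowed serdes, so the source processor has to decode a window alongside the original key. The plain-Java sketch below shows one possible binary layout: the serialized inner key followed by the window start for a time window, and followed by the session end and start for a session window. We believe this is close to what Kafka Streams' built-in windowed serializers produce, but treat the exact layout, and the helper class `WindowedKeyCodec` with its methods, as hypothetical assumptions for illustration rather than the library's format or API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical codec illustrating windowed-key layouts; not a Kafka Streams API.
final class WindowedKeyCodec {
    private static final int TIMESTAMP_SIZE = Long.BYTES;

    private WindowedKeyCodec() {}

    // Time window (assumed layout): [inner key bytes][8-byte window start]
    static byte[] encodeTimeWindowed(byte[] innerKey, long windowStart) {
        return ByteBuffer.allocate(innerKey.length + TIMESTAMP_SIZE)
                .put(innerKey)
                .putLong(windowStart)
                .array();
    }

    // Session window (assumed layout): [inner key bytes][8-byte session end][8-byte session start]
    static byte[] encodeSessionWindowed(byte[] innerKey, long start, long end) {
        return ByteBuffer.allocate(innerKey.length + 2 * TIMESTAMP_SIZE)
                .put(innerKey)
                .putLong(end)
                .putLong(start)
                .array();
    }

    // The inner key is everything before the trailing timestamp(s).
    static byte[] innerKey(byte[] encoded, int timestampCount) {
        return Arrays.copyOfRange(encoded, 0, encoded.length - timestampCount * TIMESTAMP_SIZE);
    }

    // The time-window end is not stored; it is derived from a window size the reader must already know.
    static long[] decodeTimeWindow(byte[] encoded, long windowSizeMs) {
        long start = ByteBuffer.wrap(encoded).getLong(encoded.length - TIMESTAMP_SIZE);
        return new long[] {start, start + windowSizeMs};
    }

    // Returns {start, end} read from the two trailing timestamps.
    static long[] decodeSessionWindow(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded);
        long end = buf.getLong(encoded.length - 2 * TIMESTAMP_SIZE);
        long start = buf.getLong(encoded.length - TIMESTAMP_SIZE);
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] timeKey = encodeTimeWindowed(key, 60_000L);
        byte[] sessionKey = encodeSessionWindowed(key, 1_000L, 9_000L);
        System.out.println(timeKey.length + " " + sessionKey.length); // prints "15 23"
        System.out.println(Arrays.toString(decodeTimeWindow(timeKey, 60_000L))); // prints "[60000, 120000]"
        System.out.println(new String(innerKey(sessionKey, 2), StandardCharsets.UTF_8)); // prints "user-42"
    }
}
```

Because the two layouts differ, a windowed topic has to be read with the serde that matches the store type, which is one reason the proposal keeps windowedTable() and sessionTable() as separate methods.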
...