Current state: Under Discussion
Discussion thread: Not available now
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
StreamsBuilder already provides a table() API that materializes a Kafka topic into a local state store, exposed as a KTable. This interface is useful when we want to back up a Kafka topic to a local store. There are currently two types of state store: key-value based and window based. The existing interface accepts only a key-value store, which is limiting: in some cases we need to materialize a windowed topic (also called a changelog topic), created by another Streams application, into a local store. This KIP addresses the problem by adding a new API, windowedTable(), which supports generating a windowed KTable.
The tricky part is that, from the source processor's point of view, records read from a windowed topic have the form (Windowed<K> key, V value). As a result, this KIP involves four different cases:
Public Interfaces
The current KTable API looks like:
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
Through the Materialized parameter, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. In fact, the underlying KTable implementation stores data by default in a key-value store backed by RocksDB. We also want to support a window store, which is a natural requirement when materializing a windowed topic with windowed keys.
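To make the difference between the two store shapes concrete, here is a minimal sketch in plain Java. The classes SimpleKeyValueStore and SimpleWindowStore are hypothetical stand-ins written only for illustration; they are not the Kafka Streams KeyValueStore or WindowStore interfaces and ignore serialization, retention, and RocksDB entirely. The sketch only shows that a key-value shape keeps one value per key, while a window shape keeps one value per key and window start.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-ins for the two store shapes (assumption: NOT the Kafka Streams classes).
final class StoreShapesSketch {

    // Key-value shape: a later put for the same key replaces the earlier value.
    static final class SimpleKeyValueStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        void put(K key, V value) {
            data.put(key, value);
        }

        V get(K key) {
            return data.get(key);
        }
    }

    // Window shape: values are addressed by key AND window start timestamp.
    static final class SimpleWindowStore<K, V> {
        private final Map<K, TreeMap<Long, V>> data = new HashMap<>();

        void put(K key, V value, long windowStartMs) {
            data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, value);
        }

        V fetch(K key, long windowStartMs) {
            TreeMap<Long, V> windows = data.get(key);
            return windows == null ? null : windows.get(windowStartMs);
        }
    }

    public static void main(String[] args) {
        SimpleKeyValueStore<String, Long> kv = new SimpleKeyValueStore<>();
        kv.put("user-a", 3L);
        kv.put("user-a", 5L);                 // overwrites the first value
        System.out.println(kv.get("user-a")); // prints 5

        SimpleWindowStore<String, Long> ws = new SimpleWindowStore<>();
        ws.put("user-a", 3L, 0L);             // window starting at 0 ms
        ws.put("user-a", 5L, 60_000L);        // window starting at 60 s
        System.out.println(ws.fetch("user-a", 0L));      // prints 3
        System.out.println(ws.fetch("user-a", 60_000L)); // prints 5
    }
}
```

In the key-value shape the per-window history of a key is lost, which is why a windowed topic does not fit the existing table() overloads well.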
We would like to add four new APIs that support a window store as the underlying storage option:
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<Windowed<K>, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
This KIP does not change the existing table() API, so it is backward compatible.
We initially considered changing the store type in the table() API to support a window store:
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
However, this straightforward solution runs into two problems:
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String queryableStoreName);
So if the table() API above returned KTable<K, V> for a window store, we would expose an inconsistent API to users. By defining the output as KTable<Windowed<K>, V>, we make it clear to the user that a window store is used in the underlying implementation.
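The effect of the key type on what a user sees can be sketched in plain Java. WindowedKey below is a hypothetical stand-in for Windowed<K>, and plainView and windowedView are illustrative helpers that model a table as a map; none of these names come from Kafka Streams. Under those assumptions, the sketch shows that a view keyed by K collapses all windows of a key into one row, while a view keyed by the windowed key keeps one row per window.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class WindowedKeySketch {

    // Hypothetical stand-in for Windowed<K>: the record key plus its window bounds.
    static final class WindowedKey<K> {
        final K key;
        final long startMs;
        final long endMs;

        WindowedKey(K key, long startMs, long endMs) {
            this.key = key;
            this.startMs = startMs;
            this.endMs = endMs;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof WindowedKey)) return false;
            WindowedKey<?> other = (WindowedKey<?>) o;
            return startMs == other.startMs
                && endMs == other.endMs
                && Objects.equals(key, other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, startMs, endMs);
        }
    }

    // View keyed by the bare key: windows of the same key overwrite each other.
    static <K, V> Map<K, V> plainView(List<WindowedKey<K>> keys, List<V> values) {
        Map<K, V> table = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            table.put(keys.get(i).key, values.get(i));
        }
        return table;
    }

    // View keyed by the windowed key: each window keeps its own row.
    static <K, V> Map<WindowedKey<K>, V> windowedView(List<WindowedKey<K>> keys, List<V> values) {
        Map<WindowedKey<K>, V> table = new LinkedHashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            table.put(keys.get(i), values.get(i));
        }
        return table;
    }

    public static void main(String[] args) {
        List<WindowedKey<String>> keys = Arrays.asList(
            new WindowedKey<>("page-1", 0L, 60_000L),
            new WindowedKey<>("page-1", 60_000L, 120_000L));
        List<Long> counts = Arrays.asList(4L, 9L);

        System.out.println(plainView(keys, counts).size());    // prints 1
        System.out.println(windowedView(keys, counts).size()); // prints 2
    }
}
```

With the windowed key in the type, the window bounds travel with every row, so a reader of the table can tell which window a value belongs to without knowing how the store is implemented.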