Current state: Under Discussion
Discussion thread: Not available now
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
We have an existing table() API in StreamsBuilder which materializes a Kafka topic into a KTable backed by a local state store. This interface is very useful when we want to back up a Kafka topic in a local store. Sometimes, however, we need to materialize a windowed topic (or a windowed changelog topic) created by another Streams application into a local store. The current interface only accepts a key-value store, which is not ideal for this case. In this KIP, we would like to address this problem by creating a new API called windowedTable() which supports the generation of a windowed KTable.
Here comes the tricky part: when building this API, from the source processor's point of view, each record read from a windowed topic is a (Windowed<K> key, V value) pair. Note that this differs from a normal topic, because the serdes required here are windowed serdes. Let's clarify the four different cases involved in the discussion:
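To illustrate why a windowed topic needs different serdes from a plain one, here is a minimal, self-contained sketch of a windowed key codec. We assume a layout of serialized raw key bytes followed by an 8-byte big-endian window start timestamp, which we believe matches the Kafka Streams time-windowed serializer; the class names (TimeWindowedKey, WindowedKeyCodec) are hypothetical and the code does not use Kafka at all. Because only the window start is written, decoding needs the window size from outside.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Windowed<String>: a key plus the bounds of its window.
final class TimeWindowedKey {
    final String key;
    final long windowStart;
    final long windowEnd;

    TimeWindowedKey(String key, long windowStart, long windowEnd) {
        this.key = key;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }
}

// Hypothetical codec contrasting a raw key serde with a windowed key serde.
final class WindowedKeyCodec {
    private static final int TIMESTAMP_SIZE = 8;

    // Raw key serde: the UTF-8 bytes of the key, nothing else.
    static byte[] serializeRaw(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeRaw(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Windowed serde (assumed layout): raw key bytes, then the 8-byte big-endian window start.
    static byte[] serializeWindowed(TimeWindowedKey windowed) {
        byte[] raw = serializeRaw(windowed.key);
        return ByteBuffer.allocate(raw.length + TIMESTAMP_SIZE)
                .put(raw)
                .putLong(windowed.windowStart)
                .array();
    }

    // The window end is not on the wire, so the caller must supply the window size.
    static TimeWindowedKey deserializeWindowed(byte[] bytes, long windowSizeMs) {
        if (bytes.length < TIMESTAMP_SIZE) {
            throw new IllegalArgumentException("too short to hold a window start timestamp");
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        byte[] raw = new byte[bytes.length - TIMESTAMP_SIZE];
        buffer.get(raw);
        long windowStart = buffer.getLong();
        return new TimeWindowedKey(deserializeRaw(raw), windowStart, windowStart + windowSizeMs);
    }
}
```

Feeding windowed bytes to the raw deserializer yields the key followed by eight timestamp bytes rather than the original key, which is the kind of mismatch the windowed serdes avoid.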
Public Interfaces
The current KTable API looks like:
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
Through the Materialized parameter, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. In fact, the underlying KTable by default stores its data in a key-value store backed by RocksDB. We also want to support a window store, which is a very natural requirement when we materialize a windowed topic with windowed keys.
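The difference between the two store types can be sketched with in-memory models. This is a minimal illustration under our own assumptions: KeyValueStoreModel and WindowStoreModel are hypothetical classes, not Kafka Streams stores, and the range fetch only mirrors the spirit of a window store lookup by key and time range. Replaying a windowed topic into a key-value store keyed by the raw key lets later windows overwrite earlier ones, while a store keyed by (key, window start) keeps every window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory model of a key-value store: one value per key.
final class KeyValueStoreModel<K, V> {
    private final Map<K, V> data = new HashMap<>();

    // Upsert semantics: a later record for the same key replaces the earlier one,
    // and a null value (tombstone) deletes the key.
    void put(K key, V value) {
        if (value == null) {
            data.remove(key);
        } else {
            data.put(key, value);
        }
    }

    V get(K key) {
        return data.get(key);
    }
}

// Hypothetical in-memory model of a window store: one value per (key, window start).
final class WindowStoreModel<K, V> {
    private final Map<K, NavigableMap<Long, V>> data = new HashMap<>();

    void put(K key, long windowStart, V value) {
        NavigableMap<Long, V> windows = data.computeIfAbsent(key, k -> new TreeMap<Long, V>());
        if (value == null) {
            windows.remove(windowStart);
        } else {
            windows.put(windowStart, value);
        }
    }

    // Value of a single window, or null if absent.
    V fetch(K key, long windowStart) {
        NavigableMap<Long, V> windows = data.get(key);
        return windows == null ? null : windows.get(windowStart);
    }

    // All windows of a key whose start lies in [fromStart, toStart], in time order.
    NavigableMap<Long, V> fetch(K key, long fromStart, long toStart) {
        NavigableMap<Long, V> windows = data.get(key);
        if (windows == null) {
            return new TreeMap<Long, V>();
        }
        return windows.subMap(fromStart, true, toStart, true);
    }
}
```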
We would like to add one new API to support a window store as the underlying storage option for a windowed topic:
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic,
                                                                final Consumed<Windowed<K>, V> consumed,
                                                                final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
One thing users need to notice is how to pass in serdes. The type of the consumed parameter is Consumed<Windowed<K>, V>, because the source needs to deserialize each record into a windowed key and a value. The type of the materialized parameter, however, is Materialized<K, V, WindowStore<Bytes, byte[]>>, because the window store stores the raw key instead of the windowed key. Thanks to strict type enforcement, users are alerted at compile time if they confuse the two.
We do not provide windowedTable() overloads that omit `materialized` or `consumed`.
The reason to require `materialized` is that constructing a window store requires a concrete retention time, window size and number of rolling segments. On the application side, the Streams job cannot infer the retention time or window size of the windowed topic, so the user must provide this information.
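To make this requirement concrete, here is a hypothetical parameter holder for a segmented window store. The validation rules and the segment-interval formula (retention divided by numSegments - 1, with a one-minute floor) are modeled on our understanding of the RocksDB window store of this era and should be treated as assumptions; WindowStoreParams and its fields are illustrative names only. The point is that none of these values can be read off the windowed topic itself.

```java
// Hypothetical parameter holder; names, checks and the segment formula are assumptions.
final class WindowStoreParams {
    // Assumed lower bound on segment width.
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    final long retentionMs;
    final long windowSizeMs;
    final int numSegments;
    final long segmentIntervalMs;

    WindowStoreParams(long retentionMs, long windowSizeMs, int numSegments) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (retentionMs < windowSizeMs) {
            throw new IllegalArgumentException("retention must be at least the window size");
        }
        if (numSegments < 2) {
            throw new IllegalArgumentException("need at least two segments");
        }
        this.retentionMs = retentionMs;
        this.windowSizeMs = windowSizeMs;
        this.numSegments = numSegments;
        // Spread the retention period over numSegments - 1 full segments.
        this.segmentIntervalMs = Math.max(retentionMs / (numSegments - 1), MIN_SEGMENT_INTERVAL_MS);
    }

    // Segment that would hold a window starting at the given timestamp.
    long segmentId(long windowStartMs) {
        return windowStartMs / segmentIntervalMs;
    }
}
```

With one day of retention and three segments, each segment spans twelve hours; a user who cannot state the retention or window size of the upstream topic has no way to fill in these fields.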
The reason to require `consumed` is as follows. In the table() API, we can omit `consumed` or `materialized` because the source node and the state store can share the same `keySerde` and `valueSerde`. This is not true in the windowed table context, since we use a windowed keySerde and a raw keySerde at the same time. So both parameters are required.
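The serde split described above can be sketched with plain generics. ConsumedSpec, MaterializedSpec, WindowedKey and TableHandle are hypothetical stand-ins for Consumed, Materialized, Windowed and KTable, reduced to functions on byte arrays; this is not the Kafka Streams implementation. The static method mirrors the proposed signature, so the compiler ties the K inside WindowedKey<K> on the source side to the raw K on the store side, and storeKeyFor shows both serdes being used on the same record.

```java
import java.util.function.Function;

// Hypothetical stand-in for Windowed<K>.
final class WindowedKey<K> {
    final K key;
    final long windowStart;

    WindowedKey(K key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }
}

// Hypothetical stand-in for Consumed<K, V>: how the source decodes topic records.
final class ConsumedSpec<K, V> {
    final Function<byte[], K> keyDeserializer;
    final Function<byte[], V> valueDeserializer;

    ConsumedSpec(Function<byte[], K> keyDeserializer, Function<byte[], V> valueDeserializer) {
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }
}

// Hypothetical stand-in for Materialized<K, V, S>: how the store encodes raw keys and values.
final class MaterializedSpec<K, V> {
    final String storeName;
    final Function<K, byte[]> keySerializer;
    final Function<V, byte[]> valueSerializer;

    MaterializedSpec(String storeName, Function<K, byte[]> keySerializer, Function<V, byte[]> valueSerializer) {
        this.storeName = storeName;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }
}

// Hypothetical stand-in for the returned KTable.
final class TableHandle<K, V> {
    final String topic;
    final String storeName;

    TableHandle(String topic, String storeName) {
        this.topic = topic;
        this.storeName = storeName;
    }
}

final class WindowedTableSketch {
    // Source side is typed on WindowedKey<K>, store side on raw K. Passing a spec typed on
    // WindowedKey<K> as `materialized`, or one typed on raw K as `consumed`, fails to compile.
    static <K, V> TableHandle<WindowedKey<K>, V> windowedTable(
            String topic, ConsumedSpec<WindowedKey<K>, V> consumed, MaterializedSpec<K, V> materialized) {
        return new TableHandle<>(topic, materialized.storeName);
    }

    // The windowed deserializer reads the topic key; the raw serializer writes the store key.
    static <K, V> byte[] storeKeyFor(
            byte[] topicKey, ConsumedSpec<WindowedKey<K>, V> consumed, MaterializedSpec<K, V> materialized) {
        WindowedKey<K> windowed = consumed.keyDeserializer.apply(topicKey);
        return materialized.keySerializer.apply(windowed.key);
    }
}
```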
This KIP does not change the existing table() API, so it is backward compatible.
As a first attempt, we could change the store type on the table() API to support a window store:
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
However, this straightforward solution hits two problems:
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String queryableStoreName);
As the windowed count() signature above shows, windowed results are already exposed as KTable<Windowed<K>, ...>. If the table() API above returned KTable<K, V> for a window store, we would be introducing an inconsistent API to users. By defining the output as KTable<Windowed<K>, V>, users can see clearly that a window store is used in the underlying implementation.
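For comparison, here is a plain-Java sketch of what a windowed count like the one above produces, assuming tumbling windows and non-negative timestamps. WindowKey, Event and TumblingWindowCount are hypothetical stand-ins for Windowed<K> and the aggregation, not Kafka Streams classes. Because the same key appears once per window, the natural key type of the result is the windowed key.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for Windowed<K>: the aggregation key plus its window start.
final class WindowKey<K> {
    final K key;
    final long windowStart;

    WindowKey(K key, long windowStart) {
        this.key = key;
        this.windowStart = windowStart;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WindowKey)) return false;
        WindowKey<?> other = (WindowKey<?>) o;
        return windowStart == other.windowStart && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart);
    }
}

// Hypothetical input record: a key and its event timestamp in milliseconds.
final class Event<K> {
    final K key;
    final long timestamp;

    Event(K key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

final class TumblingWindowCount {
    // Counts events per (key, tumbling window); the result is keyed by WindowKey<K>, not K.
    static <K> Map<WindowKey<K>, Long> count(List<Event<K>> events, long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        Map<WindowKey<K>, Long> counts = new LinkedHashMap<>();
        for (Event<K> e : events) {
            // Align to the start of the enclosing window (assumes non-negative timestamps).
            long start = e.timestamp - (e.timestamp % windowSizeMs);
            counts.merge(new WindowKey<>(e.key, start), 1L, Long::sum);
        }
        return counts;
    }
}
```

A table that reads such a result topic back exposes the same windowed key type, which is why windowedTable() returns KTable<Windowed<K>, V> rather than KTable<K, V>.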