Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
We have an existing table() API in the StreamsBuilder which materializes a Kafka topic into a KTable backed by a local state store. This interface is very useful when we want to back up a Kafka topic to a local store. Sometimes we need to materialize a windowed topic (or changelog topic) created by another Streams application into a local store. The current interface only accepts a key-value store, which is not ideal. In this KIP, we would like to address this problem by creating new APIs to support the materialization of a windowed KTable, stored as either a window store or a session store.
The tricky part is that when building this API, from the source processor's point of view, the windowed topic input should be (Windowed<K> key, V value). Note that this is different from a normal topic, because the serdes required here must be windowed serdes. Let's clarify the four different cases involved in the discussion:
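The following minimal, JDK-only sketch shows why a windowed topic needs a different serde. It models a windowed key as a record holding the raw key plus its window bounds. For illustration, it assumes a wire format made of the raw key bytes followed by an 8-byte big-endian window start timestamp. `WindowedKey` and `WindowedBytes` are hypothetical names, not Kafka Streams classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Windowed<K>: a raw key plus its window bounds.
record WindowedKey<K>(K key, long windowStart, long windowEnd) {}

final class WindowedBytes {
    private WindowedBytes() {}

    // Plain key serialization, as used for a non-windowed topic.
    static byte[] serializePlain(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Assumed windowed layout: raw key bytes followed by the 8-byte window start.
    // The window end is not written, so a reader has to know the window size.
    static byte[] serializeWindowed(WindowedKey<String> wk) {
        byte[] raw = serializePlain(wk.key());
        return ByteBuffer.allocate(raw.length + Long.BYTES)
                .put(raw)
                .putLong(wk.windowStart())
                .array();
    }
}
```

A plain key deserializer applied to the windowed bytes would treat the trailing timestamp as part of the key. This is why the source processor must be given a windowed serde.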
Public Interfaces
The current KTable APIs are defined in the StreamsBuilder class:
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
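Conceptually, table() reads its input topic as a changelog. Each record is an upsert for its key, and a record with a null value (a tombstone) deletes that key. The sketch below models this behaviour with a `HashMap` standing in for the RocksDB-backed key-value store. `ChangelogRecord` and `TableMaterializer` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One record read from the input topic; a null value is a tombstone.
record ChangelogRecord<K, V>(K key, V value) {}

final class TableMaterializer {
    private TableMaterializer() {}

    // Replays records in offset order and keeps only the latest value per key,
    // which is the view a KTable over a non-windowed topic exposes.
    static <K, V> Map<K, V> materialize(List<ChangelogRecord<K, V>> records) {
        Map<K, V> store = new HashMap<>();
        for (ChangelogRecord<K, V> r : records) {
            if (r.value() == null) {
                store.remove(r.key());          // tombstone: delete the key
            } else {
                store.put(r.key(), r.value());  // upsert: overwrite any older value
            }
        }
        return store;
    }
}
```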
Through the Materialized class, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. In fact, the underlying KTable implementation by default stores data in a key-value store backed by RocksDB. We also want to support window stores, which is a natural requirement when materializing a windowed topic with windowed keys.
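A key-value store keeps one value per key. A window store can instead hold one value per key *per window*, indexed by the window start timestamp. The following in-memory sketch illustrates that difference. It assumes a fetch by window-start range that is inclusive at both ends, which matches our understanding of `WindowStore#fetch(key, timeFrom, timeTo)`. `InMemoryWindowStore` is a hypothetical class and not the Kafka implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical in-memory window store: each key can hold one value per window,
// indexed by the window start timestamp (a key-value store would keep only one value).
final class InMemoryWindowStore<K, V> {
    private final Map<K, NavigableMap<Long, V>> data = new HashMap<>();

    void put(K key, V value, long windowStart) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    // Returns windowStart -> value for windows of `key` whose start lies in [from, to].
    NavigableMap<Long, V> fetch(K key, long from, long to) {
        NavigableMap<Long, V> windows = data.get(key);
        if (windows == null) {
            return new TreeMap<>();
        }
        return windows.subMap(from, true, to, true);
    }
}
```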
We would like to add eight new APIs to support window stores and session stores as the underlying storage options for windowed topics.
// Existing APIs (unchanged)
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

// New APIs: window store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// New APIs: session store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Duration windowSize);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
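The window store and session store variants differ in what a stored key must carry. A time window has a fixed size, so its end can be derived from the start. A session window's length depends on the data, so its key needs both bounds. The sketch below illustrates this difference. The byte layouts, including the end-then-start ordering for session keys, are assumptions made for the example. `WindowKeyLayouts` is a hypothetical helper.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of why session keys carry more than time-window keys.
final class WindowKeyLayouts {
    private WindowKeyLayouts() {}

    // Time window: fixed size, so only the start is stored; end = start + windowSize.
    static byte[] timeWindowKey(byte[] rawKey, long start) {
        return ByteBuffer.allocate(rawKey.length + Long.BYTES)
                .put(rawKey).putLong(start).array();
    }

    // Session window: length varies per session, so both bounds are stored.
    // The end-then-start ordering is an assumption made for this sketch.
    static byte[] sessionWindowKey(byte[] rawKey, long start, long end) {
        return ByteBuffer.allocate(rawKey.length + 2 * Long.BYTES)
                .put(rawKey).putLong(end).putLong(start).array();
    }

    // Recovers {start, end} from a session key, given the raw key length.
    static long[] sessionBounds(byte[] sessionKey, int rawKeyLength) {
        ByteBuffer buf = ByteBuffer.wrap(sessionKey, rawKeyLength, 2 * Long.BYTES);
        long end = buf.getLong();
        long start = buf.getLong();
        return new long[] {start, end};
    }
}
```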
As the new APIs suggest, we tail a windowed changelog topic and materialize its data as a KTable of type <Windowed<K>, V> for processing. Internally, the Consumed object will be converted to <Windowed<K>, V> so that the changelog records are deserialized correctly. For Materialized, the serde type stays <K, V>, because the window store takes raw key serdes and wraps the key with the window information itself (see WindowedKeySchema.toStoreKeyBinary). These details, however, are hidden from the end user. Since KIP-393, there is a constructor that wraps a general key serde into a windowed serde, so Streams users do not need to worry about type casting; providing raw key serdes should suffice.
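To show why Materialized can keep raw <K, V> serdes, the sketch below approximates the store-key layout that the `toStoreKeyBinary` helper referenced above produces, as we understand it. The layout is the serialized raw key, then the 8-byte window start timestamp, then a 4-byte sequence number (as far as we know, used to keep duplicate entries apart). `StoreKeySketch` is hypothetical and is not Kafka code.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical approximation of a window-store key: rawKey | timestamp (8 bytes) | seqnum (4 bytes).
final class StoreKeySketch {
    private static final int SUFFIX = Long.BYTES + Integer.BYTES;

    private StoreKeySketch() {}

    static byte[] toStoreKeyBinary(byte[] rawKey, long windowStart, int seqnum) {
        return ByteBuffer.allocate(rawKey.length + SUFFIX)
                .put(rawKey)
                .putLong(windowStart)
                .putInt(seqnum)
                .array();
    }

    // The raw key is everything before the fixed-size suffix, so the store only
    // needs a serde for K, not for Windowed<K>.
    static byte[] extractRawKey(byte[] storeKey) {
        return Arrays.copyOfRange(storeKey, 0, storeKey.length - SUFFIX);
    }

    static long extractWindowStart(byte[] storeKey) {
        return ByteBuffer.wrap(storeKey).getLong(storeKey.length - SUFFIX);
    }
}
```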
A `windowSize` duration is required to properly initialize the windowed serde. This is because, for space efficiency, the serialized form of a windowed record stores only the window start timestamp, so the window end has to be derived as start + windowSize. When using the new APIs without configuring Consumed or Materialized, the window size must be passed in explicitly. This means the user must know the window size of the windowed topic in order to deserialize it correctly.
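The sketch below makes the role of `windowSize` concrete. It decodes a windowed key whose bytes carry only the window start, and then reconstructs the end from the configured size. The byte layout (UTF-8 key followed by an 8-byte start timestamp) is an assumption for illustration. `DecodedWindowedKey` and `WindowedKeyDecoder` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-in for Windowed<String>.
record DecodedWindowedKey(String key, long start, long end) {}

final class WindowedKeyDecoder {
    private final long windowSizeMs;

    WindowedKeyDecoder(long windowSizeMs) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSizeMs = windowSizeMs;
    }

    // Assumed layout: raw UTF-8 key bytes followed by the 8-byte window start.
    // Only the start is on the wire, so the end is reconstructed from windowSize.
    DecodedWindowedKey decode(byte[] bytes) {
        int keyLength = bytes.length - Long.BYTES;
        String key = new String(Arrays.copyOfRange(bytes, 0, keyLength), StandardCharsets.UTF_8);
        long start = ByteBuffer.wrap(bytes).getLong(keyLength);
        return new DecodedWindowedKey(key, start, start + windowSizeMs);
    }
}
```

If a reader passes a window size that differs from the one used upstream, the decoder still succeeds but produces wrong window ends. For this reason the size cannot simply be defaulted.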
Accordingly, a windowSize field will also be added to the Consumed and Materialized classes:
public class Consumed<K, V> {
    ...
    protected Duration windowSize; // new
    ...
    public static <K, V> Consumed<K, V> with(final Duration windowSize); // new
}
public class Materialized<K, V, S> {
    ...
    protected Duration windowSize; // new
    ...
    public static <K, V, S> Materialized<K, V, S> withWindowSize(final Duration windowSize); // new
}
When both Materialized and Consumed specify a `windowSize`, the value from Consumed takes precedence, because Consumed is the primary owner of deserializing the input topic.
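The precedence rule can be expressed as a small resolution function. In this sketch, failing when neither object carries a window size is our assumption, not something the proposal specifies. `WindowSizeResolver` is a hypothetical helper.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper mirroring the proposed rule: Consumed wins over Materialized.
final class WindowSizeResolver {
    private WindowSizeResolver() {}

    static Duration resolve(Optional<Duration> fromConsumed, Optional<Duration> fromMaterialized) {
        // Consumed is consulted first because it owns deserialization of the input topic.
        // Throwing when neither is set is an assumption of this sketch.
        return fromConsumed
                .or(() -> fromMaterialized)
                .orElseThrow(() -> new IllegalArgumentException(
                        "windowSize must be set on Consumed or Materialized"));
    }
}
```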
This KIP does not change the existing table() APIs, so it is backward compatible.
We started by simply changing the store type in the table() API to support window stores:
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
However, this straightforward solution runs into two problems:
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String queryableStoreName);
Existing windowed aggregations such as the count() above already return KTable<Windowed<K>, V>. If the table API above returned KTable<K, V> for a window store, we would expose an inconsistent API to users. Defining the output as KTable<Windowed<K>, V> makes it clear to the user that a window store is used in the underlying implementation.
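The sketch below illustrates why windowed results are naturally keyed by Windowed<K>. It counts events in tumbling windows (window start = timestamp rounded down to a multiple of the window size). In the result, the same raw key appears once per window. A table keyed by K alone would hold only one row per key and would lose all but one of these counts. `WindowedCountKey`, `Event` and `TumblingCount` are hypothetical names, and this is a plain-Java model rather than the Kafka Streams implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical windowed key: raw key plus the tumbling window it falls into.
record WindowedCountKey(String key, long windowStart, long windowEnd) {}

// One input event with its record timestamp in milliseconds.
record Event(String key, long timestampMs) {}

final class TumblingCount {
    private TumblingCount() {}

    // Counts events per (key, window); the result is keyed by the windowed key,
    // mirroring why windowed aggregations return KTable<Windowed<K>, V>.
    static Map<WindowedCountKey, Long> count(List<Event> events, long windowSizeMs) {
        Map<WindowedCountKey, Long> counts = new HashMap<>();
        for (Event e : events) {
            // Tumbling windows are aligned to the epoch.
            long start = e.timestampMs() - Math.floorMod(e.timestampMs(), windowSizeMs);
            WindowedCountKey wk = new WindowedCountKey(e.key(), start, start + windowSizeMs);
            counts.merge(wk, 1L, Long::sum);
        }
        return counts;
    }
}
```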