...
We have an existing table() API in the StreamsBuilder that materializes a Kafka topic into a local state store, exposed as a KTable. This interface is very useful when we want to back up a Kafka topic to a local store. Currently there are two different types of state store: key-value based and window based. The current interface only accepts a key-value store, which is not ideal. There are cases where we need to materialize a windowed topic (which we also call a changelog topic) created by another Streams application into a local store. In this KIP, we would like to address this problem by adding a new API called windowedTable(), which supports generating a windowed KTable.
Here comes the tricky part: when building this API, from the source processor's point of view, the windowed topic input should be (Windowed<K> key, V value). There are actually four different cases involved in this KIP:
- Non-windowed topic materialized to key-value store. This is the most common case and is already covered by the table() API.
- Non-windowed topic materialized to window store. This is not a genuine requirement, because we can easily use the aggregate() API to generate a window store.
- Windowed topic (stream changelog) materialized to key-value store. This is a rare but possible requirement, because the natural difference between a key-value store and a window store is that a window store sets a retention on the data. By materializing a windowed topic to a key-value store we lose control over the TTL, which would not be a correct representation of the data.
- Windowed topic (stream changelog) materialized to window store. This is the missing requirement that our new API needs to address. Currently it is very hard to share a changelog between Streams applications.
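
The retention argument in the third case can be illustrated with a small in-memory model. This is only a sketch in plain Java, not Kafka Streams code: `WindowedKey`, `ToyKeyValueStore`, `ToyWindowStore`, and the 60-second retention are hypothetical names and values chosen for illustration, and real window stores manage retention internally in their own way. The point is only that a key-value store keeps every windowed key indefinitely, while a window store can drop windows that fall outside its retention.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Toy model (assumption: not the Kafka Streams API) contrasting the two store types. */
final class WindowedStoreSketch {

    /** Hypothetical stand-in for a Windowed<K> key: the record key plus the window start. */
    static final class WindowedKey {
        final String key;
        final long windowStartMs;

        WindowedKey(final String key, final long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof WindowedKey)) {
                return false;
            }
            final WindowedKey other = (WindowedKey) o;
            return key.equals(other.key) && windowStartMs == other.windowStartMs;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, windowStartMs);
        }
    }

    /** Key-value style store: has no notion of retention, so entries are never dropped. */
    static final class ToyKeyValueStore {
        private final Map<WindowedKey, Long> data = new HashMap<>();

        void put(final WindowedKey key, final long value) {
            data.put(key, value);
        }

        int size() {
            return data.size();
        }
    }

    /** Window style store: drops windows that start more than retentionMs before the newest window seen. */
    static final class ToyWindowStore {
        private final long retentionMs;
        private final Map<WindowedKey, Long> data = new HashMap<>();
        private long newestWindowStartMs = Long.MIN_VALUE;

        ToyWindowStore(final long retentionMs) {
            this.retentionMs = retentionMs;
        }

        void put(final WindowedKey key, final long value) {
            newestWindowStartMs = Math.max(newestWindowStartMs, key.windowStartMs);
            data.put(key, value);
            final long cutoff = newestWindowStartMs - retentionMs;
            data.keySet().removeIf(k -> k.windowStartMs < cutoff);
        }

        int size() {
            return data.size();
        }
    }

    /** Writes the same three windowed records to both stores and returns {keyValueSize, windowSize}. */
    static int[] demo() {
        final ToyKeyValueStore keyValueStore = new ToyKeyValueStore();
        final ToyWindowStore windowStore = new ToyWindowStore(60_000L);
        final long[] windowStarts = {0L, 30_000L, 120_000L};
        for (final long start : windowStarts) {
            final WindowedKey key = new WindowedKey("user-1", start);
            keyValueStore.put(key, 1L);
            windowStore.put(key, 1L);
        }
        return new int[] {keyValueStore.size(), windowStore.size()};
    }

    public static void main(final String[] args) {
        final int[] sizes = demo();
        System.out.println("key-value store entries: " + sizes[0]);
        System.out.println("window store entries: " + sizes[1]);
    }
}
```

In this model the key-value store ends up holding all three windows, while the window store keeps only the newest one, which is the loss of TTL control described above.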
Public Interfaces
The current KTable API looks like:
```java
public synchronized <K, V> KTable<K, V> table(final String topic);

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed);

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

public synchronized <K, V> KTable<K, V> table(final String topic,
                                              final Consumed<K, V> consumed,
                                              final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```
...