...

We have an existing table() API in StreamsBuilder that materializes a Kafka topic into a KTable backed by a local state store. This interface is very useful when we want to back up a Kafka topic in a local store. The two relevant types of state store are key-value based and window based, but the current interface only accepts a key-value store, which is not ideal. In certain cases we need to materialize a windowed topic (which we also call a changelog topic) created by another Streams application into a local store. In this KIP, we would like to address this problem by creating a new API called windowedTable() that supports the generation of a windowed KTable.

Here comes the tricky part: in Kafka Streams, we sometimes need to materialize a windowed topic. From the source processor's point of view, each record of a windowed topic has the form (Windowed<K> key, V value). There are actually four different cases involved in this KIP:

  1. Non-windowed topic materialized to key-value store. This is the most common case and is already covered by the table() API.
  2. Non-windowed topic materialized to window store. This is not a genuine requirement, because we can easily use a windowed aggregation (groupByKey().windowedBy(...).aggregate(...)) to generate a window store.
  3. Windowed topic (stream changelog) materialized to key-value store (a rare case, but possible).
  4. Windowed topic materialized to window store.
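As a rough illustration of the (Windowed<K> key, V value) record shape, the following stdlib-only sketch uses a hypothetical WindowedKey class (a stand-in for illustration, not Kafka's Windowed type) whose identity includes both the underlying key and the window bounds. Two updates for the same underlying key in different windows therefore remain distinct entries, while keying by K alone would let them overwrite each other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Entry point; declared first so the single-file source launcher picks it up.
class WindowedKeyDemo {
    public static void main(String[] args) {
        // A "windowed table": one entry per (key, window) pair.
        Map<WindowedKey<String>, Long> windowed = new HashMap<>();
        windowed.put(new WindowedKey<>("page-A", 0L, 60_000L), 3L);
        windowed.put(new WindowedKey<>("page-A", 60_000L, 120_000L), 5L);
        System.out.println(windowed.size()); // prints 2

        // Dropping the window and keying by K alone collapses both updates into one entry.
        Map<String, Long> byKeyOnly = new HashMap<>();
        for (Map.Entry<WindowedKey<String>, Long> e : windowed.entrySet()) {
            byKeyOnly.put(e.getKey().key(), e.getValue());
        }
        System.out.println(byKeyOnly.size()); // prints 1
    }
}

// Hypothetical stand-in for Windowed<K>: the underlying key plus window bounds [start, end).
final class WindowedKey<K> {
    private final K key;
    private final long windowStart;
    private final long windowEnd;

    WindowedKey(K key, long windowStart, long windowEnd) {
        this.key = key;
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
    }

    K key() { return key; }
    long windowStart() { return windowStart; }
    long windowEnd() { return windowEnd; }

    // Equality covers the key AND the window, so the same key in two windows differs.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WindowedKey)) return false;
        WindowedKey<?> other = (WindowedKey<?>) o;
        return windowStart == other.windowStart
                && windowEnd == other.windowEnd
                && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, windowStart, windowEnd);
    }

    @Override
    public String toString() {
        return key + "@[" + windowStart + ", " + windowEnd + ")";
    }
}
```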

...

  3. This is also a rare requirement, because the natural difference between a key-value store and a window store is that a window store enforces a retention period on its data. By materializing a windowed topic into a key-value store we lose control over the TTL, so the store would not correctly represent the data.
  4. Windowed topic (stream changelog) materialized to window store. This is a missing capability that our new API needs to address. Currently it is very hard to share a changelog between Streams applications.
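To make the retention argument concrete, here is a minimal in-memory sketch, assuming a hypothetical ToyWindowStore class (not the Kafka Streams WindowStore API) that drops any window whose start is older than the observed stream time minus a retention period. A plain map keyed by key and window start, standing in for a key-value store, has no such TTL and keeps every window forever.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Entry point; declared first so the single-file source launcher picks it up.
class RetentionDemo {
    public static void main(String[] args) {
        ToyWindowStore<String, Long> windowStore = new ToyWindowStore<>(10_000L); // 10 s retention
        Map<String, Long> keyValueStore = new HashMap<>(); // no TTL: entries live forever

        // 13 five-second windows for the same key: starts 0, 5000, ..., 60000.
        for (long start = 0L; start <= 60_000L; start += 5_000L) {
            windowStore.put("user-1", start, start);
            keyValueStore.put("user-1@" + start, start);
        }
        System.out.println(windowStore.size());   // prints 3 (windows 50000, 55000, 60000)
        System.out.println(keyValueStore.size()); // prints 13
    }
}

// Hypothetical toy model, NOT the Kafka Streams WindowStore API.
final class ToyWindowStore<K, V> {
    private final long retentionMs;
    // Simplification: stream time is the largest window start seen so far.
    private long streamTime = Long.MIN_VALUE;
    // windowStart -> (key -> value), sorted by window start so expiry is a single head-map clear.
    private final TreeMap<Long, Map<K, V>> windows = new TreeMap<>();

    ToyWindowStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(K key, long windowStart, V value) {
        streamTime = Math.max(streamTime, windowStart);
        long cutoff = streamTime - retentionMs;
        // Expire every window that starts before the retention cutoff.
        windows.headMap(cutoff, false).clear();
        if (windowStart < cutoff) {
            return; // late record for an already-expired window: dropped
        }
        windows.computeIfAbsent(windowStart, s -> new HashMap<>()).put(key, value);
    }

    V fetch(K key, long windowStart) {
        Map<K, V> window = windows.get(windowStart);
        return window == null ? null : window.get(key);
    }

    int size() {
        int total = 0;
        for (Map<K, V> window : windows.values()) {
            total += window.size();
        }
        return total;
    }
}
```

With 13 five-second windows and a 10-second retention, the toy window store keeps only the last three windows while the map keeps all 13, which is the loss of TTL control described in case 3.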


Public Interfaces

The current table() API in StreamsBuilder looks like this:

StreamsBuilder.java

```java
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```

...