...

The StreamsBuilder already has a table() API that materializes a Kafka topic into a local state store, exposed as a KTable. This interface is very useful when we want to back up a Kafka topic to a local store. Currently there are two types of state store: key-value based and window based. The existing interface only accepts a key-value store, which is not ideal: in certain cases we need to materialize a windowed topic (also called a changelog topic) created by another Streams application into a local store. In this KIP, we address this problem by adding a new API called windowedTable(), which supports the generation of a windowed KTable.

Here comes the tricky part: from the source processor's point of view, the input from a windowed topic is (Windowed<K> key, V value). Note that this differs from a normal topic, because the serdes required here are windowed serdes. Let's clarify the four different cases involved in the discussion:

  1. Non-windowed topic materialized to key-value store. This is the most common case and is already covered by the table() API.
  2. Non-windowed topic materialized to window store. This is not a real requirement, because we can easily use the aggregate() API to generate a window store from a non-windowed topic.
  3. Windowed topic (stream changelog) materialized to key-value store. This is also a rare requirement, because the natural difference between a key-value store and a window store is that the window store sets a retention on the data. By materializing a windowed topic to a key-value store we lose control over the TTL, which leads to a wrong representation of the changelog data.
  4. Windowed topic (stream changelog) materialized to window store. This is a missing requirement which needs to be addressed by our new API. Currently it is very hard to share a changelog between Streams applications, and it could be really useful to share the same state store across applications through this API.
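
The difference between cases 3 and 4 can be illustrated with a toy, in-memory model. This is only a sketch and not Kafka Streams code: `ToyKeyValueStore`, `ToyWindowStore` and the eviction rule (drop every window that starts before the latest observed window start minus the retention) are hypothetical simplifications chosen to show why a key-value store gives up control over the TTL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: these classes are hypothetical and are not part of Kafka Streams.
final class RetentionSketch {

    // Case 3: a key-value style store keeps one entry per (key, window start) forever.
    static final class ToyKeyValueStore {
        private final Map<String, Long> entries = new HashMap<>();

        void put(String key, long windowStartMs, long value) {
            entries.put(key + "@" + windowStartMs, value);
        }

        int size() {
            return entries.size();
        }
    }

    // Case 4: a window style store drops windows that fall out of the retention period.
    static final class ToyWindowStore {
        private final long retentionMs;
        private long observedTimeMs = Long.MIN_VALUE;
        // window start -> (raw key -> value)
        private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

        ToyWindowStore(long retentionMs) {
            this.retentionMs = retentionMs;
        }

        void put(String key, long windowStartMs, long value) {
            observedTimeMs = Math.max(observedTimeMs, windowStartMs);
            windows.computeIfAbsent(windowStartMs, ws -> new HashMap<>()).put(key, value);
            // Assumed eviction rule: remove every window that starts before the retention horizon.
            windows.headMap(observedTimeMs - retentionMs, false).clear();
        }

        int size() {
            int total = 0;
            for (Map<String, Long> window : windows.values()) {
                total += window.size();
            }
            return total;
        }
    }

    public static void main(String[] args) {
        ToyKeyValueStore keyValueStore = new ToyKeyValueStore();
        ToyWindowStore windowStore = new ToyWindowStore(60_000L);

        // Three one-minute windows for the same raw key.
        for (long windowStart = 0L; windowStart <= 120_000L; windowStart += 60_000L) {
            keyValueStore.put("user-42", windowStart, 1L);
            windowStore.put("user-42", windowStart, 1L);
        }

        System.out.println(keyValueStore.size()); // prints 3: nothing ever expires
        System.out.println(windowStore.size());   // prints 2: the oldest window was dropped
    }
}
```

In the toy key-value store the window start is just part of the key, so old windows accumulate without bound, whereas the toy window store can expire them because it knows the retention.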


Public Interfaces

The current KTable API looks like:

...

Proposed Changes

We would like to add 2 new APIs to support a window store as the underlying storage option.

StreamsBuilder.java

public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

One thing users need to pay attention to is how to pass in the serdes. The type of the consumed parameter is Consumed<Windowed<K>, V>, because we need to deserialize records from the topic as a windowed key and a value. The type of the materialized parameter, however, is Materialized<K, V, WindowStore<Bytes, byte[]>>, because the window store stores the raw key rather than the windowed key. Thanks to strict type enforcement, users are alerted at compile time if they confuse the two.
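
To make the distinction between the windowed key and the raw key more concrete, here is a minimal, library-free sketch. It assumes that a key on the windowed topic is laid out as the raw key bytes followed by an 8-byte big-endian window start timestamp, which is our understanding of how the time-windowed serializer in Kafka Streams encodes keys. The `WindowedKeyLayout` class and its methods are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of Kafka Streams.
final class WindowedKeyLayout {
    static final int TIMESTAMP_SIZE = Long.BYTES;

    // Assumed layout: raw key bytes followed by the window start (8 bytes, big-endian).
    static byte[] encode(String rawKey, long windowStartMs) {
        byte[] keyBytes = rawKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE)
                .put(keyBytes)
                .putLong(windowStartMs)
                .array();
    }

    // What a windowed deserializer has to recover for the window store: the raw key...
    static String rawKey(byte[] windowedKeyBytes) {
        int keyLength = windowedKeyBytes.length - TIMESTAMP_SIZE;
        return new String(windowedKeyBytes, 0, keyLength, StandardCharsets.UTF_8);
    }

    // ...and the window start timestamp.
    static long windowStart(byte[] windowedKeyBytes) {
        return ByteBuffer.wrap(windowedKeyBytes)
                .getLong(windowedKeyBytes.length - TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        byte[] onTopic = encode("user-42", 1_500_000_000_000L);

        System.out.println(rawKey(onTopic));      // prints user-42
        System.out.println(windowStart(onTopic)); // prints 1500000000000

        // A plain (non-windowed) String deserializer would swallow the timestamp bytes too.
        String naive = new String(onTopic, StandardCharsets.UTF_8);
        System.out.println(naive.equals("user-42")); // prints false
    }
}
```

Under this assumption, reading the topic with a non-windowed key serde yields a key polluted by the timestamp bytes, which is why the consumed side is typed with Windowed<K> while the store side is typed with the raw K.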

We don't support a windowedTable() variant without `materialized`. The reason is that a window store needs a concrete retention time, window size and number of rolling segments to be constructed. The Streams application cannot infer the retention or window size of the windowed topic on its own, so the user is required to provide this information.

Compatibility, Deprecation, and Migration Plan

...