...

  1. Non-windowed topic materialized to a key-value store. This is the most common case and is already covered by the table() API.
  2. Non-windowed topic materialized to a window store. This is not a real requirement, because the aggregate() API can already build a window store from a non-windowed topic.
  3. Windowed topic (KStream changelog) materialized to a key-value store. This is also a rare requirement. The essential difference between a key-value store and a window store is that a window store enforces a retention period on its data. Materializing a windowed topic to a key-value store loses control over that TTL, so the store misrepresents the changelog data.
  4. Windowed topic (KStream changelog) materialized to a window store. This is the missing case that our new API needs to address. Currently it is very hard to share a changelog between stream applications, and this API would make it possible to share the same state store across applications.
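To make the retention argument in case 3 concrete, here is a toy, stdlib-only model. It is not Kafka Streams code, and `ToyRetentionWindowStore` and `RetentionDemo` are hypothetical names. It writes the same five one-minute windows into a plain map keyed by a flattened windowed key and into a store that expires windows older than a two-minute retention, measured against the latest window start seen so far.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model (not Kafka Streams code): a window store with retention.
final class ToyRetentionWindowStore {
    private final long retentionMs;
    // window start -> (key -> value); sorted so expired windows form a prefix
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();
    private long streamTime = Long.MIN_VALUE;

    ToyRetentionWindowStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(String key, long value, long windowStart) {
        streamTime = Math.max(streamTime, windowStart);
        windows.computeIfAbsent(windowStart, s -> new HashMap<>()).put(key, value);
        // Expire every window that starts before (stream time - retention).
        windows.headMap(streamTime - retentionMs).clear();
    }

    int windowCount() {
        return windows.size();
    }
}

final class RetentionDemo {
    // Writes five one-minute windows for one key into both stores and returns
    // {entries in the plain key-value map, windows still live in the window store}.
    static int[] run() {
        Map<String, Long> keyValueStore = new HashMap<>();
        ToyRetentionWindowStore windowStore = new ToyRetentionWindowStore(120_000L);
        for (long start = 0L; start <= 240_000L; start += 60_000L) {
            keyValueStore.put("user-1@" + start, 1L); // flattened windowed key, no TTL
            windowStore.put("user-1", 1L, start);
        }
        return new int[] {keyValueStore.size(), windowStore.windowCount()};
    }

    public static void main(String[] args) {
        int[] sizes = run();
        // prints "key-value entries: 5, live windows: 3"
        System.out.println("key-value entries: " + sizes[0] + ", live windows: " + sizes[1]);
    }
}
```

The plain map keeps all five entries indefinitely, while the retention-aware store has already dropped the two oldest windows, which is the control over TTL that case 3 gives up.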


Public Interfaces

The current table() API in StreamsBuilder looks like this:


StreamsBuilder.java

```java
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
```

Through the Materialized struct, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. In fact, the underlying KTable stores its data by default in a key-value store backed by RocksDB. We also want to support a window store, which is a natural requirement when materializing a windowed topic whose records carry windowed keys.
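As a rough illustration of why a window store suits windowed keys, the sketch below keeps one value per (key, window start) and answers time-range queries, whereas a key-value map keeps only the latest value per key. The `put`/`fetch` shape is loosely modeled on a window store, but `ToyWindowStore` and `WindowFetchDemo` are hypothetical, purely in-memory, and not backed by RocksDB.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory window store: one value per (raw key, window start).
final class ToyWindowStore<K, V> {
    private final Map<K, TreeMap<Long, V>> data = new HashMap<>();

    void put(K key, V value, long windowStart) {
        data.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStart, value);
    }

    // Values for `key` whose window start lies in [timeFrom, timeTo], oldest first.
    List<V> fetch(K key, long timeFrom, long timeTo) {
        TreeMap<Long, V> byStart = data.get(key);
        if (byStart == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(byStart.subMap(timeFrom, true, timeTo, true).values());
    }
}

final class WindowFetchDemo {
    private static final long[] STARTS = {0L, 60_000L, 120_000L};
    private static final int[] COUNTS = {3, 5, 2};

    // Plain key-value store: each update for "page-A" overwrites the previous one.
    static Map<String, Integer> keyValueStore() {
        Map<String, Integer> store = new HashMap<>();
        for (int count : COUNTS) {
            store.put("page-A", count);
        }
        return store;
    }

    // Window store: each one-minute window of "page-A" keeps its own count.
    static ToyWindowStore<String, Integer> windowStore() {
        ToyWindowStore<String, Integer> store = new ToyWindowStore<>();
        for (int i = 0; i < STARTS.length; i++) {
            store.put("page-A", COUNTS[i], STARTS[i]);
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(keyValueStore().get("page-A"));              // 2
        System.out.println(windowStore().fetch("page-A", 0L, 60_000L)); // [3, 5]
    }
}
```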

Proposed Changes

We would like to add eight new APIs that support a window store or a session store as the underlying storage for a windowed topic.

StreamsBuilder.java

```java
// New APIs: window store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// New APIs: session store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
```


As the new API suggests, we tail a windowed changelog topic to materialize its data for processing. Internally, the Consumed type is converted to <Windowed<K>, V> so that the changelog records are deserialized correctly, while the Materialized serde type stays <K, V>. Users should pay attention to how serdes are passed in. Records are consumed as Consumed<Windowed<K>, V>, because each record must be deserialized into a windowed key and a value. The store, however, is typed Materialized<K, V, WindowStore<Bytes, byte[]>>, because the window store keeps the raw key rather than the windowed key. With strict type enforcement, users are alerted at compile time if they confuse the two.
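The type split can be sketched with stand-in types: changelog records are keyed by a windowed key, while the store is typed on the raw key and receives the window start as a separate argument. `WindowedKey`, `RawKeyWindowStore`, and `ChangelogRestore` are hypothetical stand-ins for `Windowed<K>` and a window store, not Kafka classes.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Windowed<K>: the raw key plus its window bounds.
final class WindowedKey<K> {
    final K key;
    final long start;
    final long end;

    WindowedKey(K key, long start, long end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }
}

// Hypothetical store typed on the RAW key, like the Materialized side.
final class RawKeyWindowStore<K, V> {
    private final Map<K, Map<Long, V>> data = new HashMap<>();

    void put(K key, V value, long windowStart) {
        data.computeIfAbsent(key, k -> new HashMap<>()).put(windowStart, value);
    }

    V fetch(K key, long windowStart) {
        Map<Long, V> byStart = data.get(key);
        return byStart == null ? null : byStart.get(windowStart);
    }
}

final class ChangelogRestore {
    // Consumed side: records are keyed by WindowedKey<K>.
    // Materialized side: the store only sees K plus the window start.
    static <K, V> void restore(List<Map.Entry<WindowedKey<K>, V>> changelog,
                               RawKeyWindowStore<K, V> store) {
        for (Map.Entry<WindowedKey<K>, V> record : changelog) {
            WindowedKey<K> windowedKey = record.getKey();
            store.put(windowedKey.key, record.getValue(), windowedKey.start);
        }
    }

    // Builds one changelog record for a one-minute window starting at `start`.
    private static Map.Entry<WindowedKey<String>, Integer> record(String key, long start, int value) {
        return new AbstractMap.SimpleEntry<WindowedKey<String>, Integer>(
            new WindowedKey<String>(key, start, start + 60_000L), value);
    }

    // Later changelog records for the same window overwrite earlier ones.
    static RawKeyWindowStore<String, Integer> demoStore() {
        List<Map.Entry<WindowedKey<String>, Integer>> changelog = new ArrayList<>();
        changelog.add(record("alice", 0L, 1));
        changelog.add(record("alice", 0L, 3));
        changelog.add(record("alice", 60_000L, 4));
        RawKeyWindowStore<String, Integer> store = new RawKeyWindowStore<>();
        restore(changelog, store);
        return store;
    }

    public static void main(String[] args) {
        RawKeyWindowStore<String, Integer> store = demoStore();
        System.out.println(store.fetch("alice", 0L) + " " + store.fetch("alice", 60_000L)); // 3 4
    }
}
```

Because `restore` takes a changelog keyed by `WindowedKey<K>` and a store typed on `K`, passing a store typed on the windowed key instead of the raw key does not compile, which mirrors the compile-time check described above.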

Why don't we support a windowedTable() function without `materialized` or `consumed`, as table() does?

We keep `materialized` because a window store needs a concrete retention time, window size, and number of segments to be set up properly. The streams application cannot infer the windowed topic's retention or window size, so the user must supply them.
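A hypothetical settings holder shows what the user has to supply and why. In this sketch the changelog key is assumed to carry only the window start, so the window end can be rebuilt only from a user-provided window size. The validation rules and the `WindowStoreSpec` name are illustrative and are not taken from Kafka's `Materialized` API.

```java
// Hypothetical settings holder; names and rules are illustrative, not Kafka's API.
final class WindowStoreSpec {
    final long retentionMs;
    final long windowSizeMs;
    final int numSegments;

    WindowStoreSpec(long retentionMs, long windowSizeMs, int numSegments) {
        if (windowSizeMs <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        if (retentionMs < windowSizeMs) {
            // In this sketch a window must be retained at least for its own span.
            throw new IllegalArgumentException("retention must be at least the window size");
        }
        if (numSegments <= 0) {
            throw new IllegalArgumentException("number of segments must be positive");
        }
        this.retentionMs = retentionMs;
        this.windowSizeMs = windowSizeMs;
        this.numSegments = numSegments;
    }

    // The changelog key in this sketch carries only the window start, so the end
    // has to be rebuilt from the window size the user supplied.
    long windowEnd(long windowStart) {
        return windowStart + windowSizeMs;
    }

    public static void main(String[] args) {
        WindowStoreSpec spec = new WindowStoreSpec(86_400_000L, 3_600_000L, 3); // 1 day, 1 hour
        System.out.println(spec.windowEnd(7_200_000L)); // 10800000
    }
}
```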

We keep `consumed` for a related reason. In the table() API, the `materialized` serdes can be replaced by the `consumed` serdes, because the source node and the state store share the same `keySerde` and `valueSerde` during construction. In the windowed-table context this does not work, because the windowed keySerde and the raw keySerde are used at the same time. Therefore both structs are required.
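The need for both serdes at once can be sketched with a wrapper that derives a windowed-key codec from a raw-key codec by appending an 8-byte window start. The byte layout and the `WindowedKeyCodec` and `WindowedCodecDemo` names are assumptions for this sketch, not a description of Kafka's serde classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical wrapper: builds a windowed-key codec on top of a raw-key codec
// by appending the window start as 8 bytes (a layout assumed for this sketch).
final class WindowedKeyCodec<K> {
    private final Function<K, byte[]> rawSerializer;
    private final Function<byte[], K> rawDeserializer;

    WindowedKeyCodec(Function<K, byte[]> rawSerializer, Function<byte[], K> rawDeserializer) {
        this.rawSerializer = rawSerializer;
        this.rawDeserializer = rawDeserializer;
    }

    byte[] serialize(K key, long windowStart) {
        byte[] raw = rawSerializer.apply(key);
        return ByteBuffer.allocate(raw.length + Long.BYTES).put(raw).putLong(windowStart).array();
    }

    // The raw codec is reused on the key prefix, so both codecs are in play at once.
    K deserializeKey(byte[] windowed) {
        return rawDeserializer.apply(Arrays.copyOfRange(windowed, 0, windowed.length - Long.BYTES));
    }

    long deserializeStart(byte[] windowed) {
        return ByteBuffer.wrap(windowed, windowed.length - Long.BYTES, Long.BYTES).getLong();
    }
}

final class WindowedCodecDemo {
    static WindowedKeyCodec<String> stringCodec() {
        return new WindowedKeyCodec<String>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        WindowedKeyCodec<String> codec = stringCodec();
        byte[] windowed = codec.serialize("alice", 60_000L);
        // 5 key bytes + 8 start bytes; prints "13 alice 60000"
        System.out.println(windowed.length + " " + codec.deserializeKey(windowed)
            + " " + codec.deserializeStart(windowed));
    }
}
```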

One side effect is that we make `ChangeLoggingWindowBytesStore` public for unit-testing purposes. These details, however, are hidden from the end user. Since KIP-393, we have a constructor that wraps a general key serde into a windowed serde, so KStream users don't need to worry about type casting.

Compatibility, Deprecation, and Migration Plan

...