...

We have an existing table() API in StreamsBuilder that materializes a Kafka topic into a local state store as a KTable. This interface is very useful when we want to back up a Kafka topic to a local store. Sometimes we also need to materialize a windowed topic (or changelog topic) created by another Streams application into a local store. The current interface only accepts a key-value store, which is not ideal for that case. In this KIP, we would like to address this problem by creating new APIs that support the materialization of a windowed KTable, stored as either a window store or a session store.

...

StreamsBuilder.java
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

Through the `Materialized` parameter, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. In fact, the underlying KTable class by default stores data in a key-value store backed by RocksDB. We also want to support window stores, which is a very natural requirement when materializing a windowed topic with windowed keys.

Proposed Changes

We would like to add eight new APIs to support window store and session store as the underlying storage options for a windowed topic.

StreamsBuilder.java
// New APIs: window store materialization to time windowed KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// New APIs: session store materialization to session windowed KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

...

A `windowSize` duration is required to properly initialize the time windowed serde. This is because the underlying storage for windowed records stores only the window start timestamp, for space efficiency. When using the new time windowed API without configuring Consumed or Materialized, a positive window size must be passed in explicitly for initialization. This means the user must know the window size of the windowed topic in order to deserialize it properly. The session window serde does not need a `windowSize` config: individual session window sizes are not known beforehand, so each record stores both its start and end time.
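
To make the difference between the two serdes concrete, the following is a minimal, self-contained sketch in plain Java. It does not use the Kafka Streams serde classes, and the byte layout shown (raw key bytes followed by 8-byte timestamps) is a simplified assumption for illustration, not necessarily the exact on-disk or on-topic format. The class and method names (`WindowedKeySketch`, `encodeTimeWindowed`, and so on) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

// Illustrative only: a simplified stand-in for windowed key encoding.
class WindowedKeySketch {

    // Time-windowed key: raw key bytes followed by the 8-byte window start only.
    static byte[] encodeTimeWindowed(String key, long startMs) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(raw.length + Long.BYTES).put(raw).putLong(startMs).array();
    }

    // The bytes carry no end timestamp, so the caller must supply the window size.
    // Returns {start, end} in milliseconds.
    static long[] decodeTimeWindow(byte[] bytes, Duration windowSize) {
        if (windowSize.isZero() || windowSize.isNegative()) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        long start = ByteBuffer.wrap(bytes).getLong(bytes.length - Long.BYTES);
        return new long[] {start, start + windowSize.toMillis()};
    }

    // Session key: raw key bytes followed by both start and end timestamps.
    static byte[] encodeSession(String key, long startMs, long endMs) {
        byte[] raw = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(raw.length + 2 * Long.BYTES)
                .put(raw).putLong(startMs).putLong(endMs).array();
    }

    // Both bounds are read back from the bytes, so no window size is needed.
    static long[] decodeSession(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long start = buf.getLong(bytes.length - 2 * Long.BYTES);
        long end = buf.getLong(bytes.length - Long.BYTES);
        return new long[] {start, end};
    }

    public static void main(String[] args) {
        long[] window = decodeTimeWindow(encodeTimeWindowed("user-1", 1000L), Duration.ofSeconds(5));
        System.out.println("time window: [" + window[0] + ", " + window[1] + ")");

        long[] session = decodeSession(encodeSession("user-1", 1000L, 4200L));
        System.out.println("session window: [" + session[0] + ", " + session[1] + "]");
    }
}
```

In this sketch, decoding the same time-windowed bytes with a different `windowSize` would yield a different window end, which is why the reader of a windowed topic has to know the size the producing application used.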

...