Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleStreamsBuilder.java
// New APIs: window store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, Duration windowSize, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Duration windowSize);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, Duration windowSize, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, Duration windowSize, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

// New APIs: session store materialization to KTable
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>Consumed<K, V> consumed, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Duration windowSize);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Consumed<Windowed<K>Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> sessionTable(final String topic, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

...

A `windowSize` duration is required to properly initialize the time windowed serde. This is because the underlying storage for windowed records are only storing window start timestamp for space efficiency. When using the new API without configuring Consumed or Materialized, it is required to explicitly pass in the window size. This means user must be aware of the windowed topic window size in order to properly deserialize the topic.

As we are talking, the Consumed and Materialized classes will also be added with windowSize type.

Code Block
languagejava
titleConsumed.java
public class Consumed<K, V> {
	...
	protected Duration windowSize; // new
	...
	public static <K, V> Consumed<K, V> with(final Duration windowSize); // new
}
Code Block
languagejava
titleConsumed.java
public class Materialized<K, V, S> {
	...
	protected Duration windowSize; // new
	...
	public static <K, V, S> Materialized<K, V, S> withWindowSize(final Duration windowSize); // new
}

So when both Materialized and Consumed are equipped with `windowSize`, the Consumed `windowSize` will take precedence, as it is the primary owner for deserializing the input topicFor session window serde, `windowSize` config is not needed, because we don't know the individual window size beforehand, so each record will store both start and end time.

Compatibility, Deprecation, and Migration Plan

...