
Status

Current state: Under Discussion

Discussion thread: Not available now

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

StreamsBuilder has an existing table() API that materializes a Kafka topic into a local state store and exposes it as a KTable. There are currently two types of state store: key-value based and window based. The current interface accepts only a key-value store, which is limiting. In this KIP we propose to address this by adding a new API, windowedTable(), which supports creating a windowed KTable.

The tricky part is that Kafka Streams applications sometimes need to materialize a windowed topic. From the source processor's point of view, the input records of a windowed topic have the form (Windowed<K> key, V value). The discussion therefore extends to four cases:

  1. Non-windowed topic materialized to key-value store (the most common case)
  2. Non-windowed topic materialized to window store
  3. Windowed topic materialized to key-value store (rare case, but possible)
  4. Windowed topic materialized to window store
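To make the difference between cases 1 and 2 concrete, here is a minimal sketch using plain Java collections. The classes SimpleKeyValueStore and SimpleWindowStore are hypothetical stand-ins, not Kafka Streams types, and the tumbling-window alignment (window start = timestamp rounded down to a multiple of the window size) is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration only; these are not Kafka Streams classes.
public class StoreShapes {

    // Case 1: a key-value store keeps only the latest value per key.
    static final class SimpleKeyValueStore<K, V> {
        private final Map<K, V> latest = new HashMap<>();

        void put(K key, V value) { latest.put(key, value); }

        V get(K key) { return latest.get(key); }
    }

    // Case 2: a window store keeps one value per key per window.
    // Assumes tumbling windows aligned to multiples of windowSizeMs
    // and non-negative timestamps.
    static final class SimpleWindowStore<K, V> {
        private final long windowSizeMs;
        private final Map<K, TreeMap<Long, V>> byKey = new HashMap<>();

        SimpleWindowStore(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

        long windowStart(long timestampMs) {
            return timestampMs - (timestampMs % windowSizeMs);
        }

        void put(K key, V value, long timestampMs) {
            byKey.computeIfAbsent(key, k -> new TreeMap<>())
                 .put(windowStart(timestampMs), value);
        }

        // Returns the value stored for the key in the window starting at
        // windowStartMs, or null if there is none.
        V fetch(K key, long windowStartMs) {
            TreeMap<Long, V> windows = byKey.get(key);
            return windows == null ? null : windows.get(windowStartMs);
        }
    }

    public static void main(String[] args) {
        SimpleKeyValueStore<String, Long> kv = new SimpleKeyValueStore<>();
        kv.put("a", 1L);
        kv.put("a", 2L); // overwrites the earlier value

        SimpleWindowStore<String, Long> ws = new SimpleWindowStore<>(1000L);
        ws.put("a", 1L, 500L);  // lands in the window starting at 0
        ws.put("a", 2L, 1500L); // lands in the window starting at 1000

        System.out.println(kv.get("a"));
        System.out.println(ws.fetch("a", 0L));
        System.out.println(ws.fetch("a", 1000L));
    }
}
```

In the window-store variant the effective key is the pair (key, window start), which is roughly the role Windowed<K> plays in the cases listed above.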

Here we want to be clear about cases 2 and 4. When we materialize a windowed topic


Public Interfaces

The current KTable API looks like:

StreamsBuilder.java
public synchronized <K, V> KTable<K, V> table(final String topic);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<K, V> table(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

Through the Materialized parameter, we can pass in a KeyValueStore<Bytes, byte[]> as the local state store. By default, the underlying KTable implementation stores data in a key-value store backed by RocksDB. We also want to support a window store, which is a natural requirement when materializing a topic with windowed keys.

Proposed Changes

We would like to add four new methods that support a window store as the underlying storage option.

StreamsBuilder.java
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic, final Consumed<K, V> consumed, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

Compatibility, Deprecation, and Migration Plan

This KIP does not change the existing table() API, so it is backward compatible.

Rejected Alternatives

We started by considering a change to the store type on the table() API to support a window store:

StreamsBuilder.java
public synchronized <K, V> KTable<K, V> table(final String topic, final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);

However, this straightforward solution hits two problems:

  1. The store type cannot be changed this way: the new overload differs from the existing one only in a generic type argument, so Java rejects it with a "method has same erasure" error.
  2. Even if we rename the API to windowedTable, returning KTable<K, V> is still not ideal, because other classes already return a windowed KTable type, for example KGroupedStream:

KGroupedStream.java
<W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows, final String queryableStoreName);

So if the above table API returned KTable<K, V> for a window store, we would expose an inconsistent API to users. By defining the return type as KTable<Windowed<K>, V>, we make it clear to the user that a window store is used in the underlying implementation.
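The first problem above can be reproduced without Kafka on the classpath. The following is a minimal sketch with simplified stand-in types: the Materialized, KeyValueStore and WindowStore declared here are hypothetical placeholders, not the real Kafka Streams classes, and the methods return a String instead of a KTable. It shows that two overloads differing only in a generic type argument collide after erasure, while a distinct method name compiles.

```java
// Hypothetical stand-ins for illustration only; these are not the Kafka Streams types.
public class ErasureSketch {

    interface KeyValueStore {}
    interface WindowStore {}

    // Simplified stand-in for Materialized<K, V, S>.
    static final class Materialized<K, V, S> {
        final String storeKind;

        Materialized(String storeKind) { this.storeKind = storeKind; }
    }

    static <K, V> String table(String topic, Materialized<K, V, KeyValueStore> materialized) {
        return "table(" + topic + ") backed by " + materialized.storeKind;
    }

    // Uncommenting this overload makes compilation fail: both methods erase to
    // table(String, Materialized), so javac reports a name clash ("same erasure").
    // static <K, V> String table(String topic, Materialized<K, V, WindowStore> materialized) {
    //     return "table(" + topic + ") backed by " + materialized.storeKind;
    // }

    // A distinct method name avoids the clash.
    static <K, V> String windowedTable(String topic, Materialized<K, V, WindowStore> materialized) {
        return "windowedTable(" + topic + ") backed by " + materialized.storeKind;
    }

    public static void main(String[] args) {
        System.out.println(
            table("orders", new Materialized<String, Long, KeyValueStore>("key-value store")));
        System.out.println(
            windowedTable("orders", new Materialized<String, Long, WindowStore>("window store")));
    }
}
```

The sketch only covers the compile-time clash; the return-type consistency argument in the second problem is independent of it.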
