Current state: "Voting in progress"
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The current implementation of KIP-213 (foreign-key join between two KTables) is opinionated about the persistence type of the intermediate (subscription) store.
Regardless of the materialization strategy passed as a method argument, it creates an intermediate RocksDB state store. Thus, even when the supplied Materialized is "in-memory", RocksDB is still used under the hood for this internal subscription store.
A few problems that this behaviour causes:
The following public interface will be changed, adding 4 methods with extra arguments for join/left join on foreign key and deprecating the equivalent method calls with a single materialization option:
org/apache/kafka/streams/kstream/KTable
As discussed on the mailing list, the change adds new API methods and deprecates the old ones.
The proposal consists of adding the following methods to the KTable interface (and implementing them in KTableImpl.java):
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);
As discussed on the mailing list, the reasons to allow this kind of fine-grained tuning are:
As previously discussed on the mailing list:
The following methods will be deprecated, since a user who specifies the materialization type of the table store will likely also want to decide the materialization type of the subscription store:
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
final Function<V, KO> foreignKeyExtractor,
final ValueJoiner<V, VO, VR> joiner,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
Four API methods will be deprecated and replaced by four new equivalent methods.
Once the deprecation period expires, users of those deprecated methods must update their codebase before upgrading to newer versions of kafka-streams.
Users of the old API methods who need to fine-tune the materialization of the subscription store will have to move to the new API. Behaviour for users of the old API stays unchanged until the deprecated methods are removed.
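The compatibility expectation can be pictured with a small self-contained sketch. It assumes the deprecated overload simply delegates to the new one with a RocksDB-backed subscription store, which matches the current behaviour described at the top of this page but is not a confirmed implementation detail. Every type below (StoreType, Mat, JoinPlan, Table) is a hypothetical stand-in, not a Kafka Streams class.

```java
import java.util.Objects;

public class FkJoinOverloadSketch {

    /** Hypothetical stand-in for the persistence type selected via Materialized. */
    enum StoreType { ROCKS_DB, IN_MEMORY }

    /** Hypothetical stand-in for Materialized; it only carries a store type. */
    static final class Mat {
        final StoreType storeType;

        Mat(final StoreType storeType) {
            this.storeType = Objects.requireNonNull(storeType);
        }
    }

    /** Records which store type each of the two stores ends up with. */
    static final class JoinPlan {
        final StoreType resultStore;
        final StoreType subscriptionStore;

        JoinPlan(final StoreType resultStore, final StoreType subscriptionStore) {
            this.resultStore = resultStore;
            this.subscriptionStore = subscriptionStore;
        }
    }

    /** Hypothetical stand-in for KTable, reduced to the two overload shapes. */
    static final class Table {

        /** Old shape: assumed to keep today's RocksDB subscription store. */
        @Deprecated
        JoinPlan join(final Mat materialized) {
            return join(materialized, new Mat(StoreType.ROCKS_DB));
        }

        /** New shape: the caller chooses both store types. */
        JoinPlan join(final Mat materialized, final Mat subscriptionStoreMaterialization) {
            return new JoinPlan(materialized.storeType,
                                subscriptionStoreMaterialization.storeType);
        }
    }

    public static void main(final String[] args) {
        final Table table = new Table();
        final Mat inMemory = new Mat(StoreType.IN_MEMORY);

        // Deprecated call: result store follows the argument, subscription store does not.
        final JoinPlan oldCall = table.join(inMemory);
        // New call: both stores follow the caller's choice.
        final JoinPlan newCall = table.join(inMemory, inMemory);

        System.out.println("old: " + oldCall.resultStore + " / " + oldCall.subscriptionStore);
        System.out.println("new: " + newCall.resultStore + " / " + newCall.subscriptionStore);
    }
}
```

In this sketch, only callers that switch to the two-argument shape see a different subscription store type; callers of the deprecated shape keep the RocksDB default.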