Status

Current state: "Voting in progress"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current implementation of KIP-213 (Foreign Key Join between two KTables) is opinionated about the persistence type of the intermediate (subscription) store.

Regardless of the materialization strategy passed as a method argument, it creates an intermediate RocksDB state store. Thus, even when the provided materialization is "in-memory", RocksDB is still used under the hood for this internal subscription-store.

A few problems that this behaviour causes:

Public Interfaces

The following public interface will be changed, adding 4 methods with extra arguments for join/left join on foreign key and deprecating the equivalent method calls with a single materialization option:

Proposed Changes

As discussed on the mailing list, the change is to add new API methods and deprecate the old ones.

New feature:

The new feature proposal consists of adding the following methods to the KTable interface (and implementing them in KTableImpl.java):


<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Named named,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);


As discussed on the mailing list, the reasons for allowing this kind of fine-grained tuning are:

  1. The number or size of records is very large, but the join cardinality is low: In this scenario, some users may prefer an on-disk table-store with an in-memory subscription-store.
  2. The number or size of records in each partition of both tables is small(ish): In this scenario, a user may prefer an in-memory subscription-store. The cardinality of the subscription-store keys is the cardinality of the FK from the specified input table. Even if that cardinality is high (for example, an all-distinct 1-1 mapping), the number of entries in the subscription-store will still be small, because one row always maps to exactly one Foreign Key.
  3. The user might want a different type of store (or a differently configured one) for the subscription-store: This would be for users fine-tuning the access pattern of the store.
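The sizing argument in point 2 can be illustrated with a small sketch. It is not Kafka Streams code: `SubscriptionStoreSketch`, `buildSubscriptions`, and the `"foreignKey|primaryKey"` string key are hypothetical stand-ins, and the sketch assumes the subscription-store holds one entry per row of the input table, keyed by the pair of foreign key and primary key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a subscription-store; not part of Kafka Streams.
final class SubscriptionStoreSketch {

    // Builds one subscription entry per input-table row.
    // The map argument goes from primary key to the foreign key extracted from that row.
    // Assumption: keys do not contain the '|' separator used for the composite key.
    static Map<String, String> buildSubscriptions(final Map<String, String> primaryKeyToForeignKey) {
        final Map<String, String> store = new TreeMap<>();
        for (final Map.Entry<String, String> row : primaryKeyToForeignKey.entrySet()) {
            final String compositeKey = row.getValue() + "|" + row.getKey();
            store.put(compositeKey, row.getKey());
        }
        return store;
    }

    public static void main(final String[] args) {
        final Map<String, String> orders = new LinkedHashMap<>();
        orders.put("order-1", "customer-A");
        orders.put("order-2", "customer-A"); // shares a foreign key with order-1
        orders.put("order-3", "customer-B");

        final Map<String, String> store = buildSubscriptions(orders);
        // Each row contributes exactly one entry, whether or not foreign keys repeat.
        System.out.println("rows=" + orders.size() + ", subscriptions=" + store.size());
    }
}
```

Under these assumptions the store never holds more entries than the input table has rows, which is why a small input table implies a small subscription-store even when every row has a distinct foreign key.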

As previously discussed on the mailing list:

API methods deprecation

The following methods will be deprecated, since a user who specifies the materialization type of the table-store will likely also want to decide the materialization type of the subscription-store:

<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Named named,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Compatibility, Deprecation, and Migration Plan

Four API methods will be deprecated and replaced by four new equivalent methods.

After the deprecation period expires, users who still call the deprecated methods will need to update their code-base before upgrading to newer versions of kafka-streams.

Users of the old API methods who need to fine-tune the materialization of the subscription store will have to update their code-base to the new API. Users who do not migrate will see unchanged behaviour until the deprecated methods are removed.
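One way to picture this compatibility guarantee is an old overload that delegates to the new one with a RocksDB default for the subscription store. The sketch below is only an illustration under that assumption; the KIP text does not specify the implementation, and `JoinOverloadSketch`, `StoreType`, and `JoinPlan` are hypothetical stand-ins rather than Kafka Streams types.

```java
// Hypothetical stand-in types; none of these are Kafka Streams classes.
final class JoinOverloadSketch {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    // Records which store type the join result and the subscription store would use.
    static final class JoinPlan {
        final StoreType resultStore;
        final StoreType subscriptionStore;

        JoinPlan(final StoreType resultStore, final StoreType subscriptionStore) {
            this.resultStore = resultStore;
            this.subscriptionStore = subscriptionStore;
        }
    }

    // New-style overload: the caller chooses both store types explicitly.
    static JoinPlan join(final StoreType materialized,
                         final StoreType subscriptionStoreMaterialization) {
        return new JoinPlan(materialized, subscriptionStoreMaterialization);
    }

    // Old-style overload: deprecated, and assumed here to keep today's behaviour
    // by always using RocksDB for the subscription store.
    @Deprecated
    static JoinPlan join(final StoreType materialized) {
        return join(materialized, StoreType.ROCKS_DB);
    }

    public static void main(final String[] args) {
        final JoinPlan legacy = join(StoreType.IN_MEMORY);
        final JoinPlan tuned = join(StoreType.IN_MEMORY, StoreType.IN_MEMORY);
        System.out.println("legacy subscription store: " + legacy.subscriptionStore);
        System.out.println("tuned subscription store: " + tuned.subscriptionStore);
    }
}
```

With this shape, existing callers compile and behave as before (with a deprecation warning), while callers who want an in-memory subscription store opt in by switching to the two-argument form.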

Rejected Alternatives