You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

Status

Current state"Voting in progress"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current implementation of KIP-213 for Foreign Key Join between two KTables is opinionated in terms of intermediate (subscription) store persistence type.

Independently of the Materialization strategy provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in-memory", it will use RocksDB under-the-hood for this internal subscription-store.

A few problems that this behaviour causes:

  • IT Tests: Having an implicit materialization method for state-store affects tests using foreign key state-stores. Windows based systems suffer from the bug described here. The bug is caused by the RocksDB filesystem failing on cleanup step. A work-around to avoid the bug is to use in-memory state-stores in integration tests (rather than exception swallowing). Having the intermediate RocksDB storage being created disregarding Materialization method forces any IT test to necessarily use the manual FS deletion with exception swallowing hack.
  • Short lived stream applications: KTables (or the application running Kafka streams) can be short lived in a way that neither persistent storage nor change-logs creation are desired. The current implementation prevents this kind of behaviour.
  • Breach on dependency inversion design: Forcing RocksDB intermediate storage seems to go against one of the goals of Kafka Stream on being agnostic of the persistence method - Kafka stream currently does provide just a few out-of-the-box persistence methods (e.g. RocksDB/in-memory) but seems to be designed to support multiple instead of necessarily favouring a given one - it seems that the KeyValueStore interface is declared exactly because of Interface Segregation Principle.
  • Lack of fine tuning capabilities: Due to distinct access patterns between a subscription-store and a table-store, a user may want to apply different implementations or configurations to it.

Public Interfaces

The following public interface will be changed, adding methods with extra arguments for join on foreign key:

  • org/apache/kafka/streams/kstream/KTable

Proposed Changes

As discussed in the mail list, there are two changes: a bug-fix and a new feature.

Bug-fix: 

Whenever invoking methods that perform join on foreign key, the subscription-store created under-the-hood will use the same materialization method as parameter provided as "materialized".

This means that, if the materialized parameter has a in-memory store, the subscriptions store will also be in-memory.

New Feature:

The new feature proposal consists on adding the following methods on KTable interface (and implementing them in KtableImpl.java):


<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,

                                    final Function<V, KO> foreignKeyExtractor,

                                    final ValueJoiner<V, VO, VR> joiner,

                                    final Named named,

                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,

                                    final Materialized<KO, V, KeyValueStore<Bytes,byte[]>  subscriptionStoreMaterialization);



<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,

                                    final Function<V, KO> foreignKeyExtractor,

                                    final ValueJoiner<V, VO, VR> joiner,

                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,

                                    final Materialized<KO, V, KeyValueStore<Bytes,byte[]> subscriptionStoreMaterialization);


<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,

                                final Function<V, KO> foreignKeyExtractor,

                                final ValueJoiner<V, VO, VR> joiner,

                                final Named named,

                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,

                                final Materialized<KO, V, KeyValueStore<Bytes,byte[]> subscriptionStoreMaterialization);


<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,

                                final Function<V, KO> foreignKeyExtractor,

                                final ValueJoiner<V, VO, VR> joiner,

                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,

                               final Materialized<KO, V, KeyValueStore<Bytes,byte[]> subscriptionStoreMaterialization);


As discussed in the mail-list, the reason to allow this kind of fine-grain tuning are:

  1. The number or size of records is very large, but the join cardinality is low: In this scenario, some users may prefer to have an on-disk table-store with an in-memory subscription-store.
  2. The number or size of records in each partition of both tables is small(ish): In this scenario, a user may prefer to have an in-memory subscription-store. The cardinality of a subscription-store keys is the cardinality of FK from the specified input table. In this case, even if the cardinality is high (on an all distinct 1-1 mapping) the number of entries in a subscription-store will still be small. One row will always map to only one Foreign Key.
  3. The user might want a different type (or differently configured) store for the subscription-store: This would be for users fine-tuning the access pattern of the store.

As previously discussed in the mail list:

  • Some fields from Materialized are not used by subscription-stores. These fields are Retention and Key/Value Serdes. The underlying store type from a subscription-store is "KeyValueStore<Bytes,byte[]>".
  • Even though this fields are not used, in order to make the API surface smaller, it was decided to use Materialized class as a parameter for subscription-stores. Information about the field not being used in the case of subscription-stores is going to be added as part of the documentation.

Compatibility, Deprecation, and Migration Plan

Bug-fix change:

The bug-fix modification may change legacy code behaviour when updated to newer version of Kafka streams, specially when the materialized parameter is in-memory and the subscription-store grows too big (since it will not be stored in disk anymore as the old API did, but in-memory too instead). This scenario would require to add comments about this change of behaviour in the update documentation and perform it in a non-minor release.

New Feature (API new methods):

It’s an addition to the API instead of a change on a previous API section. Any other change is likely going to be in private methods - not breaking API compatibility and keeping behaviour.

Rejected Alternatives

  • Only piggybacking on the Materialization provided by the materialized parameter. It would limit the desired customisation possibilities to the user.
  • Creating a specific class for subscription-stores Materialization - it would increase the API footprint without bringing and code complexity without a good trade-off.
  • No labels