Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current implementation of KIP-213 (foreign-key joins between two KTables) is opinionated about the persistence type of the intermediate (subscription) store.

Regardless of the materialization strategy passed as a method argument, it always creates an intermediate RocksDB state store. Thus, even when the provided materialization is in-memory, RocksDB is still used under the hood for this internal subscription store.

A few problems that this behaviour causes:

  • Integration tests: the implicit persistence type of the subscription store affects tests that use foreign-key joins. Windows-based systems suffer from the bug described here, which is caused by a failure while cleaning up the RocksDB files on the file system. A workaround for the bug is to use in-memory state stores in integration tests (rather than swallowing the exception). Because the intermediate RocksDB store is created regardless of the materialization method, any integration test of a foreign-key join is forced to fall back on the hack of deleting the state directory manually and swallowing the exception.
  • Short-lived stream applications: KTables (or the Kafka Streams application itself) can be so short-lived that neither persistent storage nor changelog creation is desired. The current implementation does not allow this.
  • Breach of dependency inversion: forcing a RocksDB intermediate store seems to go against one of the goals of Kafka Streams, namely being agnostic of the persistence method. Kafka Streams currently provides only a few out-of-the-box persistence methods (e.g. RocksDB and in-memory), but it appears designed to support many rather than to favour a particular one; the KeyValueStore interface seems to be declared precisely with the Interface Segregation Principle in mind.

Public Interfaces

The following public interface will be changed by adding foreign-key join methods that take an extra argument:

  • org/apache/kafka/streams/kstream/KTable

Proposed Changes

The proposal consists of adding the following methods to the KTable interface (and implementing them in KTableImpl.java):


// Left foreign-key join with a processor name, result-store materialization and subscription-store materialization.
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final MaterializedSubscription<K, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

// Left foreign-key join without a processor name.
<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                    final MaterializedSubscription<K, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

// Inner foreign-key join with a processor name, result-store materialization and subscription-store materialization.
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Named named,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                final MaterializedSubscription<K, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

// Inner foreign-key join without a processor name.
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                final MaterializedSubscription<K, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);


As mentioned in the ticket:

  • Only some fields of Materialized should be configurable for the subscription store: loggingEnabled, cachingEnabled and storeSupplier.
  • Reusing the persistence type of the materialized parameter for the subscription store (piggybacking) would limit the customisation available to the user.
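A minimal sketch of what a standalone MaterializedSubscription could look like, exposing only the three settings listed above. It is hypothetical: no such class exists in Kafka Streams today, the method names merely mirror the style of Materialized (withLoggingDisabled(), withCachingDisabled()), withLoggingEnabled() is simplified to take no topic config, and java.util.function.Supplier<S> stands in for Kafka's StoreSupplier<S> so the sketch compiles without Kafka on the classpath.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the proposed MaterializedSubscription (not an existing Kafka class).
 * Only the three settings allowed for the subscription store are exposed.
 * Supplier<S> is a stand-in for Kafka's StoreSupplier<S>; K only mirrors the proposed signature.
 */
public final class MaterializedSubscription<K, S> {
    private final Supplier<S> storeSupplier;
    // Defaults mirror Materialized: logging and caching stay enabled unless turned off.
    private boolean loggingEnabled = true;
    private boolean cachingEnabled = true;

    private MaterializedSubscription(final Supplier<S> storeSupplier) {
        this.storeSupplier = Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
    }

    /** Creates a subscription-store materialization backed by the given (stand-in) supplier. */
    public static <K, S> MaterializedSubscription<K, S> as(final Supplier<S> storeSupplier) {
        return new MaterializedSubscription<>(storeSupplier);
    }

    public MaterializedSubscription<K, S> withLoggingEnabled() {
        loggingEnabled = true;
        return this;
    }

    public MaterializedSubscription<K, S> withLoggingDisabled() {
        loggingEnabled = false;
        return this;
    }

    public MaterializedSubscription<K, S> withCachingEnabled() {
        cachingEnabled = true;
        return this;
    }

    public MaterializedSubscription<K, S> withCachingDisabled() {
        cachingEnabled = false;
        return this;
    }

    public boolean loggingEnabled() { return loggingEnabled; }

    public boolean cachingEnabled() { return cachingEnabled; }

    public Supplier<S> storeSupplier() { return storeSupplier; }
}
```

Under these assumptions, a short-lived application or an integration test could pass an in-memory supplier together with withLoggingDisabled(), avoiding both the RocksDB files and the changelog topic for the subscription store.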


Side notes:

I have investigated the class MaterializedInternal, but it seems to have too many fields for what the ticket requires. I am also concerned about making MaterializedSubscription a subclass of Materialized: if the implementation goes this way, the Liskov substitution principle would be breached, since only a few of the inherited fields make sense for a subscription store. I am still considering it, though. It may be better to introduce a separate class that is independent of Materialized. I do not have a strong opinion on this yet.
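The Liskov concern can be illustrated with a deliberately simplified sketch. StoreConfig is a hypothetical stand-in for a Materialized-like class (it is not Kafka's Materialized), and SubscriptionStoreConfig models the subclass approach. Because the value serde is not among the settings allowed for the subscription store, the subclass has to reject it, so code written against the base type fails when handed the subtype.

```java
/** Hypothetical, heavily simplified stand-ins; NOT Kafka's real Materialized API. */
public final class LiskovSketch {

    /** Stand-in for a Materialized-like config with more knobs than the subscription store needs. */
    static class StoreConfig {
        protected boolean cachingEnabled = true;
        protected String valueSerdeName;

        StoreConfig withValueSerde(final String serdeName) {
            this.valueSerdeName = serdeName;
            return this;
        }

        StoreConfig withCachingDisabled() {
            cachingEnabled = false;
            return this;
        }
    }

    /** Subclass approach: it must reject inherited settings that the subscription store does not allow. */
    static class SubscriptionStoreConfig extends StoreConfig {
        @Override
        StoreConfig withValueSerde(final String serdeName) {
            throw new UnsupportedOperationException("value serde is not configurable for the subscription store");
        }
    }

    /** Code written against the base type, which is entitled to call any of its setters. */
    static boolean configure(final StoreConfig config) {
        try {
            config.withValueSerde("string").withCachingDisabled();
            return true;
        } catch (final UnsupportedOperationException e) {
            return false; // the subtype broke the contract of the base type
        }
    }

    public static void main(final String[] args) {
        System.out.println(configure(new StoreConfig()));             // true
        System.out.println(configure(new SubscriptionStoreConfig())); // false
    }
}
```

An independent class, as suggested in the note above, avoids this: unsupported settings are simply absent instead of being inherited and rejected at run time.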

Compatibility, Deprecation, and Migration Plan

This is an addition to the API rather than a change to an existing one. Any other changes are likely to be confined to private methods, so API compatibility and existing behaviour are preserved. The current opinionated behaviour can be kept for the calls that do not provide a subscription store materialization. I do not think deprecating those methods is worthwhile at this point, since the new overloads are only needed in fine-grained scenarios. Keeping the old implementation as it is, however, would still enforce an opinionated view: RocksDB subscription stores would be created even for in-memory materialization. If that is acceptable, the implementation of those methods does not need to be revisited.
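The compatibility plan amounts to a delegation pattern: the existing overloads call the same internal implementation as the new ones, passing today's default. A toy sketch under that assumption; JoinOverloads, join and doJoin are hypothetical names, store suppliers are modelled as Supplier<String> returning a description rather than a real store, and the actual KTableImpl code is not reproduced.

```java
import java.util.Objects;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the compatibility plan (not the real KTableImpl code).
 * Store suppliers are modelled as Supplier<String> that only describe the store.
 */
public final class JoinOverloads {
    // Stand-in for the store KIP-213 creates today regardless of the Materialized argument.
    static final Supplier<String> CURRENT_DEFAULT = () -> "persistent (RocksDB) subscription store";

    // Existing-style overload (no subscription argument): behaviour unchanged.
    static String join() {
        return doJoin(CURRENT_DEFAULT);
    }

    // Proposed-style overload: the caller's choice is used as-is.
    static String join(final Supplier<String> subscriptionStoreSupplier) {
        return doJoin(Objects.requireNonNull(subscriptionStoreSupplier, "subscriptionStoreSupplier can't be null"));
    }

    // Shared implementation; in KTableImpl this is where the subscription store would be built.
    private static String doJoin(final Supplier<String> subscriptionStoreSupplier) {
        return "subscription store: " + subscriptionStoreSupplier.get();
    }

    public static void main(final String[] args) {
        System.out.println(join());                                     // subscription store: persistent (RocksDB) subscription store
        System.out.println(join(() -> "in-memory subscription store")); // subscription store: in-memory subscription store
    }
}
```

Because both paths share one implementation, only the overloads that receive a subscription materialization change behaviour; existing callers keep the RocksDB default.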

Rejected Alternatives

  • Piggybacking on the materialization provided by the materialized parameter: rejected because it would limit the customisation available to the user.