Status

Current state: Voting in progress

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Regardless of the Materialization strategy provided in the method argument, the foreign-key join generates an intermediate RocksDB state store. So even when the Materialization provided is in-memory, RocksDB is used under the hood for this internal subscription store.

A few problems that this behaviour causes:

  • IT tests: Having an implicit materialization method for the subscription store affects tests that use foreign-key joins. Windows-based systems suffer from the bug described here, which is caused by RocksDB failing during the filesystem cleanup step. One workaround that avoids swallowing exceptions is to use in-memory state stores in integration tests. Because the intermediate RocksDB store is created regardless of the Materialization method, every integration test is forced to fall back on the hack of deleting the state directory manually and swallowing the resulting exceptions.
  • Short-lived stream applications: KTables (or the application running Kafka Streams) can be short-lived, so that neither persistent storage nor changelog creation is desired. The current implementation prevents this.
  • Breach of the dependency inversion design: Forcing an intermediate RocksDB store seems to go against one of the goals of Kafka Streams, which is to be agnostic of the persistence method. Kafka Streams currently provides only a few out-of-the-box persistence methods (e.g. RocksDB and in-memory), but it seems designed to support many of them rather than to favour a particular one; the KeyValueStore interface seems to be declared precisely for this reason (Interface Segregation Principle).
  • Lack of fine-tuning capabilities: Because a subscription store and a table store have distinct access patterns, a user may want to apply a different implementation or configuration to each.

Public Interfaces

The following public interface will be changed by adding four foreign-key join/leftJoin methods that take an extra materialization argument, and by deprecating the equivalent methods that take a single materialization option:

  • org/apache/kafka/streams/kstream/KTable

Proposed Changes

As discussed on the mailing list, the change adds new API methods and deprecates the old ones.

New feature:

The new feature consists of adding the following methods to the KTable interface (and implementing them in KTableImpl.java):

...

                                    final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);


<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,

...

                                    final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

...

                                final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);

...

                                final Materialized<KO, V, KeyValueStore<Bytes, byte[]>> subscriptionStoreMaterialization);


As discussed on the mailing list, the reasons to allow this kind of fine-grained tuning are:

  1. The number or size of records is very large, but the join cardinality is low: In this scenario, some users may prefer to have an on-disk table-store with an in-memory subscription-store.
  2. The number or size of records in each partition of both tables is small(ish): In this scenario, a user may prefer to have an in-memory subscription store. The key cardinality of a subscription store is the foreign-key cardinality of the input table. Even if that cardinality is high (e.g. an all-distinct 1-1 mapping), the number of entries in the subscription store will still be small, because each row always maps to exactly one foreign key.
  3. The user might want a different type (or differently configured) store for the subscription-store: This would be for users fine-tuning the access pattern of the store.
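To illustrate point 2, the following Java sketch models a subscription store as an in-memory map keyed by the pair (foreign key, primary key). The key layout and the names SubscriptionStoreModel and CombinedKey are assumptions made for this example; the real store is a KeyValueStore<Bytes, byte[]> with internal serialization. Because each row maps to exactly one foreign key, the model holds one entry per input row, so its size follows the size of the table partition however many distinct foreign keys there are.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of a subscription store, for illustration only.
// It is NOT the Kafka Streams implementation: the real store is a
// KeyValueStore<Bytes, byte[]> holding serialized keys and values.
final class SubscriptionStoreModel {

    // Assumed key layout: the foreign key combined with the left-table primary key.
    record CombinedKey<KO, K>(KO foreignKey, K primaryKey) {}

    // Builds one subscription entry per left-table row. Each row maps to exactly one
    // foreign key, so the entry count equals the number of rows.
    static <K, V, KO> Map<CombinedKey<KO, K>, V> build(Map<K, V> leftTable,
                                                       Function<V, KO> foreignKeyExtractor) {
        Map<CombinedKey<KO, K>, V> store = new HashMap<>();
        for (Map.Entry<K, V> row : leftTable.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(row.getValue());
            store.put(new CombinedKey<>(foreignKey, row.getKey()), row.getValue());
        }
        return store;
    }

    // Number of distinct foreign keys referenced by the entries in the store.
    static <KO, K, V> long distinctForeignKeys(Map<CombinedKey<KO, K>, V> store) {
        return store.keySet().stream().map(CombinedKey::foreignKey).distinct().count();
    }
}
```

For example, three orders that reference two customers produce three subscription entries, not more.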

As previously discussed on the mailing list:

  • Some fields of Materialized are not used by subscription stores, namely the retention and the key/value Serdes. The underlying store type of a subscription store is KeyValueStore<Bytes, byte[]>.
  • Even though these fields are not used, the Materialized class was chosen as the parameter type for subscription stores in order to keep the API surface small. The fact that these fields are ignored for subscription stores will be documented.
  • Documentation about subscription stores needs to be added to the Kafka Streams docs.

API methods deprecation

The following methods will be deprecated, since a user who provides the materialization type of a table store will likely also want to decide on the materialization type of a subscription store:

<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Named named,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final Function<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

  • Only some fields of Materialized should be modifiable for the subscription store: loggingEnabled, cachingEnabled and storeSupplier.
  • Reusing the persistence type of the materialized parameter for the subscription store (piggybacking) may limit the customisation the user wants.
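The split between applicable and ignored fields could be sketched as a simple projection. MaterializedOptions and SubscriptionStoreOptions below are hypothetical records standing in for Materialized (Serdes are represented by name only); they are not Kafka Streams classes. The projection keeps storeSupplier, loggingEnabled and cachingEnabled, and reports retention and the key/value Serdes as ignored, which is the information the documentation would need to convey.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the options a user can set on Materialized.
// This is NOT the Kafka Streams Materialized class; Serdes are represented by name only.
record MaterializedOptions(String storeSupplierName,
                           boolean loggingEnabled,
                           boolean cachingEnabled,
                           Duration retention,
                           String keySerdeName,
                           String valueSerdeName) {}

// The subset of options that applies to a subscription store.
record SubscriptionStoreOptions(String storeSupplierName,
                                boolean loggingEnabled,
                                boolean cachingEnabled) {

    // Copies only the applicable fields; retention and key/value Serdes are dropped
    // because the subscription store is a KeyValueStore<Bytes, byte[]>.
    static SubscriptionStoreOptions from(MaterializedOptions options) {
        return new SubscriptionStoreOptions(options.storeSupplierName(),
                options.loggingEnabled(), options.cachingEnabled());
    }

    // Names the fields the user set that will be ignored, e.g. for a warning or the docs.
    static List<String> ignoredFields(MaterializedOptions options) {
        List<String> ignored = new ArrayList<>();
        if (options.retention() != null) ignored.add("retention");
        if (options.keySerdeName() != null) ignored.add("keySerde");
        if (options.valueSerdeName() != null) ignored.add("valueSerde");
        return ignored;
    }
}
```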

Side notes:

I have investigated the MaterializedInternal class, but it seems to have too many fields for what this ticket requires. I am also somewhat concerned about using inheritance to make MaterializedSubscription a subclass of Materialized: if the implementation went this way, Liskov's substitution principle would be breached, since only a few of the fields make sense for a subscription store. I am still considering it, though. It may be better to create a separate class that is independent of Materialized. I don't have a strong opinion on this yet.
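The Liskov concern can be made concrete with a small, hypothetical Java example. BaseStoreOptions stands in for a Materialized-like builder and SubscriptionOnlyOptions for a subscription-store subclass; neither is a Kafka Streams class, and retention is used only because it is one of the fields that does not apply to subscription stores.

```java
import java.time.Duration;

// Hypothetical, simplified builder standing in for Materialized (not the Kafka class).
class BaseStoreOptions {
    private Duration retention = Duration.ofDays(1);

    BaseStoreOptions withRetention(Duration retention) {
        this.retention = retention;
        return this;
    }

    Duration retention() {
        return retention;
    }
}

// A subscription-store subtype has no use for retention, so it must either throw or
// silently ignore the call. Both break the base contract ("withRetention sets the
// retention"), so code written against BaseStoreOptions misbehaves when handed this subtype.
class SubscriptionOnlyOptions extends BaseStoreOptions {
    @Override
    BaseStoreOptions withRetention(Duration retention) {
        throw new UnsupportedOperationException("retention does not apply to subscription stores");
    }
}

final class RetentionConfigurer {
    // Correct for every BaseStoreOptions that honours its contract.
    static Duration applySixHourRetention(BaseStoreOptions options) {
        return options.withRetention(Duration.ofHours(6)).retention();
    }
}
```

Silently ignoring the call instead of throwing would avoid the exception, but it would still break the caller's expectation that the retention was set.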

Compatibility, Deprecation, and Migration Plan

This is an addition to the API rather than a change to an existing API. Any other change is likely to be in private methods, so API compatibility and existing behaviour are preserved. The previous opinionated behaviour can be kept for the API calls that do not provide a subscription-store materialization. I believe that deprecating that API at this point is not worthwhile, since the new API would only be required in very fine-grained scenarios. Keeping the old API implementation as it is, however, would still enforce an opinionated view: RocksDB subscription stores are created even for in-memory materialization. If this detail is acceptable, the implementation of those methods does not need to be revisited.


Four API methods will be deprecated and replaced by four new equivalent methods.

After the deprecation period expires, users of the deprecated methods will need to update their code base in order to use newer versions of kafka-streams.

Users of the old API methods who need to fine-tune the materialization of the subscription store will have to move their code base to the new API instead. This keeps the behaviour of the old API consistent for its users until the deprecated methods are removed.
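A possible way to keep the old behaviour stable during the deprecation period is to have each deprecated overload delegate to its new counterpart with a RocksDB subscription store. The sketch below uses hypothetical, heavily simplified types (StoreKind, FkJoinStores, ForeignKeyJoinApi) that only record which stores a join would create; it is not the KTableImpl code.

```java
// Hypothetical, simplified stand-ins; these are NOT Kafka Streams types.
enum StoreKind { ROCKS_DB, IN_MEMORY }

// Records which stores a foreign-key join would create.
record FkJoinStores(StoreKind tableStore, StoreKind subscriptionStore) {}

final class ForeignKeyJoinApi {

    // New-style overload: the caller chooses both the table store and the subscription store.
    FkJoinStores join(StoreKind tableStore, StoreKind subscriptionStore) {
        return new FkJoinStores(tableStore, subscriptionStore);
    }

    // Old-style overload kept during the deprecation period. It delegates to the new
    // overload and pins the subscription store to RocksDB (the current behaviour),
    // even when an in-memory table store is requested.
    @Deprecated
    FkJoinStores join(StoreKind tableStore) {
        return join(tableStore, StoreKind.ROCKS_DB);
    }
}
```

With this shape, existing callers see no change, while callers who want an in-memory subscription store switch to the two-argument overload.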

Rejected Alternatives

  • Only piggybacking on the Materialization provided by the materialized parameter: it would limit the customisation possibilities desired by the user.
  • Creating a specific class for subscription-store Materialization: it would increase the API footprint and the code complexity without a good trade-off.
  • Not deprecating the previous API: as agreed in the discussion, a user who is concerned about the materialization method of a table store is likely also concerned with the materialization method of the subscription store.
  • Applying a bug fix to the old API: since the old API is being deprecated, users who want to avoid the opinionated storage should use the new API instead.