  • Performs updates from both sides of the join.
  • Uses user-specified serdes to perform the prefixScans.
  • Resolves out-of-order processing due to foreignKey changes.
  • Fully scalable with increasing data, provided that one thread doesn't process most of the foreign-key data.

Design Philosophy

  1. The foreignKeyJoin implementation doesn't require the user to have any knowledge about the inner workings of the join.
  2. All events are returned on the same key as they were provided (K,V in, K,V0 out). This is the same as the regular join function.
  3. The user must provide some Serdes, as they are used to handle the CombinedKey serialization / deserialization.
  4. The user can provide a custom partitioner if they so wish, so that custom partitioning logic does not have to rely strictly on the DefaultPartitioner.
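To make the role of the CombinedKey Serdes and the prefixScan more concrete, here is a minimal sketch. It is not the KIP's implementation: the class `CombinedKeySketch`, its methods, and the byte layout (a 4-byte foreign-key length, then the foreign-key bytes, then the primary-key bytes) are assumptions made for illustration, and a sorted `TreeMap` stands in for the state store.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; names and byte layout are assumptions, not the KIP's classes.
final class CombinedKeySketch {

    // Assumed layout: [4-byte foreign-key length][foreign-key bytes][primary-key bytes].
    static byte[] combine(byte[] foreignKey, byte[] primaryKey) {
        return ByteBuffer.allocate(4 + foreignKey.length + primaryKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .put(primaryKey)
                .array();
    }

    // The scan prefix is the combined key with the primary-key part left off.
    static byte[] prefixFor(byte[] foreignKey) {
        return ByteBuffer.allocate(4 + foreignKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .array();
    }

    // A store sorted by unsigned lexicographic byte order, so equal prefixes are contiguous.
    static TreeMap<byte[], String> newStore() {
        Comparator<byte[]> byteOrder = Arrays::compareUnsigned;
        return new TreeMap<>(byteOrder);
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // Stand-in for a prefixScan: start at the prefix and stop at the first key that no longer matches.
    static List<String> prefixScan(TreeMap<byte[], String> store, byte[] prefix) {
        List<String> values = new ArrayList<>();
        for (Map.Entry<byte[], String> entry : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(entry.getKey(), prefix)) {
                break;
            }
            values.add(entry.getValue());
        }
        return values;
    }
}
```

The length prefix in this sketch keeps a foreign key such as "a" from matching combined keys whose foreign key is "ab". Whether the real CombinedKey Serde needs the same guard depends on the user-supplied Serdes.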

Current Limitations / Points / Issues of note:

  1. Requires three fully materialized State Stores
    1. One for the prefixScanning of the repartitioned CombinedKey events.
    2. One for the highwater mark for resolving out-of-order processing of events.
    3. One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source (see KTableSource).
  2. Merging the highwater and final materialized stores may be possible, but it is unlikely to be useful if we wish for users of this API to be able to specify their own Materialized state store.
  3. Caching is disabled on the prefixScan store, for the same reasons Jan gave above.
  4. ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support it, and a number of them are currently stubbed out with exceptions. I would like to take any advice on this component with regards to its suitability in this interface.
  5. Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).
  6. Uses RecordHeaders to address out-of-order processing. The added offset and propagate headers used by the foreignKeyJoin do not persist outside of the function, but they may collide with user-specified header keys.
  7. CombinedKeyLeftKeyPartitioner -> uses a copy-pasted implementation of the Producer's DefaultPartitioner. Evaluate breaking this logic out of the DefaultPartitioner into an accessible function.
  8. A large number of variables is passed to the KTableKTableForeignKeyJoinNode. Current decision is to leave it as its own node because it doesn't quite fit the patterns of existing nodes. In addition, I am not sure whether each DSL operation should have its own Node type or not.
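The highwater mark store mentioned above can be pictured with the following minimal sketch. It assumes that each joined event carries the source offset (the document says this travels in a record header) and that an event is stale when its offset is lower than the highest offset already seen for the same key. The class `HighwaterSketch`, the `accept` method, and the exact comparison rule are hypothetical, and an in-memory `HashMap` stands in for the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; the real store layout and comparison rule are not confirmed by the KIP text.
final class HighwaterSketch<K> {

    // Highest offset seen so far for each key (stand-in for the highwater state store).
    private final Map<K, Long> highwater = new HashMap<>();

    // Returns true if the event should be forwarded, false if it is older than one already processed.
    boolean accept(K key, long offset) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // out-of-order event: a newer result for this key was already emitted
        }
        highwater.put(key, offset);
        return true;
    }
}
```

With this rule, a result computed from an old foreign key that arrives after the result for the new foreign key is dropped rather than overwriting the newer value.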

Motivation

Same as above.

Public Interfaces

Code Block
public <V0, KL, VL, KR, VR> KTable<KL, V0> joinOnForeignKey(final KTable<KR, VR> other,
                                                            final ValueMapper<VL, KR> keyExtractor,
                                                            final ValueJoiner<VL, VR, V0> joiner,
                                                            final Materialized<KL, V0, KeyValueStore<Bytes, byte[]>> materialized,
                                                            final StreamPartitioner<KR, ?> foreignKeyPartitioner,
                                                            final Serde<KL> thisKeySerde,
                                                            final Serde<VL> thisValueSerde,
                                                            final Serde<KR> otherKeySerde,
                                                            final Serde<V0> joinedValueSerde);
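As a rough illustration of the intended result (K,V in, K,V0 out, inner join only), the sketch below models the join over two static maps. It does not use Kafka Streams: the class `ForeignKeyJoinModel` is hypothetical, `Function` and `BiFunction` stand in for `ValueMapper` and `ValueJoiner`, and the Serde, partitioner, and Materialized parameters are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical snapshot model of the join result; not the streaming implementation.
final class ForeignKeyJoinModel {

    static <V0, KL, VL, KR, VR> Map<KL, V0> joinOnForeignKey(final Map<KL, VL> left,
                                                             final Map<KR, VR> right,
                                                             final Function<VL, KR> keyExtractor,
                                                             final BiFunction<VL, VR, V0> joiner) {
        final Map<KL, V0> result = new LinkedHashMap<>();
        for (final Map.Entry<KL, VL> entry : left.entrySet()) {
            // Extract the foreign key from the left value and look it up on the right side.
            final VR rightValue = right.get(keyExtractor.apply(entry.getValue()));
            if (rightValue != null) {
                // Inner join: emit only when both sides exist, keyed by the original left key.
                result.put(entry.getKey(), joiner.apply(entry.getValue(), rightValue));
            }
        }
        return result;
    }
}
```

For example, joining orders keyed by order id to customers keyed by customer id yields one joined value per order whose customer exists, still keyed by order id.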

Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

...