...

  1. Requires three full materialized State Stores
    1. One for the prefixScanning of the repartitioned CombinedKey events.
    2. One for the highwater mark for resolving out-of-order processing of events.
    3. One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source (see KTableSource).
  2. Merging the highwater and final materialized stores may be possible, but it is unlikely to be useful if we wish for users of this API to be able to specify their own Materialized state store.
  3. Caching is disabled on the prefixScan store, for the same reasons Jan gave above.
  4. The ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support it. Several implementations currently do not, but this follows an existing precedent where some store functions are already stubbed out with exceptions. I would welcome any advice on whether this component is suitable for this interface.
  5. Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).
  6. Uses RecordHeaders to address out-of-order processing. The added offset and propagate headers used by the foreignKeyJoin do not persist outside of the function, but they may collide with user-specified header keys.
  7. CombinedKeyByForeignKeyPartitioner -> uses a copied + pasted implementation of the DefaultPartitioner. Evaluate breaking this logic out of the producer's DefaultPartitioner into an accessible function.
  8. A large number of variables are passed to the KTableKTableForeignKeyJoinNode. The current decision is to leave it as its own node because it doesn't quite fit the patterns of existing nodes. In addition, I am not sure whether each DSL operation should have its own Node type.
  9. The KTableKTableForeignKeyJoinNode signature (25 parameters! too many, should be < 13)
  10. Application Reset does not seem to delete the new internal topics that I have added. (only tested with Kafka 1.0).
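
The highwater mark store (item 1.2) and the offset header (item 6) together resolve out-of-order events. The following is only a rough sketch of that idea, not the KIP's implementation: a plain HashMap stands in for the state store, the class name HighwaterMarkSketch and the method shouldPropagate are hypothetical, and the rule that an event is dropped only when its offset is strictly lower than the highest one already seen is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: an in-memory map stands in for the highwater mark state store.
class HighwaterMarkSketch {
    // Highest offset seen so far for each key.
    private final Map<String, Long> highwater = new HashMap<>();

    // Returns true and records the offset if the event is not older than the
    // newest event already seen for this key; returns false for a stale event.
    // Assumption: only strictly lower offsets count as stale.
    boolean shouldPropagate(String key, long offset) {
        Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // out-of-order event, drop it
        }
        highwater.put(key, offset);
        return true;
    }
}
```

With this sketch, an event for a key at offset 5 followed by one at offset 3 for the same key results in the second event being dropped, while a later event at offset 7 is propagated.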

Motivation

Same as above.

Public Interfaces

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param keyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param thisKeySerde the serde for K
 * @param thisValueSerde the serde for V
 * @param otherKeySerde the serde for KO
 * @param joinedValueSerde the serde for the output value, VR
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final Serde<K> thisKeySerde,
                                            final Serde<V> thisValueSerde,
                                            final Serde<KO> otherKeySerde,
                                            final Serde<VR> joinedValueSerde);

/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param keyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param foreignKeyPartitioner the partitioner that the other {@code KTable} uses to partition its data.
 * @param thisKeySerde the serde for K
 * @param thisValueSerde the serde for V
 * @param otherKeySerde the serde for KO
 * @param joinedValueSerde the serde for the output value, VR
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final StreamPartitioner<KO, ?> foreignKeyPartitioner,
                                            final Serde<K> thisKeySerde,
                                            final Serde<V> thisValueSerde,
                                            final Serde<KO> otherKeySerde,
                                            final Serde<VR> joinedValueSerde);
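
The join semantics described in the Javadoc can be illustrated with a small in-memory model. This is a hedged sketch, not Kafka Streams code: the class ForeignKeyJoinSketch, its methods, the "|" separator, and the string-formatted join output are all hypothetical. A sorted TreeMap of combined keys (foreign key, then primary key) stands in for the prefixScan store, and keys are assumed never to contain the separator character.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of an inner foreign-key join; not the KIP's implementation.
class ForeignKeyJoinSketch {
    private static final String SEP = "|"; // assumption: keys never contain this character

    // "Other" table: foreign key -> value.
    private final Map<String, String> otherTable = new HashMap<>();
    // Current foreign key of each record in "this" table, used to clean up old combined keys.
    private final Map<String, String> foreignKeyOf = new HashMap<>();
    // Combined key "foreignKey|primaryKey" -> this table's value, sorted so that
    // all records sharing a foreign key are contiguous and can be prefix-scanned.
    private final TreeMap<String, String> combined = new TreeMap<>();

    // An update from this table joins 1 to 1 against the other table.
    List<String> updateThis(String key, String foreignKey, String value) {
        String oldForeignKey = foreignKeyOf.put(key, foreignKey);
        if (oldForeignKey != null) {
            combined.remove(oldForeignKey + SEP + key);
        }
        combined.put(foreignKey + SEP + key, value);
        List<String> out = new ArrayList<>();
        String otherValue = otherTable.get(foreignKey);
        if (otherValue != null) { // inner join: emit only when both sides exist
            out.add(key + "=" + value + "+" + otherValue);
        }
        return out;
    }

    // An update to the other table joins against every record in this table
    // that currently holds that foreign key, found by a prefix scan.
    List<String> updateOther(String foreignKey, String otherValue) {
        otherTable.put(foreignKey, otherValue);
        String prefix = foreignKey + SEP;
        List<String> out = new ArrayList<>();
        Map<String, String> matches =
            combined.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
        for (Map.Entry<String, String> e : matches.entrySet()) {
            String key = e.getKey().substring(prefix.length());
            out.add(key + "=" + e.getValue() + "+" + otherValue);
        }
        return out;
    }
}
```

In this model, a single call to updateOther can emit many results (one per record holding that foreign key), whereas updateThis emits at most one.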


Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

...