...

Current Limitations / Points / Issues of note:

  1. Requires three full materialized State Stores:
    • One for the prefixScanning of the repartitioned CombinedKey events.
    • One for the highwater mark for resolving out-of-order processing of events.
    • One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source (see KTableSource).

    Merging the highwater and final materialized stores may be possible, but it is unlikely to be useful if we wish for users of this API to be able to specify their own Materialized state store.

  2. Caching is disabled on the prefixScan store, for the same reasons Jan gave above.

  3. ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support this, but follows an existing precedent where some store functions are already stubbed out with exceptions.

  4. Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).

  5. Uses RecordHeaders to address out-of-order processing. The added offset and propagate headers used by the foreignKeyJoin do not persist outside of the function, but they may collide with user-specified header keys.
  6. CombinedKeyByForeignKeyPartitioner uses a copy-pasted implementation of the DefaultPartitioner. Evaluate breaking this logic out of the producer's DefaultPartitioner into an accessible function.
  7. A large number of variables is passed to the KTableKTableForeignKeyJoinNode. The current decision is to leave it as its own node because it doesn't quite fit the patterns of existing nodes. In addition, I am not sure whether each DSL operation should have its own Node type or not.
  8. The KTableKTableForeignKeyJoinNode signature has 25 parameters, which is too many (it should be fewer than 13).
  9. Application Reset does not seem to delete the new internal topics that I have added (only tested with Kafka 1.0).
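
Points 1 and 5 rely on a highwater mark to discard out-of-order events. The following is a minimal sketch of that idea in plain Java, not the code from this KIP. It assumes the offset arrives as a plain `long` argument (rather than being read from a RecordHeader), uses a `HashMap` as a stand-in for the highwater state store, and lets an event with an equal offset through. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the highwater-mark state store; not the KIP's actual class.
class HighwaterMarkFilter {
    // Highest offset seen so far for each key.
    private final Map<String, Long> highwater = new HashMap<>();

    /**
     * Returns true if the event should be propagated, false if it is stale.
     * An event is stale when a higher offset was already seen for the same key.
     */
    boolean shouldPropagate(final String key, final long offset) {
        final Long seen = highwater.get(key);
        if (seen != null && offset < seen) {
            return false; // out-of-order: a newer update was already processed
        }
        highwater.put(key, offset); // advance the highwater mark
        return true;
    }

    public static void main(final String[] args) {
        final HighwaterMarkFilter filter = new HighwaterMarkFilter();
        System.out.println(filter.shouldPropagate("A2B0", 5L)); // first event for the key
        System.out.println(filter.shouldPropagate("A2B0", 3L)); // older offset, dropped
    }
}
```

Keeping this state in its own store is what makes it the third materialized store listed in point 1.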

Motivation

Same as above.

Public Interfaces

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param foreignKeyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final ValueMapper<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param keyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param thisSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for this KTable
 * @param otherSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the other KTable
 * @param joinedSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the resultant joined KTable
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final Serialized<K, V> thisSerialized,
                                            final Serialized<KO, VO> otherSerialized,
                                            final Serialized<K, VR> joinedSerialized);

/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO
 * @param keyExtractor extracts the key (KO) from this table's value (V)
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param foreignKeyPartitioner the partitioner that the other {@code KTable} uses to partition its data
 * @param thisSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for this KTable
 * @param otherSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the other KTable
 * @param joinedSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the resultant joined KTable
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final StreamPartitioner<KO, ?> foreignKeyPartitioner,
                                            final Serialized<K, V> thisSerialized,
                                            final Serialized<KO, VO> otherSerialized,
                                            final Serialized<K, VR> joinedSerialized);
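
To make the semantics described in the Javadoc above concrete, here is a small in-memory sketch of an inner foreign-key join. It is not Kafka Streams code: `ForeignKeyJoinSketch`, `updateThis` and `updateOther` are hypothetical names, plain `java.util.function` types stand in for `ValueMapper` and `ValueJoiner`, deletes are not handled, and the linear scan in `updateOther` only illustrates the result that the real implementation would reach via a prefix scan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// In-memory illustration of inner foreign-key join semantics; all names are hypothetical.
class ForeignKeyJoinSketch<K, V, KO, VO, VR> {
    private final Map<K, V> thisTable = new HashMap<>();
    private final Map<KO, VO> otherTable = new HashMap<>();
    private final Function<V, KO> foreignKeyExtractor;
    private final BiFunction<V, VO, VR> joiner;

    ForeignKeyJoinSketch(final Function<V, KO> foreignKeyExtractor,
                         final BiFunction<V, VO, VR> joiner) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.joiner = joiner;
    }

    // An update to this table joins 1 to 1 with the other table (zero or one result).
    List<VR> updateThis(final K key, final V value) {
        thisTable.put(key, value);
        final List<VR> results = new ArrayList<>();
        final VO other = otherTable.get(foreignKeyExtractor.apply(value));
        if (other != null) {
            results.add(joiner.apply(value, other));
        }
        return results;
    }

    // An update to the other table re-joins every record of this table holding that foreign key.
    List<VR> updateOther(final KO key, final VO value) {
        otherTable.put(key, value);
        final List<VR> results = new ArrayList<>();
        for (final V v : thisTable.values()) {
            if (key.equals(foreignKeyExtractor.apply(v))) {
                results.add(joiner.apply(v, value));
            }
        }
        return results;
    }
}
```

In this sketch a value such as `"A2:b0"` could encode its foreign key before the colon, with `v -> v.split(":")[0]` as the extractor.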


Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

...

  • Enable sendOldValues on sources marked with "*"
  • Register a child of B
    • extract A's key and B's key as key and B as value.
      • forward(key, null) for old
      • forward(key, b) for new
      • skip old if A's key didn't change (ends up in same partition)
  • Register sink for internal repartition topic (number of partitions equal to A; if A is internal, prefer B over A for deciding the number of partitions)
    • in the sink, only use A's key to determine partition
  • Register source for intermediate topic
    • co-partition with A's sources
    • materialize
    • the serde for RocksDB needs to serialize A's key before B's key. Ideally we also use the same serde for the topic
  • Register processor after above source.
    • On event extract A's key from the key
    • look up A by its key
    • perform the join (as usual)
  • Register processor after A's processor
    • On event, use A's key to perform a range scan on B's materialization
    • For every row retrieved perform join as usual
  • Register merger
    • Forward join results
    • On lookup, use the full key to look up B, extract A's key from the full key, and look up A. Then perform the join.
  • Merger wrapped into KTable and returned to the user.
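
The workflow above depends on A's key being serialized before B's key, so that a range scan by A's key finds every matching B record. The sketch below illustrates this with a `TreeMap` as a sorted stand-in for the RocksDB store. The class name, the string keys and the `|` separator are assumptions made for illustration. A real serde would more likely length-prefix A's key so that one key can never be a prefix of another.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sorted in-memory stand-in for the store keyed by the combined key; names are hypothetical.
class CombinedKeyStoreSketch {
    // Assumed separator between A's key and B's key.
    private static final char SEP = '|';
    private final TreeMap<String, String> store = new TreeMap<>();

    // A's key is written first so all B records for the same A sort next to each other.
    static String combinedKey(final String aKey, final String bKey) {
        return aKey + SEP + bKey;
    }

    // A null value is treated as a delete, mirroring the forward(key, null) step.
    void put(final String aKey, final String bKey, final String bValue) {
        final String key = combinedKey(aKey, bKey);
        if (bValue == null) {
            store.remove(key);
        } else {
            store.put(key, bValue);
        }
    }

    // Range scan over every entry whose combined key starts with A's key plus the separator.
    List<String> prefixScan(final String aKey) {
        final String from = aKey + SEP;
        final String to = aKey + (char) (SEP + 1); // smallest string after the prefix range
        return new ArrayList<>(store.subMap(from, true, to, false).values());
    }
}
```

With entries for A2B0, A2B1 and A0B3 in the store, `prefixScan("A2")` returns only the two A2 records, in key order.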

Step by Step

| TOPOLOGY INPUT A | TOPOLOGY INPUT B | STATE A MATERIALIZED | STATE B MATERIALIZED | INTERMEDIATE RECORDS PRODUCED | STATE B OTHER TASK | OUTPUT A SOURCE / INPUT RANGE PROCESSOR | OUTPUT RANGE PROCESSOR | OUTPUT LOOKUP PROCESSOR |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| key: A0 value: [A0 ...] | | key: A0 value: [A0 ...] | | | | Change<null,[A0 ...]> | invoked but nothing found. Nothing forwarded | |
| key: A1 value: [A1 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | | | | Change<null,[A1 ...]> | invoked but nothing found. Nothing forwarded | |
| | key: B0 value: [A2,B0 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...] | partition key: A2 key: A2B0 value: [A2,B0 ...] | key: A2B0 value: [A2,B0 ...] | | | invoked but nothing found. Nothing forwarded |
| | key: B1 value: [A2,B1 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...]; key: B1 value: [A2,B1 ...] | partition key: A2 key: A2B1 value: [A2,B1 ...] | key: A2B0 value: [A2,B0 ...]; key: A2B1 value: [A2,B1 ...] | | | invoked but nothing found. Nothing forwarded |
| key: A2 value: [A2 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...]; key: A2 value: [A2 ...] | key: B0 value: [A2,B0 ...]; key: B1 value: [A2,B1 ...] | | key: A2B0 value: [A2,B0 ...]; key: A2B1 value: [A2,B1 ...] | Change<null,[A2 ...]> | key: A2B0 value: Change<null,join([A2 ...],[A2,B0 ...])>; key: A2B1 value: Change<null,join([A2 ...],[A2,B1 ...])> | |
| | key: B1 value: null | | key: B0 value: [A2,B0 ...] | partition key: A2 key: A2B1 value: null | key: A2B0 value: [A2,B0 ...] | | | key: A2B1 value: Change<join([A2 ...],[A2,B1 ...]),null> |
| | key: B3 value: [A0,B3 ...] | | key: B0 value: [A2,B0 ...]; key: B3 value: [A0,B3 ...] | partition key: A0 key: A0B3 value: [A0,B3 ...] | key: A2B0 value: [A2,B0 ...]; key: A0B3 value: [A0,B3 ...] | | | key: A0B3 value: Change<null,join([A0 ...],[A0,B3 ...])> |
| key: A2 value: null | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...]; key: B3 value: [A0,B3 ...] | | key: A2B0 value: [A2,B0 ...]; key: A0B3 value: [A0,B3 ...] | Change<[A2 ...],null> | key: A2B0 value: Change<join([A2 ...],[A2,B0 ...]),null> | |


Range lookup

It is pretty straightforward to completely flush all changes that happened before the range lookup into RocksDB and let it handle the range scan. Merging RocksDB's result iterator with the current in-heap caches might not be in scope for this initial KIP. Currently, we at trivago cannot identify the RocksDB flushes as a performance problem. Usually the number of emitted records is the harder problem to deal with in the first place.
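
The flush-then-scan approach can be sketched as follows, with an unsorted `HashMap` standing in for the in-heap cache and a `TreeMap` standing in for RocksDB. This is only an illustration under those assumptions: the class and method names are hypothetical, and deletes and cache eviction are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Write-back cache in front of a sorted store; the TreeMap stands in for RocksDB.
class FlushBeforeRangeSketch {
    private final Map<String, String> cache = new HashMap<>();     // unsorted in-heap cache
    private final TreeMap<String, String> store = new TreeMap<>(); // sorted backing store

    // Writes only touch the cache until the next flush.
    void put(final String key, final String value) {
        cache.put(key, value);
    }

    // Push every pending write into the store and empty the cache.
    void flush() {
        store.putAll(cache);
        cache.clear();
    }

    // Flush first, so the sorted store alone can answer the range query
    // and no merge between cache and store iterators is needed.
    SortedMap<String, String> range(final String fromInclusive, final String toExclusive) {
        flush();
        return new TreeMap<>(store.subMap(fromInclusive, toExclusive));
    }
}
```

The trade-off is an extra flush per range lookup in exchange for not having to merge two iterators.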

...