...

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param keyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param thisSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for this KTable
 * @param otherSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the other KTable
 * @param joinedSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the resultant joined KTable
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final Serialized<K, V> thisSerialized,
                                            final Serialized<KO, VO> otherSerialized,
                                            final Serialized<K, VR> joinedSerialized);

/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param keyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param foreignKeyPartitioner the partitioner that the other {@code KTable} uses to partition its data.
 * @param thisSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for this KTable
 * @param otherSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the other KTable
 * @param joinedSerialized the {@link Serialized} instance used to specify {@link org.apache.kafka.common.serialization.Serdes} for the resultant joined KTable
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> joinOnForeignKey(final KTable<KO, VO> other,
                                            final ValueMapper<V, KO> keyExtractor,
                                            final ValueJoiner<V, VO, VR> joiner,
                                            final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                            final StreamPartitioner<KO, ?> foreignKeyPartitioner,
                                            final Serialized<K, V> thisSerialized,
                                            final Serialized<KO, VO> otherSerialized,
                                            final Serialized<K, VR> joinedSerialized);


Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

Left/Event KTable - This contains the many. Think of this table as a large number of events with their own distinct keys, each of which contains a foreign key that we wish to join on. This is the table of events that we will be joining from (i.e. KTableLeft.joinOnForeignKey ... ).
Example: Users clicking on a product, where each event is keyed on UserId+Timestamp and its value contains the foreign key, productId.

Code Block
Left/Event Sample: 
(key,value)  =  (UserA-TimestampA, Event(type=Click, productid=900560) )


Right/Entity KTable - This contains the one. This is the table that contains the data keyed on the foreign key.

Example: The Product Data:

Code Block
Right/Entity Sample:
(key, value)  =  (900560, ProductDetails(size = small, colour = blue, fabric = wool)) 
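The relationship between the two sample tables can be sketched with plain in-memory maps. This is only an illustration of the many-to-one join semantics, not the Kafka Streams API: the class name `ForeignKeyJoinSketch` is hypothetical, the event value is reduced to its productId, and inner-join behaviour (unmatched events are dropped) is an assumption.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical in-memory model of the join semantics; not the Kafka Streams API.
final class ForeignKeyJoinSketch {

    // Joins each left (event) record to the single right (entity) record
    // referenced by its extracted foreign key.
    // Assumption: inner join, so left records without a match are skipped.
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> left,
                                              Map<KO, VO> right,
                                              Function<V, KO> keyExtractor,
                                              BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            KO foreignKey = keyExtractor.apply(entry.getValue());
            VO other = right.get(foreignKey);
            if (other != null) {
                result.put(entry.getKey(), joiner.apply(entry.getValue(), other));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Left/Event table: many events, each value carrying the foreign key (productId).
        Map<String, String> events = new LinkedHashMap<>();
        events.put("UserA-TimestampA", "900560");
        events.put("UserB-TimestampB", "900560");

        // Right/Entity table: one record per product, keyed on the foreign key.
        Map<String, String> products = new HashMap<>();
        products.put("900560", "size=small, colour=blue, fabric=wool");

        Map<String, String> joined =
                join(events, products, productId -> productId,
                     (productId, details) -> "Click on " + productId + " (" + details + ")");
        joined.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Both events resolve to the same product record, which is the "many" to "one" relationship described above.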

...

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires both the primary key and foreign key serdes, which the user must specify. It is assumed that the complete foreign key and the extracted foreign key serialize in exactly the same way. I do not know of the JSON complexities that Jan speaks of above, but as long as the foreign key extracted from the left table is identical to the primary key in the right table, the serialization should be identical.

The CombinedKey is serialized as follows:

Code Block
{4-bytes foreign key serialized length}{n-bytes serialized foreign key}{4-bytes primary key serialized length}{n-bytes serialized primary key}

This supports up to MAXINT bytes per key, which far exceeds any realistic message size in Kafka. If we so wish, we could expand this to be much larger, but I don't see a need to do so at this time. A more clever or compact serialization may be available; this is just the simplest one I came up with.
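A minimal sketch of the four-field layout (foreign key length, foreign key, primary key length, primary key) follows, assuming both keys have already been serialized to byte arrays by their own serdes. The class and method names are hypothetical and are not taken from the KIP.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating the byte layout; not the KIP's actual class.
final class CombinedKeyLayoutSketch {

    // Writes {4-byte FK length}{FK bytes}{4-byte PK length}{PK bytes}.
    static byte[] serialize(byte[] foreignKey, byte[] primaryKey) {
        return ByteBuffer
                .allocate(Integer.BYTES + foreignKey.length + Integer.BYTES + primaryKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .putInt(primaryKey.length)
                .put(primaryKey)
                .array();
    }

    // Reads the layout back; returns {foreignKeyBytes, primaryKeyBytes}.
    static byte[][] deserialize(byte[] combined) {
        ByteBuffer buffer = ByteBuffer.wrap(combined);
        byte[] foreignKey = new byte[buffer.getInt()];
        buffer.get(foreignKey);
        byte[] primaryKey = new byte[buffer.getInt()];
        buffer.get(primaryKey);
        return new byte[][] {foreignKey, primaryKey};
    }

    public static void main(String[] args) {
        byte[] fk = "900560".getBytes(StandardCharsets.UTF_8);
        byte[] pk = "UserA-TimestampA".getBytes(StandardCharsets.UTF_8);
        byte[][] parts = deserialize(serialize(fk, pk));
        System.out.println(new String(parts[0], StandardCharsets.UTF_8));
        System.out.println(new String(parts[1], StandardCharsets.UTF_8));
    }
}
```

Because each length is a 4-byte signed int, each serialized key is limited to MAXINT bytes, which matches the limit stated above.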

When performing the prefix scan on the RocksDB instance, we simply drop the primary key component, such that the serialized combined key looks like:

Code Block
{4-bytes foreign key serialized length}{n-bytes serialized foreign key}
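The prefix scan can be illustrated without RocksDB by using a sorted map ordered on unsigned lexicographic byte order, which is the default ordering RocksDB applies to keys. This is a sketch under that assumption; `PrefixScanSketch` and its methods are hypothetical names, and a `TreeMap` stands in for the state store.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for a prefix scan over a byte-ordered store.
final class PrefixScanSketch {

    // Unsigned lexicographic byte ordering, emulating a bytewise key comparator.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Scan prefix: {4-byte FK length}{FK bytes}, i.e. the combined key without the primary key part.
    static byte[] prefix(byte[] fk) {
        return ByteBuffer.allocate(Integer.BYTES + fk.length).putInt(fk.length).put(fk).array();
    }

    // Full key: {4-byte FK length}{FK bytes}{4-byte PK length}{PK bytes}.
    static byte[] combinedKey(byte[] fk, byte[] pk) {
        return ByteBuffer.allocate(2 * Integer.BYTES + fk.length + pk.length)
                .putInt(fk.length).put(fk).putInt(pk.length).put(pk).array();
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Seeks to the first key >= prefix and reads until a key no longer carries the prefix.
    static <V> List<V> prefixScan(NavigableMap<byte[], V> store, byte[] prefix) {
        List<V> hits = new ArrayList<>();
        for (Map.Entry<byte[], V> entry : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(entry.getKey(), prefix)) {
                break;
            }
            hits.add(entry.getValue());
        }
        return hits;
    }

    public static void main(String[] args) {
        NavigableMap<byte[], String> store = new TreeMap<>(UNSIGNED_LEXICOGRAPHIC);
        store.put(combinedKey(utf8("900560"), utf8("UserA-TimestampA")), "clickA");
        store.put(combinedKey(utf8("900560"), utf8("UserB-TimestampB")), "clickB");
        store.put(combinedKey(utf8("900561"), utf8("UserC-TimestampC")), "clickC");
        System.out.println(prefixScan(store, prefix(utf8("900560"))));
    }
}
```

The length field at the front of the prefix keeps a shorter foreign key such as 9005 from matching entries stored under 900560, since their length bytes differ.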

...


Custom Partitioner Usage

A custom partitioner is used to ensure that the CombinedKey left-table data is correctly copartitioned with the right-table data. The operation is simple: it extracts the foreign key and applies the partitioner logic. The partitioner used for the right table must also be used to partition the rekeyed left-table data.
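The idea can be sketched as follows: the partition of a rekeyed left record is computed from its foreign key alone, using the same function that partitions the right table. The hash used here (`Arrays.hashCode` modulo the partition count) is only a stand-in for whatever partitioner the right table really uses, and all names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of copartitioning on the foreign key; not the KIP's partitioner.
final class ForeignKeyPartitionerSketch {

    // Stand-in for the right table's partitioner: hash of the serialized key
    // modulo the partition count. The real partitioner is an assumption here.
    static int partitionForKeyBytes(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    // Partition for a rekeyed left record: the primary key is ignored so that the
    // record lands on the same partition as the right-table record it joins with.
    static int partitionForCombinedKey(byte[] foreignKeyBytes, byte[] primaryKeyBytes, int numPartitions) {
        return partitionForKeyBytes(foreignKeyBytes, numPartitions);
    }

    public static void main(String[] args) {
        byte[] fk = "900560".getBytes(StandardCharsets.UTF_8);
        byte[] pkA = "UserA-TimestampA".getBytes(StandardCharsets.UTF_8);
        byte[] pkB = "UserB-TimestampB".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitionForKeyBytes(fk, 8) == partitionForCombinedKey(fk, pkA, 8));
        System.out.println(partitionForCombinedKey(fk, pkA, 8) == partitionForCombinedKey(fk, pkB, 8));
    }
}
```

If the two sides used different partitioning functions, a left record could be sent to a partition that never receives the matching right-table record, and the join would silently miss it.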

...

Left/Event Processor

...

Behaviour

  • CombinedKey data goes to a repartition topic copartitioned with the Right/Entity data.
  • The primary key in CombinedKey is preserved for downstream usage.

...


Gliffy Diagram: LeftProcessingWithCombinedKey

...

Right/Entity PrefixScan Processor

...

Behaviour

  • Requires the specific serialized format detailed above
  • Requires a RocksDB instance storing all of the Left/Event data

Gliffy Diagram: RangeScanCombinedKeyUsage



Problem: Out-of-order processing of Rekeyed data

...