...

Code Block
<V0, KL, VL, KR, VR> KTable<KL, V0> foreignKeyJoin(final KTable<KR, VR> other,
                                                   final ValueMapper<VL, KR> foreignKeyExtractor,
                                                   final ValueJoiner<VR, VL, V0> joiner,
                                                   final Materialized<KL, V0, KeyValueStore<Bytes, byte[]>> materialized,
                                                   final Serde<KL> thisKeySerde,
                                                   final Serde<KR> otherKeySerde,
                                                   final Serde<VR> otherValueSerde,
                                                   final Serde<V0> joinedValueSerde);


Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

Left KTable - This contains the "many". Think of this table as a large number of events with their own distinct keys, each of which contains the foreign key that we wish to join on. This is the table of events which we will be joining from (i.e. KTableLeft.foreignKeyJoin ... ).
Example: Users clicking on a product, where an event is keyed on UserId+Timestamp and its value contains the foreign key, productId.

Code Block
Left Sample: 
(key,value)  =  (UserA-TimestampA, Event(type=Click, productid=900560) )


Right KTable - This contains the "one". This is the table that contains the data keyed on the foreign key.

Example: The Product Data:

Code Block
Right Sample:
(key, value)  =  (900560, ProductDetails(size = small, colour = blue, fabric = wool)) 
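
To make the two samples above concrete, here is a plain-Java sketch of what the join computes: for each left record, the foreign key is extracted from the left value and used to look up the matching right record. This is not the Streams implementation; the class and method names are hypothetical, and real KTables are partitioned changelog streams, not in-memory maps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-process sketch of the foreignKeyJoin semantics.
public class FkJoinSketch {

    // Right "table" (the one): productId -> product details.
    static final Map<Integer, String> right = new HashMap<>();
    // Left "table" (the many): eventKey -> foreign key (productId).
    static final Map<String, Integer> left = new HashMap<>();

    // Analogue of the ValueJoiner<VR, VL, V0> parameter.
    static String join(String rightValue, Integer leftValue) {
        return "Joined(productId=" + leftValue + ", " + rightValue + ")";
    }

    // For each left record, extract the foreign key (here, the left value
    // itself) and look up the matching right record.
    static Map<String, String> foreignKeyJoin() {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Integer> e : left.entrySet()) {
            String rightValue = right.get(e.getValue());
            if (rightValue != null) {
                result.put(e.getKey(), join(rightValue, e.getValue()));
            }
        }
        return result;
    }
}
```

Note that the output remains keyed on the left table's key (UserA-TimestampA), which is what distinguishes this from a repartition-and-join on the foreign key itself.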


Overview

The overall process is fairly simple, and is outlined below.

Gliffy Diagram: BellemareOverview213

...


Repartitioning using a CombinedKey

...

Wrapper

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires both the primary-key and foreign-key serdes, which are left entirely up to the user to specify. It is assumed that the complete foreign key and the extracted foreign key serialize in exactly the same way. I do not know of the JSON complexities that Jan speaks of above, but as long as the foreign key extracted from the left table is identical to the primary key in the right table, the serialized forms should be identical.
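
A minimal sketch of such a wrapper follows. The field names and the delimited string form are illustrative assumptions, not the actual serde; the important property is that the foreign key serializes first, so that all left records sharing a foreign key are adjacent in the store and reachable with a prefix (range) scan.

```java
// Hypothetical sketch of the CombinedKey tuple described above.
public class CombinedKey<KF, KP> {
    public final KF foreignKey;  // primary key of the right table
    public final KP primaryKey;  // primary key of the left table

    public CombinedKey(KF foreignKey, KP primaryKey) {
        this.foreignKey = foreignKey;
        this.primaryKey = primaryKey;
    }

    @Override
    public String toString() {
        // Foreign key first: this ordering is what makes prefix scans
        // over a single foreign key possible.
        return foreignKey + ":" + primaryKey;
    }
}
```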

...

Gliffy Diagram: LeftProcessingWithCombinedKey

CombinedKeys Usage by PrefixScan Processor

...

Gliffy Diagram: RangeScanCombinedKeyUsage



Problem: Out-of-order processing of Rekeyed data

An issue arises when the foreign key of an event in the left table is updated. We must first send a delete on the previous CombinedKey, then send the new value on the new CombinedKey. This results in a race condition, as illustrated below.

...

This race condition is especially visible when multiple threads are used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes; there is no guarantee on the order in which the messages arrive. All else being equal, it is just as likely that the "null" message arrives last as the correctly updated message. The issue is only compounded if the foreign key changes several times in short succession across multiple partitions. The only remedy I can currently find is to propagate the offset of the original update along with the payload of the message. This introduces the next section.
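
The offset-based remedy can be sketched as a per-key highwater mark check: a record arriving back at the resolution stage is dropped if its source offset is below the highest offset already seen for that key. The class and method names below are assumptions for illustration, not the KIP's actual processor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of offset-based out-of-order resolution.
public class HighwaterResolver<K> {
    private final Map<K, Long> highwater = new HashMap<>();

    // Returns true if the record should be propagated downstream.
    public boolean accept(K key, long offset) {
        Long hw = highwater.get(key);
        if (hw != null && offset < hw) {
            // Out-of-order: a record from a newer source offset already won.
            return false;
        }
        highwater.put(key, offset);
        return true;
    }
}
```

In the race described above, the stale "null" from the old CombinedKey carries a lower source offset than the new value, so it is discarded rather than overwriting the correct result.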


Solution: The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, within a given partition, of an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value resulting from a foreign-key change, but NOT the propagation of "null" values for deletions. This comes into play during the resolution stage.
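
Based on the two properties above, a minimal sketch of the wrapper might look like the following; the exact field layout and serde are not specified here, so treat this as an assumption for illustration.

```java
// Hypothetical sketch of the PropagationWrapper described above.
public class PropagationWrapper<V> {
    public final V value;           // null for deletions and foreign-key changes
    public final long offset;       // source-topic offset, used for the highwater mark
    public final boolean propagate; // false: suppress the null from an FK change

    public PropagationWrapper(V value, long offset, boolean propagate) {
        this.value = value;
        this.offset = offset;
        this.propagate = propagate;
    }
}
```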

Deletion

Since a deleted record will no longer be a plain "null" value (but instead a wrapped null), the log cleaner will not automatically clean it up. The topic can still be compacted, but this results in higher data usage than pure null values would allow. One suggestion is to add the concept of a "weak reference" to Kafka, which the log cleaner would treat as a null for cleaning purposes while still carrying data; that is beyond the scope of this ticket.

...

Gliffy Diagram: HighwaterMarkUsage


Why do I currently have both an offset and a propagate boolean?



Current Issues to Work out

...