...

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other - the table containing the records to be joined on. Keyed by KO.
 * @param foreignKeyExtractor - extracts the key (KO) from this table's value (V).
 * @param joiner - specifies how to join the records from both tables
 * @param materialized - the materialized output store (optional)
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return a {@code KTable} that contains the result of joining this table with {@code other}
 */
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final ValueMapper<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)
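To illustrate the intended 1:1 foreign-key join semantics, here is a self-contained sketch using plain java.util types and stand-in functional interfaces in place of the actual Streams API (the real method is the signature above; everything below is illustrative and the names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Illustrative stand-in for the proposed join; not the real Streams implementation.
public class ForeignKeyJoinSketch {

    // Joins each left record 1:1 to the right record named by its extracted foreign key.
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> left,
                                              Map<KO, VO> right,
                                              Function<V, KO> foreignKeyExtractor,
                                              BiFunction<V, VO, VR> joiner) {
        Map<K, VR> result = new HashMap<>();
        for (Map.Entry<K, V> e : left.entrySet()) {
            KO fk = foreignKeyExtractor.apply(e.getValue());   // extract KO from this table's V
            VO other = right.get(fk);
            if (other != null) {                               // inner-join semantics
                result.put(e.getKey(), joiner.apply(e.getValue(), other));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = Map.of("order1", "cust42:book", "order2", "cust7:pen");
        Map<String, String> customers = Map.of("cust42", "Alice");
        Map<String, String> joined = join(orders, customers,
                v -> v.split(":")[0],                          // foreign key = customer id
                (v, vo) -> vo + " bought " + v.split(":")[1]);
        System.out.println(joined);                            // {order1=Alice bought book}
    }
}
```

Note that an update to a right-table record would, in the real implementation, re-trigger the joiner for every left record whose extracted foreign key matches.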

...

Gliffy Diagram: JohnWorkflow




More Detailed Implementation Details

...

Prefix-Scanning using a CombinedKey

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires both the primary-key and foreign-key serdes. I do not know of the JSON complexities that Jan speaks of above, but as long as the foreign key extracted from the left table is identical to the primary key in the right table, the serialization should be identical.
The CombinedKey is serialized as follows:

...

This format supports up to Integer.MAX_VALUE bytes of data per key, which far exceeds the realistic sizing of messages in Kafka. If we so wish, we could expand this further, but I don't see a need to do so at this time. A more clever or compact serialization may be possible; this is simply the simplest one I came up with.

When performing the prefix scan on the RocksDB instance, we simply drop the primary-key component, such that the serialized combined key looks like:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}
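As an illustration of this layout (a sketch, not the actual implementation; class and method names are hypothetical), the full combined key is the prefix above followed by the serialized primary key, and both can be built with a ByteBuffer:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class CombinedKeySketch {

    // Full key: {4-byte foreignKeyLength}{foreignKeySerialized}{primaryKeySerialized}
    static byte[] serialize(byte[] foreignKey, byte[] primaryKey) {
        return ByteBuffer.allocate(4 + foreignKey.length + primaryKey.length)
                .putInt(foreignKey.length)   // length prefix supports up to Integer.MAX_VALUE bytes
                .put(foreignKey)
                .put(primaryKey)
                .array();
    }

    // Scan prefix: drop the primary-key component.
    static byte[] prefix(byte[] foreignKey) {
        return ByteBuffer.allocate(4 + foreignKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .array();
    }

    // True if the serialized combined key begins with the given prefix bytes.
    static boolean hasPrefix(byte[] combinedKey, byte[] prefix) {
        return combinedKey.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(combinedKey, prefix.length), prefix);
    }
}
```

Because the foreign key is length-prefixed and serialized first, every combined key sharing a foreign key also shares a byte prefix, which is what makes the range scan below possible.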

Custom Partitioner Usage

A custom partitioner is used to ensure that the CombinedKey this-table data is correctly copartitioned with the other-table data. This is a simple operation: it extracts the foreign key and applies the partitioner logic to it. It is important that the same partitioner used to partition the right table is also used to partition the rekeyed this-table data.
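A minimal sketch of this copartitioning property (the hash below is a placeholder, not Kafka's actual default partitioner): partitioning the CombinedKey record by its foreign-key bytes alone sends it to the same partition as the other table's record for that key.

```java
import java.util.Arrays;

public class ForeignKeyPartitionerSketch {

    // Placeholder hash; the real requirement is only that the SAME function
    // is used for the right table and for the rekeyed this-table data.
    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Partition a CombinedKey record by its foreign-key component only;
    // the primary key is deliberately ignored.
    static int partitionCombinedKey(byte[] foreignKey, byte[] primaryKey, int numPartitions) {
        return partitionForKey(foreignKey, numPartitions);
    }
}
```

Any two CombinedKeys with the same foreign key, regardless of primary key, land on the partition that also holds the other table's record for that foreign key.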

This/Event Processor Behaviour

  • CombinedKey data goes to a repartition topic copartitioned with the Other/Entity data.
  • The primary key in CombinedKey is preserved for downstream usage.

Gliffy Diagram: LeftProcessingWithCombinedKey

Other/Entity PrefixScan Processor Behaviour

  • Requires the specific serialized format detailed above
  • Requires a RocksDB instance storing all of the This/Event data
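The prefix scan itself relies on the store's lexicographic byte ordering: all CombinedKeys sharing a foreign-key prefix are contiguous, so the processor can seek to the prefix and iterate until the prefix no longer matches. A minimal sketch, using a TreeMap with unsigned-lexicographic ordering as a stand-in for the RocksDB instance:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanSketch {

    // Unsigned lexicographic byte comparator, matching RocksDB's default key order.
    static final Comparator<byte[]> LEX = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) return c;
        }
        return a.length - b.length;
    };

    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
    }

    // Seek to the prefix, then collect values while keys still carry it.
    static List<String> scan(TreeMap<byte[], String> store, byte[] prefix) {
        List<String> hits = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) break;  // left the prefix range
            hits.add(e.getValue());
        }
        return hits;
    }
}
```

The real implementation would use a RocksDB iterator seeked to the serialized prefix, but the termination condition is the same.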

Gliffy Diagram: RangeScanCombinedKeyUsage

Problem: Out-of-order processing of Rekeyed data

There is an issue that arises when updating the foreign key of an event in the left table. We must first send a delete for the previous CombinedKey, and then send the new value under the new CombinedKey. This results in a race condition, as illustrated below.

Gliffy Diagram: OutOfOrder Problem

This race condition is especially visible when multiple threads are processing the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes. There is no guarantee on the order in which the messages arrive. All things being equal, the "null" message is just as likely to be the final result as the correct updated message is. This issue is further compounded if the foreign key is changed several times in short succession, with multiple additional partitions involved.

Resolving out-of-order events:

As proposed by John Roesler, the resolver can take advantage of the local data stores on the node:


Gliffy Diagram: RangeScanCombinedKeyUsage


Joiner Propagation of Stale Data

Rapid changes to a foreign key (perhaps due to capturing changes from a relational database undergoing such operations) can cause a race condition. The Other/Entity foreign-keyed table may be quite congested with subscription requests on one node and lightly subscribed on another, due to the nature of the key distribution. As such, there is no guarantee of the order in which the subscription updates will arrive in the post-subscribe repartition topic. This is why we must compare against the current state of the primary-key event, to ensure that the data is still consistent with the extracted foreign key.

Gliffy Diagram: resolver

While it is possible for a stale event to be propagated (i.e., it matches the foreign key but is stale), the up-to-date event will be propagated when it eventually arrives. This is eventually consistent. Entity changes do not cause the same race conditions that event changes do, and so are not a concern. They will, however, fall under this same resolution scheme.
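A minimal sketch of that consistency check (names hypothetical): when a joined response arrives for a primary key, re-extract the foreign key from that key's current value and discard the response if it no longer matches.

```java
import java.util.Map;
import java.util.function.Function;

public class StaleJoinResolverSketch {

    // Returns true only if the response was computed against the foreign key
    // that the primary key's CURRENT value still extracts to.
    static <K, V, KO> boolean isStillConsistent(K primaryKey,
                                                KO responseForeignKey,
                                                Map<K, V> currentLeftTable,
                                                Function<V, KO> foreignKeyExtractor) {
        V current = currentLeftTable.get(primaryKey);
        if (current == null) {
            return false;  // record was deleted; any in-flight response is stale
        }
        return foreignKeyExtractor.apply(current).equals(responseForeignKey);
    }
}
```

A stale response (computed against an old foreign key) fails this check and is dropped; the response for the current foreign key passes and is propagated downstream.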


Compatibility, Deprecation, and Migration Plan

...

  • Transfers the maximum amount of data across the network
  • Joining on the foreign node can be a bottleneck vs. joining on the local node

Problem: Out-of-order processing of Rekeyed data

Solution A - Hold Ordering Metadata in Record Headers and Highwater Mark Table

...