...

When performing the prefix scan, we simply drop the primary key component, such that the serialized key prefix for the example above looks like:

Code Block
{4-bytes foreign key serialized length}{n-bytes serialized foreign key}
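
For illustration only, here is a minimal sketch of how the combined key and its foreign-key prefix could be laid out in bytes. The method names (serializeCombinedKey, serializePrefix) are hypothetical and not part of this proposal:

Code Block
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class CombinedKeyPrefixSketch {

    // Full serialized combined key: {4-byte FK length}{FK bytes}{PK bytes}.
    static byte[] serializeCombinedKey(byte[] foreignKey, byte[] primaryKey) {
        return ByteBuffer.allocate(4 + foreignKey.length + primaryKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .put(primaryKey)
                .array();
    }

    // Prefix used for the scan: the same layout with the primary key dropped.
    static byte[] serializePrefix(byte[] foreignKey) {
        return ByteBuffer.allocate(4 + foreignKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .array();
    }

    public static void main(String[] args) {
        byte[] fk = "fk-1".getBytes(StandardCharsets.UTF_8);
        byte[] pk = "pk-9".getBytes(StandardCharsets.UTF_8);
        byte[] combined = serializeCombinedKey(fk, pk);
        byte[] prefix = serializePrefix(fk);
        // Every combined key for fk-1 starts with these prefix bytes, so a
        // lexicographic range scan over serialized keys finds all of them.
        System.out.println(combined.length + " combined bytes, " + prefix.length + " prefix bytes");
    }
}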

...

A custom partitioner is used to ensure that the CombinedKey left-table data is correctly copartitioned with the right-table data. The operation is simple: extract the foreign key from the CombinedKey and apply the partitioner logic to it. It is important that the rekeyed left-table data is partitioned with the same partitioner that is used to partition the right table.
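
As a rough sketch of this idea (the CombinedKey shape and accessor are assumed here, and the hashing mirrors the default murmur2-based partitioner), the partitioner could hash only the serialized foreign key so that rekeyed left-table records land on the same partitions as the corresponding right-table records:

Code Block
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;

public class ForeignKeyPartitionerSketch<KF, KP> {

    // Assumed minimal shape of the CombinedKey wrapper described in this design.
    record CombinedKey<F, P>(F foreignKey, P primaryKey) { }

    private final Serializer<KF> foreignKeySerializer;

    public ForeignKeyPartitionerSketch(Serializer<KF> foreignKeySerializer) {
        this.foreignKeySerializer = foreignKeySerializer;
    }

    // Hash only the serialized foreign key, using the same murmur2 hashing as the
    // default partitioner, so the rekeyed left-table record lands on the same
    // partition as the right-table record with that key.
    public int partition(String topic, CombinedKey<KF, KP> combinedKey, int numPartitions) {
        byte[] fkBytes = foreignKeySerializer.serialize(topic, combinedKey.foreignKey());
        return Utils.toPositive(Utils.murmur2(fkBytes)) % numPartitions;
    }
}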


CombinedKeys Usage by Left Processor

CombinedKeys are used to ensure that the left-table data is correctly copartitioned and that the primary key is preserved for future downstream rekeying.
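
A minimal sketch of this rekeying step, assuming a CombinedKey wrapper and a user-supplied foreign-key extractor (all names here are illustrative):

Code Block
import java.util.function.Function;
import org.apache.kafka.streams.KeyValue;

public class LeftRekeySketch {

    // Assumed minimal shape of the CombinedKey wrapper.
    record CombinedKey<F, P>(F foreignKey, P primaryKey) { }

    // Wrap a left-table record so that the foreign key leads (for copartitioning
    // and prefix scans) while the original primary key is kept for rekeying the
    // join result back to the left table's key later on.
    static <K, V, KF> KeyValue<CombinedKey<KF, K>, V> rekey(K primaryKey,
                                                            V value,
                                                            Function<V, KF> foreignKeyExtractor) {
        KF foreignKey = foreignKeyExtractor.apply(value);
        return KeyValue.pair(new CombinedKey<>(foreignKey, primaryKey), value);
    }
}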

Gliffy Diagram: LeftProcessingWithCombinedKey

CombinedKeys Usage by

...

PrefixScan Processor

The left table's CombinedKey is what makes the prefixScan possible: its serialized byte payload is used to determine prefix equivalence during the scan.
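
A sketch of how such a prefix scan could be expressed against a byte-keyed state store, assuming the store orders keys lexicographically by their serialized bytes (the helper names are hypothetical):

Code Block
import java.util.Arrays;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PrefixScanSketch {

    // Smallest byte string strictly greater than every key that starts with
    // 'prefix': increment the last byte that is not 0xFF and truncate after it.
    static Bytes upperBound(byte[] prefix) {
        byte[] bound = Arrays.copyOf(prefix, prefix.length);
        for (int i = bound.length - 1; i >= 0; i--) {
            if (bound[i] != (byte) 0xFF) {
                bound[i]++;
                return Bytes.wrap(Arrays.copyOf(bound, i + 1));
            }
        }
        return null; // prefix is all 0xFF bytes; not handled in this sketch
    }

    // Iterate over every left-table record whose serialized CombinedKey starts
    // with the serialized foreign-key prefix.
    static void prefixScan(ReadOnlyKeyValueStore<Bytes, byte[]> store, byte[] fkPrefix) {
        Bytes from = Bytes.wrap(fkPrefix);
        Bytes to = upperBound(fkPrefix);
        if (to == null) {
            throw new IllegalArgumentException("all-0xFF prefix not handled in this sketch");
        }
        try (KeyValueIterator<Bytes, byte[]> iter = store.range(from, to)) {
            while (iter.hasNext()) {
                KeyValue<Bytes, byte[]> record = iter.next();
                // record.key is {4-byte FK length}{FK bytes}{PK bytes}; the primary
                // key can be recovered here for downstream rekeying.
            }
        }
    }
}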

Gliffy Diagram: RangeScanCombinedKeyUsage

...

Gliffy Diagram: RangeScanCombinedKeyUsage

PrefixScan on CombinedKey

...



Problem: Out-of-order processing of

...

Rekeyed data

An issue arises when the foreign key of an event in the left table is updated. We must first send a delete on the previous CombinedKey, and then send the new value on the new CombinedKey. This results in a race condition, as illustrated below.

Gliffy Diagram: RaceCondition-Simple
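
To make the sequence concrete, here is a hedged sketch of what the left processor could emit on a foreign-key change. All names are illustrative; the point is only that the tombstone and the new value are keyed on different CombinedKeys and therefore travel through different partitions:

Code Block
import java.util.function.Function;
import org.apache.kafka.streams.processor.ProcessorContext;

public class ForeignKeyChangeSketch<K, V, KF> {

    // Assumed minimal shape of the CombinedKey wrapper.
    record CombinedKey<F, P>(F foreignKey, P primaryKey) { }

    private final Function<V, KF> foreignKeyExtractor;
    private final ProcessorContext context;

    ForeignKeyChangeSketch(Function<V, KF> foreignKeyExtractor, ProcessorContext context) {
        this.foreignKeyExtractor = foreignKeyExtractor;
        this.context = context;
    }

    // On a foreign-key change, the tombstone and the new value are keyed on
    // different CombinedKeys, go to different partitions of the repartition
    // topic, and can therefore be processed in either order downstream.
    void onLeftTableUpdate(K primaryKey, V oldValue, V newValue) {
        KF oldFk = oldValue == null ? null : foreignKeyExtractor.apply(oldValue);
        KF newFk = foreignKeyExtractor.apply(newValue);
        if (oldFk != null && !oldFk.equals(newFk)) {
            context.forward(new CombinedKey<>(oldFk, primaryKey), null);     // delete under the old FK
        }
        context.forward(new CombinedKey<>(newFk, primaryKey), newValue);     // new value under the new FK
    }
}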

This race condition is especially visible when multiple threads are used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes; there is no guarantee on the order in which the messages arrive. All things being equal, the "null" message is just as likely to arrive last as the correctly updated message. The issue is only compounded if the foreign key is changed several times in short succession, with the resulting records spread across multiple partitions. The only remedy I can currently find is to propagate the offset of the original update along with the payload of the message, which leads into the next section.


The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution; a sketch of the wrapper follows the list below.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play in the resolution stage.
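
Assuming the wrapper also carries the wrapped payload value, it could look roughly like this (field names are illustrative):

Code Block
public class PropagationWrapper<V> {

    private final V value;           // the (possibly null) payload being propagated
    private final long offset;       // offset of the original KR-keyed event in its source partition,
                                     // used for the highwater mark computation at resolution time
    private final boolean propagate; // false only for the "null" emitted on a foreign-key change,
                                     // so it can be halted at resolution; true for real deletions

    public PropagationWrapper(V value, long offset, boolean propagate) {
        this.value = value;
        this.offset = offset;
        this.propagate = propagate;
    }

    public V getValue() { return value; }
    public long getOffset() { return offset; }
    public boolean isPropagate() { return propagate; }
}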

Deletion

Since a deleted record will no longer be a plain "null" value (but instead a wrapped null), the log cleaner will not automatically clean it up. The topic can still be compacted, but it will result in higher data usage than if we were able to use pure null values. One suggestion is to add the concept of a "weak reference" to Kafka, which the log cleaner would treat as a null for compaction purposes even though it could still contain data. This is beyond the scope of this ticket.

Usage