...

There is an issue that arises when the foreign-key value of an event in the left table is updated. We must first send a delete on the previous CombinedKey, then send the new value on the new CombinedKey. This results in a race condition, as illustrated below.


Gliffy Diagram: RaceCondition-Simple (OutOfOrder Problem)
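As a rough illustration of this fan-out (not the proposed implementation), the sketch below shows how a single foreign-key change produces two output records. The method name is hypothetical, and CombinedKey is flattened to a "foreignKey|primaryKey" string purely for brevity.

Code Block
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;

// Illustrative sketch: a foreign-key change on the left table fans out into a
// tombstone on the previous CombinedKey plus the new value on the new CombinedKey.
static List<KeyValue<String, String>> onForeignKeyChange(final String primaryKey,
                                                         final String oldForeignKey,
                                                         final String newForeignKey,
                                                         final String newValue) {
    final List<KeyValue<String, String>> out = new ArrayList<>();
    if (oldForeignKey != null && !oldForeignKey.equals(newForeignKey)) {
        // Delete on the previous CombinedKey.
        out.add(KeyValue.pair(oldForeignKey + "|" + primaryKey, (String) null));
    }
    // New value on the new CombinedKey.
    out.add(KeyValue.pair(newForeignKey + "|" + primaryKey, newValue));
    return out;
}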

This race condition is especially visible when multiple threads are being used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes. There is no guarantee on the order in which the messages will arrive. All things being equal, the "null" message is just as likely to be the final result as the correct updated message. This issue is only further compounded if the foreign key is changed several times in short succession, across multiple additional partitions. The only remedy I can currently find is to propagate the offset of the original update along with the payload of the message. This introduces my next section.

Solution A - Hold Ordering Metadata in Record Headers


Resolving out-of-order events:

As proposed by John Roesler, the resolver can take advantage of the local data stores on the node: automatically rekey for the user, so that the user gets back a clean KTable keyed on the original key. Repartitioning is required.

Gliffy Diagram: OutOfOrderResolution-RecordHeaders (resolver)
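As a sketch of how ordering metadata could be held in record headers, the processor below tags each rekeyed record with the offset of its originating update. The class, header name, and use of the low-level Processor API are illustrative assumptions, not the committed design.

Code Block
import java.nio.ByteBuffer;
import org.apache.kafka.streams.processor.AbstractProcessor;

// Hypothetical processor: attaches the offset of the originating update as a
// record header so that a downstream resolver can detect stale events.
public class OffsetHeaderTagger<K, V> extends AbstractProcessor<K, V> {
    static final String OFFSET_HEADER = "__source-offset"; // assumed header name

    @Override
    public void process(final K key, final V value) {
        final byte[] offsetBytes = ByteBuffer.allocate(Long.BYTES)
                                             .putLong(context().offset())
                                             .array();
        context().headers().add(OFFSET_HEADER, offsetBytes);
        context().forward(key, value);
    }
}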

Since the final out-of-order data is sourced from a topic, the only way to ensure that downstream KTables have the means to query their parent's ValueGetter is to materialize the final state store. There is no way to get specific values directly from a topic source - a Materialized store is required when providing statefulness to data stored in a topic (see KTableSource). In this case, it would mean that a user-provided Materialized store is mandatory. The workflow would look like this:

...
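For illustration only, a user-facing call might look like the snippet below. The exact operator name and signature are not fixed by this section; the snippet simply shows the user supplying the mandatory Materialized store that backs the final state store.

Code Block
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, String> left = builder.table("left-topic");    // value carries the foreign key
final KTable<String, String> right = builder.table("right-topic");  // keyed on the foreign key

// Hypothetical foreign-key join call; the mandatory Materialized argument is the point here.
final KTable<String, String> joined = left.join(
    right,
    leftValue -> leftValue.split(",")[0],                            // extract the foreign key from the value
    (leftValue, rightValue) -> leftValue + "|" + rightValue,         // ValueJoiner
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("joined-table-store"));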



While it is possible for a stale event to be propagated (i.e., it matches the foreign key but is stale), the up-to-date event will be propagated when it arrives. This is eventually consistent. Entity changes do not cause the same race conditions that event changes do, and so are not a concern. They will, however, fall under this same resolution scheme.



Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.



Rejected Alternatives

PropagationWrapper - removed in favor of using Record Headers to contain the offset and propagation metadata.

The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play during resolution.
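A minimal sketch of such a wrapper, assuming only the two properties above plus the wrapped value; names and accessors are illustrative, not a committed contract.

Code Block
// Illustrative sketch of the (rejected) PropagationWrapper; field names follow
// the two properties described above.
public class PropagationWrapper<V> {
    private final long offset;        // offset of the originating event in the source partition
    private final boolean propagate;  // false: suppress the "null" produced by a foreign-key change
    private final V value;            // wrapped payload; may be null for deletions

    public PropagationWrapper(final long offset, final boolean propagate, final V value) {
        this.offset = offset;
        this.propagate = propagate;
        this.value = value;
    }

    public long offset() { return offset; }
    public boolean propagate() { return propagate; }
    public V value() { return value; }
}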

Deletion

Since a deleted record will never be just a plain "null" value anymore (but instead a wrapped null), the log cleaner will not automatically clean it up. It can still be compacted, but this will result in higher data usage than if we were able to use pure null values. One suggestion is to add the concept of a "weak reference" to Kafka, where the log cleaner would treat the record as a null for cleaning purposes even though it still contains data. This is beyond the scope of this ticket.

Usage - Example of PropagationWrapper

Gliffy Diagram: PropagationWrapperIntro



Problem: Out-of-order processing of Rekeyed data
Solution A - Hold Ordering Metadata in Record Headers and Highwater Mark Table

Automatically rekey for the user: the user gets back a clean KTable keyed on the original key. Repartitioning is required.

Gliffy Diagram: OutOfOrderResolution-RecordHeaders


Since the final out-of-order data is sourced from a topic, the only way to ensure that downstream KTables have the means to query their parent's ValueGetter is to materialize the final state store. There is no way to get specific values directly from a topic source - a Materialized store is required when providing statefulness to data stored in a topic (see KTableSource). In this case, it would mean that a user-provided Materialized store is mandatory. The workflow would look like this:

Gliffy Diagram: FinalStageToUserStateStore

...

The Problem of Multiple Rapid Foreign-Key Value Changes - Why a highwater mark table is required

In this case, there are multiple rapid changes made to the foreign key of a single record, and the joiner needs to resolve which one is the correct final value. Records will arrive out of order for a variety of reasons, such as a node crashing, congestion in the input partition, or network latency.

1) In the example below, you can see that there are three updates occurring to the source topic. Each update alters the foreignKey within the value payload of the Left KTable.

2) When this value is altered, events need to be propagated to the corresponding partitions under two CombinedKeys: one to delete the record under the old CombinedKey, and one to update it under the new CombinedKey.

3) The events are written into the rekeyed internal partitions of the foreign key joiner. These events are co-partitioned with the foreign key data, such that they can be joined locally on the node.

4) The data is joined in its own thread, at its own rate. Since each thread processes data asynchronously to all the other threads, there is no longer any guarantee about which events will be processed first.

5) Eventually the stream threads will process the data and output their results, which will be written back to the original partitioning from Step 1. Note that the messages may arrive completely out of order, since the processing order in Step 4 cannot be guaranteed in any way (aside from "it should eventually complete").

6) At this point, this process needs to determine which of the out-of-order updates should be propagated out. The logic here is simple: if the offset of the original update from Step 1 is higher than the offset currently recorded in the highwater table for that specific key, update the highwater mark and propagate the event; otherwise, discard the event. (A sketch of this check follows the diagram below.)

Gliffy Diagram: HighwaterMarkUsage
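The check in Step 6 could look roughly like the processor below. The store name, the header carrying the originating offset, and the class itself are assumptions for illustration, not part of the proposal text.

Code Block
import java.nio.ByteBuffer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical resolver: keeps the highest originating offset seen per key and
// discards any event carrying a lower offset (a stale, out-of-order update).
public class HighwaterMarkResolver<K, V> extends AbstractProcessor<K, V> {
    private KeyValueStore<K, Long> highwater;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        highwater = (KeyValueStore<K, Long>) context.getStateStore("highwater-mark-store");
    }

    @Override
    public void process(final K key, final V value) {
        final Header header = context().headers().lastHeader("__source-offset");
        final long incomingOffset = ByteBuffer.wrap(header.value()).getLong();
        final Long recorded = highwater.get(key);
        if (recorded == null || incomingOffset > recorded) {
            highwater.put(key, incomingOffset);  // advance the highwater mark
            context().forward(key, value);       // propagate the up-to-date event
        }
        // Otherwise the event is stale and is discarded.
    }
}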


...

PropagationWrapper - removed in favor of using Record Headers to contain the offset and propagation metadata.

The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play during resolution.

Deletion

Since a deleted record will never be just a plain "null" value anymore (but instead a wrapped null), the log cleaner will not automatically clean it up. It can still be compacted, but this will result in higher data usage than if we were able to use pure null values. One suggestion is to add the concept of a "weak reference" to Kafka, where the log cleaner would treat the record as a null for cleaning purposes even though it still contains data. This is beyond the scope of this ticket.

Serialization Format

Code Block
{8-byte long original offset}{1-byte boolean propagation flag, stored in bit 0}{4-byte value length}{serialized value}
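A minimal serializer sketch for the format above, assuming the wrapped value has already been serialized by an inner serde; this is illustrative only.

Code Block
import java.nio.ByteBuffer;

// Follows the layout described above:
// {8-byte offset}{1-byte propagation flag}{4-byte value length}{serialized value}
public static byte[] serialize(final long offset, final boolean propagate, final byte[] serializedValue) {
    final ByteBuffer buffer = ByteBuffer.allocate(8 + 1 + 4 + serializedValue.length);
    buffer.putLong(offset);                            // 8-byte original offset
    buffer.put((byte) (propagate ? 0x01 : 0x00));      // propagation flag stored in bit 0
    buffer.putInt(serializedValue.length);             // 4-byte value length
    buffer.put(serializedValue);                       // serialized value payload
    return buffer.array();
}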

Usage - Example of PropagationWrapper

...



GroupBy + Reduce / Aggregate

...