...

This race condition is especially visible when multiple threads are used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network conditions, polling cycles, and a variety of other causes. There is no guarantee on the order in which the resulting messages arrive. All else being equal, the "null" message is just as likely to end up as the final result as the correct, updated message. The problem is compounded further if the foreign key is changed several times in short succession and the changes span multiple additional partitions. The only remedy I have found so far is to propagate the offset of the original update along with the payload of the message, which leads into the next section.


Solution

...

- Hold Ordering & Propagation Metadata in Record Headers

[Diagram: OutOfOrderResolution-RecordHeaders]
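Moving the metadata into record headers means the original offset and the propagate flag travel beside the value rather than inside it. The following is a minimal sketch of how the two header values could be encoded, assuming hypothetical header key names (the design does not name them) and using a plain Map as a stand-in for Kafka's Headers. In real code the byte arrays would presumably be attached with `Headers#add(String, byte[])` and read back with `Headers#lastHeader(String)`.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: header key names and this helper class are hypothetical.
final class OrderingHeaders {
    static final String OFFSET_KEY = "fk-join.original-offset"; // hypothetical key name
    static final String PROPAGATE_KEY = "fk-join.propagate";    // hypothetical key name

    private OrderingHeaders() {}

    /** Encodes the original source-topic offset as an 8-byte big-endian long. */
    static byte[] encodeOffset(long offset) {
        return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    }

    static long decodeOffset(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    /** Encodes the propagate flag in bit 0 of a single byte. */
    static byte[] encodePropagate(boolean propagate) {
        return new byte[] {(byte) (propagate ? 1 : 0)};
    }

    static boolean decodePropagate(byte[] bytes) {
        return (bytes[0] & 1) != 0;
    }

    /** Builds both metadata headers for one record; the Map stands in for Kafka's Headers. */
    static Map<String, byte[]> build(long offset, boolean propagate) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put(OFFSET_KEY, encodeOffset(offset));
        headers.put(PROPAGATE_KEY, encodePropagate(propagate));
        return headers;
    }
}
```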



Final Steps - Materializing

Since the final out-of-order data is sourced from a topic, the only way to ensure that downstream KTables can query their parent's ValueGetter is to materialize the final state store. Specific values cannot be fetched directly from a topic source; a Materialized store is required. In this case, a user-provided Materialized store would be mandatory. The workflow would look like this:

[Diagram: FinalStageToUserStateStore]


Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.

Rejected Alternatives

PropagationWrapper - Removed in favor of investigating Record Headers as the carrier for the offset and propagation metadata.

The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play during resolution.
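The two properties above, together with the wrapped value implied by the serialization format below, can be sketched as a small immutable Java class. This is a hypothetical illustration only: the factory methods `deletion` and `foreignKeyChangeNull` are names introduced here to show the two kinds of "null", not part of any Kafka API.

```java
// Hypothetical sketch of the (rejected) PropagationWrapper.
final class PropagationWrapper<V> {
    private final V value;           // wrapped payload; may be null
    private final long offset;       // offset of the originating event (keyed on KR) in its source-topic partition
    private final boolean propagate; // false => suppress this "null" during resolution

    PropagationWrapper(V value, long offset, boolean propagate) {
        this.value = value;
        this.offset = offset;
        this.propagate = propagate;
    }

    V value() { return value; }
    long offset() { return offset; }
    boolean propagate() { return propagate; }

    /** A deletion of the source record: a null that must still propagate downstream. */
    static <V> PropagationWrapper<V> deletion(long offset) {
        return new PropagationWrapper<>(null, offset, true);
    }

    /** The "null" produced by a foreign-key change: halted at resolution. */
    static <V> PropagationWrapper<V> foreignKeyChangeNull(long offset) {
        return new PropagationWrapper<>(null, offset, false);
    }
}
```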

Deletion

Because a deleted record is no longer a plain "null" value but a wrapped null, the log cleaner will not treat it as a tombstone and will not remove it automatically. The topic can still be compacted, but it will use more storage than it would if we could use pure null values. One suggestion is to add the concept of a "weak reference" to Kafka: a value that the log cleaner treats as null for cleanup purposes but that can still carry data. This is beyond the scope of this ticket.

Serialization Format

{8-byte long original offset}{1-byte boolean propagate flag, stored in bit 0}{4-byte value length}{serialized value}
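A minimal sketch of this layout using java.nio.ByteBuffer follows. It assumes the value has already been serialized by the inner serde, and it assumes a length of -1 marks a null value, since the format above does not say how a null is encoded. `PropagationWrapperFormat` and `Decoded` are hypothetical names.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder/decoder for: {8-byte offset}{1-byte flag, bit 0}{4-byte length}{value bytes}
final class PropagationWrapperFormat {
    static final int HEADER_BYTES = Long.BYTES + 1 + Integer.BYTES; // 13 bytes before the value
    static final int NULL_LENGTH = -1; // assumption: marks a null value

    static final class Decoded {
        final long offset;
        final boolean propagate;
        final byte[] value; // null when the wrapped value was null

        Decoded(long offset, boolean propagate, byte[] value) {
            this.offset = offset;
            this.propagate = propagate;
            this.value = value;
        }
    }

    static byte[] serialize(long offset, boolean propagate, byte[] value) {
        int valueLength = (value == null) ? 0 : value.length;
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + valueLength);
        buf.putLong(offset);
        buf.put((byte) (propagate ? 0x01 : 0x00)); // flag in bit 0; other bits unused
        buf.putInt(value == null ? NULL_LENGTH : value.length);
        if (value != null) {
            buf.put(value);
        }
        return buf.array();
    }

    static Decoded deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long offset = buf.getLong();
        boolean propagate = (buf.get() & 0x01) != 0;
        int length = buf.getInt();
        if (length < NULL_LENGTH) {
            throw new IllegalArgumentException("corrupt value length: " + length);
        }
        byte[] value = null;
        if (length != NULL_LENGTH) {
            value = new byte[length];
            buf.get(value);
        }
        return new Decoded(offset, propagate, value);
    }
}
```

Even a wrapped null serializes to 13 non-null bytes in this sketch, which is why the log cleaner cannot recognize it as a tombstone, as discussed under Deletion.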



Usage - Example of PropagationWrapper


[Diagram: PropagationWrapperIntro]

Multiple Rapid Foreign-Key Value Changes - Example of PropagationWrapper

In this case, the foreign key of a single record is changed several times in rapid succession, and we need to determine which of the resulting join outputs is the correct final value.

[Diagram: HighwaterMarkUsage]
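The highwater-mark resolution for this scenario can be sketched as follows, under stated assumptions. Each join result carries the offset of the source event that produced it plus the propagate flag. A result whose offset is below the highest offset already seen for its key is treated as stale and dropped. A null with propagate == false is suppressed, and a null with propagate == true is applied as a deletion. The class name `HighwaterResolver` is hypothetical, and its in-memory maps stand in for what would presumably be persisted state stores in a real topology.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of highwater-mark resolution for out-of-order join results.
final class HighwaterResolver<K, V> {
    private final Map<K, Long> highwater = new HashMap<>(); // highest source offset seen per key KR
    private final Map<K, V> store = new HashMap<>();        // stand-in for the materialized final store

    /** Applies one result; returns true if it was forwarded to the store (update or deletion). */
    boolean apply(K key, long offset, boolean propagate, V value) {
        Long mark = highwater.get(key);
        if (mark != null && offset < mark) {
            return false; // stale: produced by an older update to this key
        }
        highwater.put(key, offset);
        if (value == null) {
            if (!propagate) {
                return false; // "null" caused by a foreign-key change: halted here
            }
            store.remove(key); // genuine deletion: propagated
            return true;
        }
        store.put(key, value);
        return true;
    }

    V get(K key) {
        return store.get(key);
    }
}
```

Because the foreign-key-change null and the new join result of a single update share that update's offset, this sketch accepts offsets equal to the current highwater mark rather than only strictly greater ones. With three rapid changes arriving in scrambled order, only the result from the newest offset survives.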