...

This race condition is especially visible when multiple threads are used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes. There is no guarantee on the order in which the messages arrive. All things being equal, it is just as likely that the "null" message would be the final result as the correctly updated message. This issue is only further compounded if the foreign key is changed several times in short succession, across multiple additional partitions. The only remedy I can currently find is to propagate the offset of the original update along with the payload of the message. This introduces my next section.

...

Solution A - Hold Ordering Metadata in Record Headers

Automatically rekey for the user. The user gets back a clean KTable keyed on the original key. Repartitioning is required.

Gliffy Diagram: OutOfOrderResolution-RecordHeaders
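
Purely as an illustration of carrying the ordering metadata in headers, the sketch below tags each rekeyed record with the offset of its original update inside a Transformer. The header name "source-offset" and the transformer itself are assumptions for this example, not part of the proposal.

Code Block
import java.nio.ByteBuffer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: attach the offset of the original update as a record header so that a
// downstream resolver can later discard stale, out-of-order results.
public class OffsetHeaderTagger<K, V> implements Transformer<K, V, KeyValue<K, V>> {
    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public KeyValue<K, V> transform(final K key, final V value) {
        final byte[] offsetBytes = ByteBuffer.allocate(Long.BYTES)
                                             .putLong(context.offset())
                                             .array();
        // "source-offset" is an illustrative header name, not defined by this KIP.
        context.headers().add("source-offset", offsetBytes);
        return KeyValue.pair(key, value);
    }

    @Override
    public void close() { }
}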


Since the final out-of-order data is sourced from a topic, the only way to ensure that downstream KTables have the means to query their parent's ValueGetter is to materialize the final state store. There is no way to get specific values directly from a topic source - a Materialized store is required to provide statefulness to data stored in a topic (see KTableSource). In this case, it means that a user-provided Materialized store is mandatory. The workflow would look like this:

Gliffy Diagram: FinalStageToUserStateStore
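
Purely as a placeholder sketch of that workflow from the user's point of view, a call might look roughly like the following. The method name joinOnForeignKey, the value accessors, and the serdes are illustrative assumptions, not the signature proposed here; the point is only that the Materialized argument is mandatory.

Code Block
// Hypothetical call shape - only the mandatory, user-provided Materialized store is the point.
KTable<K, JoinedResult> result = left.joinOnForeignKey(
        right,                                    // the other KTable, keyed on the foreign key
        leftValue -> leftValue.getForeignKey(),   // extracts the foreign key from the left value
        (leftValue, rightValue) -> new JoinedResult(leftValue, rightValue),
        Materialized.<K, JoinedResult, KeyValueStore<Bytes, byte[]>>as("joined-result-store")
                    .withKeySerde(leftKeySerde)
                    .withValueSerde(joinedResultSerde));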

The Problem of Multiple Rapid Foreign-Key Value Changes - Why a highwater mark table is required

In this case, there are multiple, rapid changes made to the foreign key of a single record. The joiner will need to resolve which one is the correct final value. Records will arrive out of order for a variety of reasons, such as a node crashing, congestion in the input partition, or network latency.

1) In the example below, you can see that there are three updates occurring to the source topic. Each update alters the foreignKey within the value payload of the Left KTable.

2) When this value is altered, new CombinedKey events need to be propagated to the corresponding partitions: one to delete the record under the old foreign key, and one to update it under the new foreign key.

3) The events are written into the rekeyed internal partitions of the foreign key joiner. These events are co-partitioned with the foreign key data, such that they can be joined locally on the node.

4) The data is joined in its own thread, at its own rate. Since each thread processes data asynchronously to all the other threads, there is no longer any guarantee about which events will be processed first.

5) Eventually the stream threads will process the data and output their results, which will be written back to the original partitioning from Step 1. Note that the messages may arrive completely out of order, since the processing order in Step 4 cannot be guaranteed in any way (aside from "it should eventually complete").

6) At this point, this process needs to determine which of the out-of-order updates should be propagated out. The logic is simple: if the offset of the original update from Step 1 is higher than the offset currently recorded in the highwater table for that specific key, update the highwater mark and propagate the event. Otherwise, discard the event. A sketch of this check is shown after the diagram below.


Gliffy Diagram: HighwaterMarkUsage
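
A minimal sketch of the resolution check from Step 6, assuming a wrapper (here called VersionedValue, an assumption for this example) that carries the original offset alongside the joined payload, and a persistent store named "highwater-mark-store" that holds the highest offset seen per key:

Code Block
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch: propagate an update only if its original offset is higher than the last
// offset recorded for the key in the highwater store; otherwise discard it as stale.
public class HighwaterMarkResolver<K, V> implements Transformer<K, VersionedValue<V>, KeyValue<K, V>> {
    private KeyValueStore<K, Long> highwaterStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        // "highwater-mark-store" is an illustrative store name.
        highwaterStore = (KeyValueStore<K, Long>) context.getStateStore("highwater-mark-store");
    }

    @Override
    public KeyValue<K, V> transform(final K key, final VersionedValue<V> value) {
        final Long highwater = highwaterStore.get(key);
        if (highwater == null || value.originalOffset() > highwater) {
            highwaterStore.put(key, value.originalOffset());
            return KeyValue.pair(key, value.payload());  // newer update: propagate
        }
        return null;  // out-of-order update: discard
    }

    @Override
    public void close() { }
}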


Alternate Solution B (Jan's) - Materializing

A KTable<CombinedKey<A,B>, JoinedResult> is not a good return type. It breaks the KTable invariant that a table is partitioned by its key, which this table would not be, and the CombinedKey is not particularly useful to the user, as it is merely a repartitioning artifact.

User Managed Group by

With a follow-up group-by, we can remove the repartitioning artifact by grouping into a map. Out-of-order events can be held in the map and dealt with however one likes: either wait for some final state and propagate no changes that are "intermediate" and would show artifacts, or propagate directly. Eventual correctness is guaranteed either way. A further huge advantage is that the group-by can be by any key, resulting in a table of that key.
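
A minimal sketch of such a follow-up group-by, regrouping the joined results into a map keyed on the original left-hand key. The CombinedKey and JoinedResult accessors and the serdes are stand-ins for this example:

Code Block
// Sketch: remove the CombinedKey repartitioning artifact by regrouping on the left-hand key
// and holding the joined results in a map, where out-of-order events can be resolved as desired.
KTable<A, Map<B, JoinedResult>> regrouped = joinedTable
    .groupBy(
        (combinedKey, joinedResult) -> KeyValue.pair(combinedKey.getLeftKey(), joinedResult),
        Grouped.with(leftKeySerde, joinedResultSerde))
    .aggregate(
        HashMap::new,
        // adder: record (or overwrite) the result under its right-hand key
        (leftKey, joinedResult, agg) -> { agg.put(joinedResult.getRightKey(), joinedResult); return agg; },
        // subtractor: drop the retracted result
        (leftKey, joinedResult, agg) -> { agg.remove(joinedResult.getRightKey()); return agg; },
        Materialized.with(leftKeySerde, mapSerde));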

Gliffy Diagram: OutOfOrderResolution-RecordHeaders


Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.

Rejected Alternatives

PropagationWrapper - Removed in favour of using Record Headers to carry the offset and propagation metadata.

The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play during resolution.
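
A minimal sketch of what the wrapper could look like; the field and accessor names are illustrative:

Code Block
// Sketch: wraps the (possibly null) value together with the ordering metadata described above.
public class PropagationWrapper<V> {
    private final long offset;        // offset of the original update in its source partition
    private final boolean propagate;  // false for the "null" emitted by a foreign-key change
    private final V value;            // the wrapped value; may be null for deletions

    public PropagationWrapper(final long offset, final boolean propagate, final V value) {
        this.offset = offset;
        this.propagate = propagate;
        this.value = value;
    }

    public long offset() { return offset; }
    public boolean propagate() { return propagate; }
    public V value() { return value; }
}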

Deletion

Since a deleted record will never be just a plain "null" value anymore (but instead a wrapped null), the log cleaner will not automatically clean it up. This can still be compacted, but it will result in higher data usage than if we were able to use pure null values. A suggestion is that we can add the concept of a "weak reference" to Kafka, where the log cleaner will still treat it as a null for log cleaner purposes, but where it could also still contain data. This is beyond the scope of this ticket.

Serialization Format

Code Block
{8-byte long: original offset}{1-byte boolean: propagate, stored in bit 0}{4-byte int: value length}{serialized value}
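
A sketch of a serializer that follows this layout, assuming the PropagationWrapper sketched above and an inner Serializer for the wrapped value:

Code Block
import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: writes {8-byte offset}{1-byte propagate flag}{4-byte value length}{value bytes}.
public class PropagationWrapperSerializer<V> implements Serializer<PropagationWrapper<V>> {
    private final Serializer<V> inner;

    public PropagationWrapperSerializer(final Serializer<V> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(final String topic, final PropagationWrapper<V> wrapper) {
        final byte[] valueBytes = wrapper.value() == null
                ? new byte[0]
                : inner.serialize(topic, wrapper.value());
        final ByteBuffer buffer = ByteBuffer.allocate(8 + 1 + 4 + valueBytes.length);
        buffer.putLong(wrapper.offset());                   // original offset
        buffer.put((byte) (wrapper.propagate() ? 1 : 0));   // propagate flag in bit 0
        buffer.putInt(valueBytes.length);                   // value length
        buffer.put(valueBytes);                             // serialized value
        return buffer.array();
    }
}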

Usage - Example of PropagationWrapper


Gliffy Diagram: PropagationWrapperIntro


...

Unified Finalizing

...


Rejected Alternatives

GroupBy + Reduce / Aggregate

...