Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion
Discussion thread
: here
JIRA
: KAFKA-3705  Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Note: This KIP contains two major sections - there was a previous attempt at resolving this by Jan. The current proposal is at the top.

Motivation

We want to close Close the gap between the semantics of KTables in streams and tables in relational databases. It is common practice to capture changes as they are made to tables in some RDMS a RDBMS into kafka topics (JDBC-connect, debeziumDebezium, Maxwell). It is also common practice to use some sort of normalization inside the RDBMS so that entities end up in a These entities typically have multiple one-to-many relationship. Usually RDBMSs offer good support to resolve this relationship with a join. Streams falls short here and the workaround (group by - join - lateral view) is not well supported as well and is not in line with the idea of record based processing.

 

Alternative Design - Adam Bellemare 2018

Features

...

  1. Performs Perform updates from both sides of the join.
  2. Uses user-specified serdes to perform the prefixScans.
  3. Resolves out-of-order processing due to foreignKey changes.
  4. Fully scalable with increasing data, provided that one thread doesn't process most of the foreign-key data.

Design Philosophy

  1. The foreignKeyJoin implementation doesn't require the user to have any knowledge about the inner workings of the join.
  2. All events are returned on the same key as they were provided (K,V in, K,V0 out). This is the same as the regular join function.
  3. Some Serdes are required to be provided by the user, as they are used to handle the CombinedKey serialization / deserialization.
  4. The user can provide a custom partitioner if they so wish. This allows them to not have to rely strictly on the DefaultPartitioner if they have custom partitioning logic.

Current Limitations / Points / Issues of note:

Some points of note:

  1. Requires two full materialized State Stores
    One for the prefixScanning of the repartitioned CombinedKey events.
    One for the final materialized sink. This is required because I am sourcing the events back from a repartitioning topic, and so a state store is required to construct a proper Topic Source (see KTableSource).

  2. Caching is disabled on the prefixScan store, same reasons as Jan gave above.

  3. ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support this, but follows an existing precedent where some store functions are already stubbed out with exceptions.

  4. Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).

  5. Application Reset does not seem to delete the new internal topics that I have added. (only tested with Kafka 1.0).

Motivation

Same as above.

Public Interfaces

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param foreignKeyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final ValueMapper<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)

Workflows

For the purposes of this KIP, keep these concepts in mind as you read through the document.

This/Event KTable - This contains the many. Think of this table as a large number of events with their own distinct keys, each of which contains a foreign key that we wish to join on. This is the table of events which we will be joining from. (ie: KTableLeft.foreignKeyJoin ... )
Example: Users clicking on a product, where an event contains a key on UserId+Timestamp and a value containing the foreign key, productId.

Code Block
This/Event Sample: 
(key,value)  =  (UserA-TimestampA, Event(type=Click, productid=900560) )

...

  1. Scalable as per normal joins

Public Interfaces

Code Block
/**
 * Joins the records of this KTable to another table keyed on a different key. Updates from this table will join
 * 1 to 1 on the other table. Updates to the other table will induce a join on each record in this table that has
 * that specific foreign key.
 *
 * @param other the table containing the records to be joined on. Keyed by KO.
 * @param foreignKeyExtractor extracts the key (KO) from this table's value (V).
 * @param joiner specifies how to join the records from both tables
 * @param materialized the materialized output store
 * @param <VR> the value type of the result {@code KTable}
 * @param <KO> the key type of the other {@code KTable}
 * @param <VO> the value type of the other {@code KTable}
 * @return
 */
<VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                final ValueMapper<V, KO> foreignKeyExtractor,
                                final ValueJoiner<V, VO, VR> joiner,
                                final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized)

Workflow

The overall process is fairly simple and is outlined below.

Gliffy Diagram
namesimplifiedOverview
pagePin3





More Detailed Implementation Details

Repartitioning using CombinedKey

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires the usage of both the primary key and foreign key serdes. I do not know of the json complexities that Jan speaks of above, but as long as the extracted foreign key from the left table is identical to the primary key in the right table, the serialization should be identical.

The CombinedKey is serialized as follows:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}{primaryKeySerialized}

This can support up to MAXINT size of data in bytes per key, which currently far exceeds the realistic sizing of messages in Kafka. If we so wish, we could expand this to be much larger, but I don't see a need to do so at this time. A more clever or compact serialization solution may be available, this is just the simplest one I came up with.

When performing the prefix scan on the RocksDB instance, we simply drop the primary key component, such that the serialized combined key looks like:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}

Custom Partitioner Usage

A custom partitioner is used to ensure that the CombinedKey left table data is correctly copartitioned with the right table data. This is a simple operation, as it simply extracts the foreign key and applies the partitioner logic. It is important that the same partitioner that is used to partition the right table is used to partition the rekeyed left-table data.

This/Event Processor Behaviour

  • CombinedKey data goes to a repartition topic coparitioned with the Right/Entity data.
  • The primary key in CombinedKey is preserved for downstream usage.

Gliffy Diagram
nameLeftProcessingWithCombinedKey
pagePin4

Right/Entity PrefixScan Processor Behaviour

  • Requires the specific serialized format detailed above
  • Requires a RocksDB instance storing all of the Left/Entity data

Gliffy Diagram
nameRangeScanCombinedKeyUsage
pagePin5

Problem: Out-of-order processing of Rekeyed data

There is an issue that arises when updating a foreign key value in an event in the left table. We first must send a delete on the previous CombinedKey, and send the new value on the new CombinedKey. This results in a race condition, as illustrated below.


Gliffy Diagram
nameOutOfOrder Problem
pagePin1

This race condition is especially visible when multiple threads are being used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes. It is expected that there is no guarantee on the order in which the messages arrive. All things equal, it is equally likely that you would see the "null" message as the final result as it would be the correct updated message. This issue is only further compounded if the foreign key were changed several times in short succession, with multiple additional partitions.

Resolving out-of-order events:

As proposed by John Roesler, the resolver can take advantage of the local data stores on the node:

Gliffy Diagram
nameresolver
pagePin1


While it is possible for a stale event to be propagated (ie: matches the foreign key, but is stale), the up-to-date event will be propagated when it arrives. This is eventually consistent. Entity changes do not cause the same race conditions that event-changes do, and so are not of a concern. They will, however, fall under this same resolution scheme.


Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.

...

Example: The Product Data:

Code Block
Other/Entity Sample:
(key, value)  =  (900560, ProductDetails(size = small, colour = blue, fabric = wool)) 

Overview

The overall process is fairly simple. This process is outlined below.

Gliffy Diagram
nameBellemareOverview213
pagePin3

Repartitioning using a CombinedKey Wrapper

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires the usage of both the primary key and foreign key serdes. I do not know of the json complexities that Jan speaks of above, but as long as the extracted foreign key from the left table is identical to the primary key in the right table, the serialization should be identical.

The CombinedKey is serialized as follows:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}{primaryKeySerialized}

This can support up to MAXINT size of data in bytes per key, which currently far exceeds the realistic sizing of messages in Kafka. If we so wish, we could expand this to be much larger, but I don't see a need to do so at this time. A more clever or compact serialization solution may be available, this is just the simplest one I came up with.

When performing the prefix scan on the RocksDB instance, we simply drop the primary key component, such that the serialized combined key looks like:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}

Custom Partitioner Usage

A custom partitioner is used to ensure that the CombinedKey left table data is correctly copartitioned with the right table data. This is a simple operation, as it simply extracts the foreign key and applies the partitioner logic. It is important that the same partitioner that is used to partition the right table is used to partition the rekeyed left-table data.

Left/Event Processor Behaviour

  • CombinedKey data goes to a repartition topic coparitioned with the Right/Entity data.
  • The primary key in CombinedKey is preserved for downstream usage.

Gliffy Diagram
nameLeftProcessingWithCombinedKey
pagePin4

Right/Entity PrefixScan Processor Behaviour

  • Requires the specific serialized format detailed above
  • Requires a RocksDB instance storing all of the Left/Entity data

Gliffy Diagram
nameRangeScanCombinedKeyUsage
pagePin5

Problem: Out-of-order processing of Rekeyed data

There is an issue that arises when updating a foreign key value in an event in the left table. We first must send a delete on the previous CombinedKey, and send the new value on the new CombinedKey. This results in a race condition, as illustrated below.

...

This race condition is especially visible when multiple threads are being used to process the Kafka topic partitions. A given thread on a given node may process its records much sooner or much later than the other threads, due to load, network, polling cycles, and a variety of other causes. It is expected that there is no guarantee on the order in which the messages arrive. All things equal, it is equally likely that you would see the "null" message as the final result as it would be the correct updated message. This issue is only further compounded if the foreign key were changed several times in short succession, with multiple additional partitions.

Resolving out-of-order events:

As proposed by John Roesler, the resolver can take advantage of the local data stores on the node:

...

While it is possible for a stale event to be propagated (ie: matches the foreign key, but is stale), the up-to-date event will be propagated when it arrives. This is eventually consistent. Entity changes do not cause the same race conditions that event-changes do, and so are not of a concern. They will, however, fall under this same resolution scheme.

Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.

...

PropagationWrapper - Removed it to look at using Record Headers instead to contain offset and propagation metadata.

The PropagationWrapper - Resolving out-of-order data processing

Purpose

This class stores two properties for handling out-of-order resolution.

  • long offset - Contains the offset, in a given partition, for an event keyed on KR in the original source topic. This is used in the highwater mark computation at the resolution stage.
  • boolean propagate - This is used to halt the propagation of the "null" value for foreignKey changes, but NOT the propagation of "null" values for deletions. This comes into play during resolution.

Deletion

Since a deleted record will never be just a plain "null" value anymore (but instead a wrapped null), the log cleaner will not automatically clean it up. This can still be compacted, but it will result in higher data usage than if we were able to use pure null values. A suggestion is that we can add the concept of a "weak reference" to Kafka, where the log cleaner will still treat it as a null for log cleaner purposes, but where it could also still contain data. This is beyond the scope of this ticket

Usage - Example of PropagationWrapper

...


Rejected Alternatives:

Problem: Out-of-order processing of Rekeyed data
Solution A - Hold Ordering Metadata in Record Headers and Highwater Mark Table

...

  • There is no impact to existing users.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

...