Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

...

One main requirement that I have run into several times is that the denormalized data be updated should any of its components change. This is required for a consistent view of the data for all downstream consumers. To do this, we must perform stateful joins using KTable to KTables. Unfortunately, this approach also requires that we rekey the data on the foreign key, which while supported, introduces a number of its own issues. Furthermore, solutions to this approach become untenable as the data volume grows. Further details into the problems with this approach can be seen by this excellent blog post by Dan Lebrero.

http://danlebrero.com/2017/05/07/kafka-streams-ktable-globalktable-joining-reference-data/

For relational data with a very narrow domain, rekeying on the foreign key becomes prohibitively expensive in terms of maintaining the data structures. Additional complexities about managing order, duplicate outputs and race conditions also begin to apply. In fact, the narrower the domain of the foreign keyed data, the worse the problem becomes. However, with KTable joins to GlobalKTables, the narrower the domain of the data the easier it is for the globalKTable to provide easy and performant joins without managing all of this overhead. A basic illustration of this problem is given in the following image.

Gliffy Diagram
bordertrue
nameKTableToKTableForeignKeyJoinIssue
pagePin4

By allowing for a KTable-to-GlobalKTable join, driven by both sides, this problem space can be greatly simplified and result in a higher performance, simpler way to handle highly relational data. I have personally worked on several Kafka Streams applications that follow this approach described above, and it is only due to the relatively friendly nature of much of our data that we have managed to get a semblance of functionality out of it. For highly normalized data, such a pattern is not tenable. As it currently stands in Kafka Streams there is no easy way to handle multiple joins on small, normalized tables in a way that is easy to reason about and easy to implement.

Public Interfaces

streams/kstreams/KTable.java


Code Block
public interface KTable<K, V> {
	<GK, GV, RV> KTable<K, RV> join(final GlobalKTable<GK, GV> globalKTable,
                                 final KeyValueMapper<? super K, ? super V, ? extends GK> keyMapper,
                                 final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);

	<GK, GV, RV> KTable<K, RV> leftJoin(final GlobalKTable<GK, GV> globalKTable,
                                     final KeyValueMapper<? super K, ? super V, ? extends GK> keyMapper,
                                     final ValueJoiner<? super V, ? super GV, ? extends RV> joiner);
}



new - streams/kstreams/ScannableKTableValueGetter.java

Code Block
//This is a new interface that will be used to operate on the foreign-key-prefix KTable.

public interface ScannableKTableValueGetter<K,V> extends KTableValueGetter {
    KeyValueIterator<K,V> scan(String prefixFrom, String prefixTo);
}


new - streams/kstreams/KTableGlobalKTableInnerJoin.java

Code Block
//Very similar to the KTableKTableInnerJoin, but following the  bilateral join patterns outlined in this KIP.

class KTableGlobalKTableInnerJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1, V1> {
}


new - streams/kstreams/KTableGlobalKTableLeftJoin.java

Code Block
//Very similar to the KTableKTableLeftJoin, but following the bilateral join patterns outlined in this KIP.

class KTableGlobalKTableLeftJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1, V1> {
}

Note: GlobalKTableKTableLeftJoin is not possible, as the GlobalKTable should not be able to produce events without there being an associated element in the KTable side. This behaviour would be undefined as each node would produce an event whenever a GKT had an updateBriefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.




A public interface is any change to the following:

  • Binary log format

  • The network protocol and api behavior

  • Any class in the public packages under clientsConfiguration, especially client configuration

    • org/apache/kafka/common/serialization

    • org/apache/kafka/common

    • org/apache/kafka/common/errors

    • org/apache/kafka/clients/producer

    • org/apache/kafka/clients/consumer (eventually, once stable)

  • Monitoring

  • Command line tools and arguments

  • Anything else that will likely break existing users in some way when they upgrade

Proposed Changes

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.Summary

  • A new KTable to GlobalKTable inner and left join.
  • GlobalKTable's output can be attached to a processor, to drive the join from the right side of the KTable-to-GlobalKTable join.
  • Use RocksDB prefix RangeScan to join from the Right, using a rekeyed table.

Problem

Gliffy Diagram
nameKTableToKTableForeignKeyJoinIssue
pagePin4

Proposal

Gliffy Diagram
nameProposed
pagePin4

  • the re-keyed table.

Proposal

The above is illustrated at a high level in the following diagram. A new KTable is created with the results of the KeyValueMapper as a prefix of the state store Key. This in turn allows for updated KTable rows to be joined directly by using specific prefix joiner logic, which will be included in this JIRA. In addition, the GlobalKTable can also attach itself to a processor, and induce joins on the KTable. This is necessary to fulfill the requirement of keeping all stateful data up to date, which is a normal operation when updating data in a relational database. This can be seen in the diagram following the next.

Gliffy Diagram
bordertrue
nameProposed
pagePin4


GlobalKTable as Driver, joined on KTable join mechanism.

The following image illustrates the GlobalKTable join mechanism.

  1. The GlobalKTable receives an update from its upstream topic. This exercises the join logic on the ModifiedEvents table.

  2. A Range Scan on the GKT key is performed on the ModifiedEvents store. This will enable use to retrieve all of the elements from the KTable that satisfy the prefix.

  3. The Range Scan results are returned to the join processor, where the join logic is executed.

  4. The processor joins the data together using inner-join logic. The subsequent events are output from the processor to the downstream consumers.

Note: The GlobalKTable cannot be on the left in a Left-Join. This would result in each instance producing an event on a GKT update, which is not supportedGlobalKTableToKTable(inner/left) join mechanism.

Gliffy Diagram
nameGKT-Driver
pagePin2

KTableToGlobalKTable(inner/left) join mechanism.


KTable as Driver, joined on GlobalKTable join mechanism

  1. The row is updated in the KTable, with the downstream value being updated. If the KeyValueMapper would delete the old value, then a Change element must be used to detect what the old value was, such that it's ModifiedEvent key can be created and the row deleted.

  2. The updated row is sent downstream to the Join processor.

  3. The prefix is stripped from the ModifiedEvent row, and the GKT is queried.

  4. The GKT result is returned to the processor.

  5. The processor joins the data together depending on the left/inner logic. The subsequent events are output from the processor to the downstream consumers.

Gliffy Diagram
nameKTable-driver
pagePin5


Summary

In terms of major changes, this relies heavily on the RocksDB prefix scan, in both consistency and performance. This also relies heavily on changing the GKT to be a driver of processor join logic. In terms of data complexity, any pattern that requires us to rekey the data once is equivalent in terms of data capacity requirements, regardless of if we rekey the data to have a prefix, or rekey it such that all elements of a foreign-key are in the same row.


The mains advantages of the proposed solution here is that:

  • Each row in the ModifiedEvents table is just slightly bigger than the source table row, due to the addition of the key prefix. This is contrary to the original problem, where a groupByKey on a rekeyed KTable mandates that each element with the same key must be grouped into the same row, thereby ensuring that the row size is unbounded. This risks OOM errors, difficulty in maintaining the updates over time, and an inability to back data up to the Kafka cluster when rows grow into the 10s, 100s and 1GB range.

  • There is no need for re-partitioning. We want the data to remain local to each node, as only the final result is what matters. This reduces the load on the Kafka cluster and reduces both financial and processing costs.

  • Updates to the GKT can be propagated out to all keyed entities that use that data. This is highly valuable in providing a way to depart from the relational structures of many change-data-capture produced events.


Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

There should be no impact to any current users, nor any change to existing join functionality.

The only component that will require a closer look is the usage of the GlobalKTable as a processor driver. Currently, the GKT is only usable as a lookup and will not drive join logic. Aside from wishing to avoid ill-defined behaviour, I can't see any technical reasons why we cannot do this. My familiarity with this component and the history behind it is minimal though, as this is the first KIP and JIRA that I would be addressing in Kafka.

Rejected Alternatives

None currently known or described.If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.y