...

Code Block
public interface KTable<K, V> {

    // mapper is used for mapping this table's key-value into the other table's key;
    // selector is used for combining the joining tables' keys as the result table's key.

    <K1, V1, K2, R> KTable<K2, R> join(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> leftJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> outerJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    // mapper is used for mapping this table's key-value into the other table's key.

    <K1, V1, R> KTable<K, R> join(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);

    <K1, V1, R> KTable<K, R> leftJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);

    <K1, V1, R> KTable<K, R> outerJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);
}
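The following sketch pins down what the selector-based inner join is meant to compute. It is a minimal batch sketch over two in-memory maps, not Kafka Streams code:

- `java.util.function.BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`.
- The class name `ForeignKeyJoinSemantics` and the sample data are hypothetical.
- It ignores the incremental, update-driven behavior described under Implementation Details.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Batch reference semantics for the proposed selector-based inner join (hypothetical helper).
// BiFunction stands in for Kafka Streams' KeyValueMapper and ValueJoiner.
final class ForeignKeyJoinSemantics {

    static <K, V, K1, V1, K2, R> Map<K2, R> join(
            Map<K, V> thisTable,
            Map<K1, V1> otherTable,
            BiFunction<K, V, K1> mapper,     // (this key, this value) -> other table's key
            BiFunction<K, K1, K2> selector,  // (this key, other key) -> result key
            BiFunction<V, V1, R> joiner) {   // (this value, other value) -> result value
        Map<K2, R> result = new HashMap<>();
        for (Map.Entry<K, V> entry : thisTable.entrySet()) {
            K1 foreignKey = mapper.apply(entry.getKey(), entry.getValue());
            V1 otherValue = foreignKey == null ? null : otherTable.get(foreignKey);
            if (otherValue == null) {
                continue; // inner join: no matching record in the other table, no output
            }
            K2 resultKey = selector.apply(entry.getKey(), foreignKey);
            result.put(resultKey, joiner.apply(entry.getValue(), otherValue));
        }
        return result;
    }

    public static void main(String[] args) {
        // tableB: order id -> customer id (plays the role of "fieldKA" in the foreign-key example)
        Map<String, String> tableB = Map.of("order-1", "alice", "order-2", "bob", "order-3", "carol");
        // tableA: customer id -> customer name
        Map<String, String> tableA = Map.of("alice", "Alice", "bob", "Bob");

        Map<String, String> tableR = join(tableB, tableA,
                (orderId, customerId) -> customerId,   // mapper
                (orderId, customerId) -> orderId,      // selector: keep tableB's key
                (customerId, name) -> customerId + ":" + name);
        System.out.println(tableR); // order-3 has no match and is dropped
    }
}
```

Records of this table whose mapped key has no match in the other table produce no output. That is the inner-join case.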

 

With the above APIs, the following use cases can be implemented:

Foreign-key Join

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<KB, VB> tableB = builder.table("topicB");    // assuming topicB's message value payload contains a field "fieldKA" holding tableA's key (type KA)

KTable<KB, VR> tableR = tableB.join(tableA, (keyB, valueB) -> valueB.get("fieldKA"), (keyB, keyA) -> keyB, (valueB, valueA) -> joined);   // join result value type "VR"

 

Non-window to Window Table Join

For example, a windowed table could "query" a non-windowed table to augment its values.

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<Windowed<KA>, VB> tableB = stream.aggregate(...);
 
KTable<Windowed<KA>, VR> tableR = tableB.join(tableA, (windowKeyB, valueB) -> windowKeyB.key(), (windowKeyB, keyA) -> windowed(combine(windowKeyB.key(), keyA)), (valueB, valueA) -> joined);   // join result value type "VR"

 

Window to Window Table Join

For example, comparing a windowed table with itself (a self-join) shifted by a fixed period.

Code Block
KTable<Windowed<KA>, VA> tableA = stream.aggregate(...);
 
KTable<Windowed<KA>, VR> tableR = tableA.join(tableA, (windowKeyA, valueA) -> windowKeyA.shiftLeft(1000), (windowKeyA, windowKeyB) -> windowKeyB, (valueA, valueB) -> joined);   // join result value type "VR"
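The shifted self-join can be hard to picture, so the following is a minimal plain-Java sketch of what the example computes. It rests on several assumptions:

- `shiftLeft(1000)` is taken to move a window's start 1000 ms earlier.
- `WindowKey` is a hypothetical stand-in for `Windowed<K>`.
- The joiner, the change in an aggregate between consecutive windows, is chosen purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

final class ShiftedSelfJoinSketch {

    // Hypothetical stand-in for Windowed<K>: a record key plus the window start in ms.
    static final class WindowKey {
        final String key;
        final long start;
        WindowKey(String key, long start) { this.key = key; this.start = start; }
        // Assumed meaning of shiftLeft: the same key, with the window moved ms earlier.
        WindowKey shiftLeft(long ms) { return new WindowKey(key, start - ms); }
        @Override public boolean equals(Object o) {
            return o instanceof WindowKey
                    && ((WindowKey) o).key.equals(key) && ((WindowKey) o).start == start;
        }
        @Override public int hashCode() { return Objects.hash(key, start); }
        @Override public String toString() { return key + "@" + start; }
    }

    // Each window is matched with the window shiftMs earlier (the mapper); the result is keyed
    // by the earlier window, like the selector (windowKeyA, windowKeyB) -> windowKeyB.
    static <V, R> Map<WindowKey, R> selfJoin(Map<WindowKey, V> table, long shiftMs,
                                             BiFunction<V, V, R> joiner) {
        Map<WindowKey, R> result = new HashMap<>();
        table.forEach((windowKey, value) -> {
            WindowKey shifted = windowKey.shiftLeft(shiftMs);
            V earlier = table.get(shifted);
            if (earlier != null) { // inner join: the first window has no predecessor
                result.put(shifted, joiner.apply(value, earlier));
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<WindowKey, Integer> counts = new HashMap<>();
        counts.put(new WindowKey("a", 0), 10);
        counts.put(new WindowKey("a", 1000), 15);
        counts.put(new WindowKey("a", 2000), 12);
        // Change in count from each window to the next one.
        System.out.println(selfJoin(counts, 1000, (current, previous) -> current - previous));
    }
}
```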

 


 


Implementation Details

In the following section we only talk about the first case, where the mapper maps this table's key-value pair into the other table's key:

  1. First of all, we will repartition this KTable's stream by the key computed from mapper(K, V) → K1, so that it is co-partitioned with the other table. The repartition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
  2. After repartitioning this table, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized with the combo key (K1, K) and value V (note that we need to apply mapper(K, V) → K1 again).
  3. When a record (k1, v1) is received from the other table's stream, update its materialized table, then make a range query on this table's materialized store as (k1, *). For each matched record (k1, k) → v, apply joiner(v, v1) → r and send (k2, r), where selector(k, k1) → k2.
  4. When a record (k, v) is received from this table's repartitioned stream, update its materialized table by applying the mapper, then make a get query on the other table's materialized store by the mapped key k1 = mapper(k, v). For the single matched record k1 → v1, apply joiner(v, v1) → r and send (k2, r), where selector(k, k1) → k2.
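Steps 2-4 can be sketched for a single partition as an in-memory processor. This is a minimal sketch under several assumptions:

- Keys are `String`s.
- A `TreeMap` with a hypothetical `Combo` key ordered by (k1, k) stands in for the materialized store.
- The repartitioning of step 1 has already happened.
- Deletes are not handled. When a record's foreign key changes, the stale (oldK1, k) entry is left in place; the removal record described in the NOTE is what would clean it up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiFunction;

// Single-partition, in-memory sketch of steps 2-4 (step 1's repartitioning is assumed done).
final class FkJoinProcessorSketch<V, V1, R> {

    // Hypothetical combo key (k1, k), ordered by k1 first so all entries for one k1 are adjacent.
    static final class Combo implements Comparable<Combo> {
        final String k1;
        final String k;
        Combo(String k1, String k) { this.k1 = k1; this.k = k; }
        @Override public int compareTo(Combo o) {
            int c = k1.compareTo(o.k1);
            return c != 0 ? c : k.compareTo(o.k);
        }
    }

    private final NavigableMap<Combo, V> thisStore = new TreeMap<>(); // (k1, k) -> v
    private final Map<String, V1> otherStore = new HashMap<>();       // k1 -> v1
    private final BiFunction<String, V, String> mapper;               // (k, v) -> k1
    private final BiFunction<String, String, String> selector;        // (k, k1) -> k2
    private final BiFunction<V, V1, R> joiner;                         // (v, v1) -> r

    FkJoinProcessorSketch(BiFunction<String, V, String> mapper,
                          BiFunction<String, String, String> selector,
                          BiFunction<V, V1, R> joiner) {
        this.mapper = mapper;
        this.selector = selector;
        this.joiner = joiner;
    }

    // Step 3: a record (k1, v1) arrives from the other table's stream.
    List<Map.Entry<String, R>> onOtherRecord(String k1, V1 v1) {
        otherStore.put(k1, v1);
        List<Map.Entry<String, R>> out = new ArrayList<>();
        // Range query (k1, *): seek to the smallest combo key for k1 ("" sorts first),
        // then scan forward while the first component still equals k1.
        for (Map.Entry<Combo, V> e : thisStore.tailMap(new Combo(k1, ""), true).entrySet()) {
            if (!e.getKey().k1.equals(k1)) {
                break;
            }
            String k = e.getKey().k;
            out.add(Map.entry(selector.apply(k, k1), joiner.apply(e.getValue(), v1)));
        }
        return out;
    }

    // Step 4: a record (k, v) arrives from this table's repartitioned stream.
    // A stale (oldK1, k) entry from a changed foreign key is NOT removed here.
    List<Map.Entry<String, R>> onThisRecord(String k, V v) {
        String k1 = mapper.apply(k, v); // re-apply the mapper to build the combo key
        thisStore.put(new Combo(k1, k), v);
        List<Map.Entry<String, R>> out = new ArrayList<>();
        V1 v1 = otherStore.get(k1);     // point lookup on the other table
        if (v1 != null) {
            out.add(Map.entry(selector.apply(k, k1), joiner.apply(v, v1)));
        }
        return out;
    }
}
```

An ordered store is used for this table because step 3 needs every entry sharing k1, while step 4 only needs a point lookup on the other table.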

 

NOTE: When sendOldValues is turned on, a pair <old, new> is passed into the repartitioned KTable's stream. The old and new values may map to different keys, and hence two separate records will be sent to the repartition topic, one for removal and one for addition. These two records are joined separately and independently, so their output ordering is arbitrary. As a result, a join result that tracks the changes as k -> {r_old, r_new} could be incorrect if k -> {null, r_new} is appended before k -> {r_old, null} (details can be found in KAFKA-3705). That is why we need the result table key to be a combination of both joining tables' keys, instead of just inheriting the left-hand table's key as in a normal foreign-key join.
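The ordering problem can be reproduced with a toy model. Each join output is treated as a put on the result table, or a delete for a null value, and the <old, new> pair itself is ignored. In this hypothetical sketch, record k's foreign key changes from a to b, and the removal and addition outputs are applied in both possible orders. The string "k|a" is only a readable stand-in for the combo key (k, a).

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: the result table is a map, each join output is a put, and a null value is a delete.
// Record k's foreign key changes from "a" to "b", producing one removal and one addition output.
final class ResultKeyOrderingSketch {

    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // Result keyed by k alone: the removal and the addition hit the same key.
    static Map<String, String> singleKey(boolean removalFirst) {
        Map<String, String> table = new HashMap<>();
        apply(table, "k", "r_old"); // state before the foreign-key change
        if (removalFirst) {
            apply(table, "k", null);
            apply(table, "k", "r_new");
        } else {
            apply(table, "k", "r_new");
            apply(table, "k", null); // removal arrives last and wipes out r_new
        }
        return table;
    }

    // Result keyed by the combo (k, k1): the removal targets (k, a), the addition targets (k, b).
    static Map<String, String> comboKey(boolean removalFirst) {
        Map<String, String> table = new HashMap<>();
        apply(table, "k|a", "r_old");
        if (removalFirst) {
            apply(table, "k|a", null);
            apply(table, "k|b", "r_new");
        } else {
            apply(table, "k|b", "r_new");
            apply(table, "k|a", null);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(singleKey(true) + " vs " + singleKey(false)); // {k=r_new} vs {}
        System.out.println(comboKey(true) + " vs " + comboKey(false));   // {k|b=r_new} vs {k|b=r_new}
    }
}
```

With a single result key, the final state depends on which output arrives last. With the combo key, both orders converge to the same table.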

When the mapping goes the other way around, from the other table's key-value pair to this table's key, the implementation is just the mirror image of the steps above, with the roles of the two tables swapped.
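Step 3 depends on a range query over (k1, *), which works only if all combo keys sharing the same k1 are contiguous in the store's key order. A minimal sketch, under these assumptions:

- The store is byte-ordered. RocksDB's default comparator orders keys bytewise.
- The combo key uses a hypothetical encoding that prefixes k1 with its 4-byte length.
- A `TreeMap` with `Arrays.compareUnsigned` stands in for the store.

Without the length prefix, a naive byte-prefix scan for k1 = "ab" would also return entries whose k1 is "abc".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ComboKeyPrefixScanSketch {

    // Hypothetical encoding of the (k1, *) prefix: [4-byte big-endian length of k1][k1 bytes].
    static byte[] prefix(String k1) {
        byte[] b = k1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + b.length).putInt(b.length).put(b).array();
    }

    // Full combo key (k1, k): the prefix followed by k's bytes.
    static byte[] comboKey(String k1, String k) {
        byte[] p = prefix(k1);
        byte[] kb = k.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(p.length + kb.length).put(p).put(kb).array();
    }

    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    // Range query (k1, *): seek to the prefix, then scan forward while keys still share it.
    static List<String> rangeQuery(TreeMap<byte[], String> store, String k1) {
        byte[] p = prefix(k1);
        List<String> values = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(p, true).entrySet()) {
            if (!startsWith(e.getKey(), p)) {
                break;
            }
            values.add(e.getValue());
        }
        return values;
    }

    public static void main(String[] args) {
        // Unsigned lexicographic byte order, i.e. the order of a bytewise comparator.
        TreeMap<byte[], String> store = new TreeMap<byte[], String>(Arrays::compareUnsigned);
        store.put(comboKey("ab", "x"), "v1");
        store.put(comboKey("ab", "y"), "v2");
        store.put(comboKey("abc", "x"), "v3"); // would match a naive "ab" byte prefix
        System.out.println(rangeQuery(store, "ab")); // [v1, v2]
    }
}
```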

 

Now the only question left is how to do a range query on the combo key (key, *). There are two approaches for this:

...

Note that there is one caveat: this feature is not yet available in the RocksDB JNI. I have filed a RocksDB issue (ticket), and if necessary we can contribute this feature back to RocksDB.

Update: prefix hashing has been added to JNI: https://github.com/facebook/rocksdb/pull/1109

 

Pros: leverages an internal RocksDB feature, which is expected to be performant.

...