...

Code Block
public interface KTable<K, V> {

    // mapper is used for mapping this table's key-value into the other table's key;
    // selector is used for combining the joining tables' keys as the result table's key.

    <K1, V1, K2, R> KTable<K2, R> join(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> leftJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> outerJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);
}

...

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<KB, VB> tableB = builder.table("topicB");    // assuming topicB's message value payload contains field of "KA"
 
KTable<KB, VR> tableR = tableB.join(tableA, (keyB, valueB) -> valueB.get("fieldKA"), (keyB, keyA) -> keyB, (valueB, valueA) -> joined);   // join return type "VR"

 

Non-window to Window Table Join

...

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<Windowed<KA>, VB> tableB = stream.aggregate(...);
 
KTable<Windowed<KA>, VR> tableR = tableB.join(tableA, (windowKeyB, valueB) -> windowKeyB.key, (windowKeyB, keyA) -> windowed(combine(windowKeyB.key, keyA)), (valueB, valueA) -> joined);   // join return type "KR"

...

Code Block
KTable<Windowed<KA>, VA> tableA = stream.aggregate(...);
 
KTable<Windowed<KA>, VR> tableR = tableA.join(tableA, (windowKeyA, valueA) -> windowKeyA.shiftLeft(1000), (windowKeyA, windowKeyB) -> windowKeyB, (valueA, valueB) -> joined);

...

  1. First of all, we will repartition this KTable's stream by the key computed from the mapper(K, V) → K1, so that it is co-partitioned with the other table by the same key. The co-partition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
  2. After repartitioning, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized as combo key (K1, K) with value V (note that we need to apply the mapper(K, V) → K1 again).
  3. When a record (k1, v1) is received from the other table's stream, update its materialized table, then make a range query on this table's materialized store as (k1, *). For each matched record (k1, k) → v, apply the joiner(v, v1) → r and send (k2, r), where select(k, k1) -> k2.
  4. When a record (k, v) is received from this table's repartitioned stream, update its materialized table by applying the mapper, then make a get query on the other table's materialized store by the mapped key mapper(k, v) -> k1. For the single matched record k1 → v1, apply the joiner(v, v1) → r and return (k2, r), where select(k, k1) -> k2.
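
The steps above can be sketched with plain in-memory maps. This is only an illustration and not the Kafka Streams implementation: the class name ForeignKeyJoinSketch, the String-typed keys and values, and the NUL separator used to build the combo key are all assumptions made for the example. Repartitioning is left out, and so are deletions and updates that change the mapped key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

/** In-memory sketch of steps 2) to 4); all names here are hypothetical. */
public class ForeignKeyJoinSketch {
    // Assumed separator for the combo key; it must never occur inside a key.
    private static final char SEP = '\u0000';

    private final BiFunction<String, String, String> mapper;   // (k, v)  -> k1
    private final BiFunction<String, String, String> selector; // (k, k1) -> k2
    private final BiFunction<String, String, String> joiner;   // (v, v1) -> r

    // This table, materialized under the combo key (k1, k) -> v, sorted by key.
    private final TreeMap<String, String> thisStore = new TreeMap<>();
    // The other table, materialized as k1 -> v1.
    private final Map<String, String> otherStore = new HashMap<>();

    public ForeignKeyJoinSketch(BiFunction<String, String, String> mapper,
                                BiFunction<String, String, String> selector,
                                BiFunction<String, String, String> joiner) {
        this.mapper = mapper;
        this.selector = selector;
        this.joiner = joiner;
    }

    /** Step 3: a record (k1, v1) arrives from the other table. Returns {k2, r} pairs. */
    public List<String[]> onOtherRecord(String k1, String v1) {
        otherStore.put(k1, v1);
        List<String[]> out = new ArrayList<>();
        String from = k1 + SEP;              // smallest combo key with prefix k1
        String to = k1 + (char) (SEP + 1);   // exclusive upper bound of that prefix
        // Range query (k1, *): every combo key that starts with k1 + SEP.
        for (Map.Entry<String, String> e : thisStore.subMap(from, to).entrySet()) {
            String k = e.getKey().substring(from.length());
            out.add(new String[] {selector.apply(k, k1), joiner.apply(e.getValue(), v1)});
        }
        return out;
    }

    /** Step 4: a record (k, v) arrives from this table. Returns at most one {k2, r} pair. */
    public List<String[]> onThisRecord(String k, String v) {
        String k1 = mapper.apply(k, v);      // the mapper is applied again here
        thisStore.put(k1 + SEP + k, v);
        List<String[]> out = new ArrayList<>();
        String v1 = otherStore.get(k1);      // single get query by the mapped key
        if (v1 != null) {
            out.add(new String[] {selector.apply(k, k1), joiner.apply(v, v1)});
        }
        return out;
    }
}
```

Because the store for this table is sorted by (k1, k), the range query in step 3 only touches the entries that share the prefix k1, which is the property the combo key is meant to provide.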

 

NOTE: When the mapping is from this table's key-value pair to the other table's key, the implementation is just vice-versa, except that in step 3) / 4) above the return value is (K1, R), not (K, R).

 

When sendOldValues is turned on, a pair <old, new> is passed into the other KTable's stream. The mapped keys of the old and new values may be different, so two separate records will be sent to the re-partition topic, one for removal and one for addition. These two records will be joined separately and independently, so their output ordering is arbitrary. As a result, the joining results that keep track of the changes as k -> {r_old, r_new} could be incorrect if k -> {null, r_new} is appended before k -> {r_old, null} (details can be found in KAFKA-3705). That is why we need to have the result table key as a combination of both joining table keys, instead of just inheriting the left join table's key for normal foreign-key join.
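
A toy model may help to see the ordering problem. The sketch below is an assumption-laden illustration, not code from Kafka Streams: the class ChangeOrderingSketch and its replay helper are hypothetical, each change is reduced to a {key, newResult} pair, and a null newResult is treated as a removal.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that replays result changes into a downstream table. */
public class ChangeOrderingSketch {
    /** Applies {key, newResult} changes in order; a null newResult removes the key. */
    public static Map<String, String> replay(String[][] changes) {
        Map<String, String> table = new HashMap<>();
        for (String[] change : changes) {
            if (change[1] == null) {
                table.remove(change[0]);
            } else {
                table.put(change[0], change[1]);
            }
        }
        return table;
    }
}
```

If the result is keyed by k alone, replaying the addition {"k", "r_new"} followed by the removal {"k", null} leaves the table empty, so the new result is lost. If the result is keyed by a combination such as "k|b" for the addition and "k|a" for the removal, the two changes touch different keys and the final table is the same in either order.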

 

 

Now the only question left is how we can do a range query on the combo key (key, *). There are two approaches for this:

...

Note that there is one caveat: this feature is not available in RocksDB JNI yet. I have filed a RocksDB issue (ticket), and if necessary we can contribute this feature back to RocksDB.

Update: prefix hashing has been added to JNI: https://github.com/facebook/rocksdb/pull/1109

 

Pros: leverage RocksDB internal feature, which is supposed to be performant.

...