...

```java
public interface KTable<K, V> {

    // mapper is used for mapping this table's key-value pair into the other table's key;
    // selector is used for combining the joining tables' keys into the result table's key.

    <K1, V1, K2, R> KTable<K2, R> join(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> leftJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);

    <K1, V1, K2, R> KTable<K2, R> outerJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, KeyValueMapper<K, K1, K2> selector, ValueJoiner<V, V1, R> joiner);
}
```

...

```java
KTable<KA, VA> tableA = builder.table("topicA");

KTable<KB, VB> tableB = builder.table("topicB");    // assuming topicB's message value payload contains a field of type KA

// mapper extracts the foreign key KA from valueB; selector keeps keyB as the result key
KTable<KB, VR> tableR = tableB.join(tableA, (keyB, valueB) -> valueB.get("fieldKA"), (keyB, keyA) -> keyB, (valueB, valueA) -> joined);   // result type KTable<KB, VR>
```

 

Non-window to Window Table Join

...

```java
KTable<KA, VA> tableA = builder.table("topicA");

KTable<Windowed<KA>, VB> tableB = stream.aggregate(...);

// mapper strips the window to get the KA key; combine(...) and windowed(...) are placeholders for building a windowed result key
KTable<Windowed<KA>, VR> tableR = tableB.join(tableA, (windowKeyB, valueB) -> windowKeyB.key(), (windowKeyB, keyA) -> windowed(combine(windowKeyB.key(), keyA)), (valueB, valueA) -> joined);   // result type KTable<Windowed<KA>, VR>
```

...

```java
KTable<Windowed<KA>, VA> tableA = stream.aggregate(...);

// self-join: shiftLeft(1000) is a placeholder for moving the window 1000 (ms, assumed) earlier
KTable<Windowed<KA>, VR> tableR = tableA.join(tableA, (windowKeyA, valueA) -> windowKeyA.shiftLeft(1000), (windowKeyA, windowKeyB) -> windowKeyB, (valueA, valueB) -> joined);
```
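The self-join above relies on shifting a window key; `shiftLeft` is a placeholder here rather than an existing Kafka Streams method. The sketch below assumes a hypothetical `WindowKey` type that holds a record key plus a [start, end) window in milliseconds, and shows the intended meaning: shifting by 1000 maps each window to the window one second earlier, so each record is joined with the record of the preceding window.

```java
import java.util.Objects;

/** Hypothetical stand-in for a windowed key: a record key plus a [start, end) window in milliseconds. */
final class WindowKey<K> {
    final K key;
    final long start;
    final long end;

    WindowKey(K key, long start, long end) {
        this.key = key;
        this.start = start;
        this.end = end;
    }

    /** Returns the same key with its window moved earlier by the given number of milliseconds. */
    WindowKey<K> shiftLeft(long millis) {
        return new WindowKey<>(key, start - millis, end - millis);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WindowKey)) return false;
        WindowKey<?> other = (WindowKey<?>) o;
        return start == other.start && end == other.end && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, start, end);
    }

    @Override
    public String toString() {
        return key + "@[" + start + "," + end + ")";
    }

    public static void main(String[] args) {
        WindowKey<String> w = new WindowKey<>("user-1", 5000, 6000);
        // The mapper in the self-join maps this window to the previous one-second window.
        System.out.println(w.shiftLeft(1000)); // prints user-1@[4000,5000)
    }
}
```

Because `equals` and `hashCode` cover both the key and the window bounds, the shifted key can be used directly as the lookup key into the other (here: the same) table's store.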

...

  1. First, we repartition this KTable's stream by the key computed from mapper(K, V) → K1, so that it is co-partitioned with the other table on K1. The repartition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
  2. After repartitioning this table's stream, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized with the combo key (K1, K) and value V (note that we need to apply mapper(K, V) → K1 again, since the repartitioned messages still carry the original key K).
  3. When a record (k1, v1) is received from the other table's stream, update its materialized table, then make a range query (k1, *) on this table's materialized store, and for each matched record (k1, k) → v, apply joiner(v, v1) → r and send (k2, r), where selector(k, k1) → k2.
  4. When a record (k, v) is received from this table's repartitioned stream, update its materialized table by applying the mapper, then make a get query on the other table's materialized store with the mapped key mapper(k, v) → k1, and for the single matched record k1 → v1, apply joiner(v, v1) → r and send (k2, r), where selector(k, k1) → k2.
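The four steps above can be sketched in memory as follows. This is a single-process illustration under stated assumptions, not the proposed implementation: `K` and `K1` are assumed `Comparable`, a `TreeMap` ordered by (k1, k) stands in for this table's materialized store so that the (k1, *) range query becomes a `tailMap` scan, only inner-join inserts and updates are handled (no deletes, and no repartition topic for step 1), and a small `lastForeignKey` index, which is not part of the steps above, drops a stale combo key when a record's foreign key changes. The class name `FkJoinSketch` and the order/customer data are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiFunction;

/** Hypothetical in-memory sketch of the inner primary-key / foreign-key join steps. */
class FkJoinSketch<K extends Comparable<K>, V, K1 extends Comparable<K1>, V1, K2, R> {

    /** Combo key (k1, k); a null k sorts first and serves as the lower bound of a (k1, *) range query. */
    static final class Combo<A, B> {
        final A k1;
        final B k;
        Combo(A k1, B k) { this.k1 = k1; this.k = k; }
    }

    private final BiFunction<K, V, K1> mapper;     // mapper(K, V) -> K1
    private final BiFunction<K, K1, K2> selector;  // selector(K, K1) -> K2
    private final BiFunction<V, V1, R> joiner;     // joiner(V, V1) -> R

    private final Map<K1, V1> otherStore = new HashMap<>();     // other table: K1 -> V1
    private final NavigableMap<Combo<K1, K>, V> thisStore;      // this table: (K1, K) -> V
    private final Map<K, K1> lastForeignKey = new HashMap<>();  // sketch-only: drops stale combo keys
    final List<Map.Entry<K2, R>> output = new ArrayList<>();    // records "sent" downstream

    FkJoinSketch(BiFunction<K, V, K1> mapper, BiFunction<K, K1, K2> selector, BiFunction<V, V1, R> joiner) {
        this.mapper = mapper;
        this.selector = selector;
        this.joiner = joiner;
        // Order by k1 first, then by k, so that all (k1, *) entries are contiguous.
        Comparator<Combo<K1, K>> order = Comparator.<Combo<K1, K>, K1>comparing(c -> c.k1)
                .<K>thenComparing(c -> c.k, Comparator.nullsFirst(Comparator.<K>naturalOrder()));
        this.thisStore = new TreeMap<>(order);
    }

    /** Step 3: a record (k1, v1) arrives from the other table's stream. */
    void onOtherRecord(K1 k1, V1 v1) {
        otherStore.put(k1, v1);
        // Range query (k1, *): start at (k1, null), which sorts before every (k1, k).
        for (Map.Entry<Combo<K1, K>, V> e : thisStore.tailMap(new Combo<>(k1, null), true).entrySet()) {
            if (e.getKey().k1.compareTo(k1) != 0) break;  // left the (k1, *) range
            K k = e.getKey().k;
            output.add(new SimpleEntry<>(selector.apply(k, k1), joiner.apply(e.getValue(), v1)));
        }
    }

    /** Steps 2 and 4: a record (k, v) arrives from this table's (repartitioned) stream. */
    void onThisRecord(K k, V v) {
        K1 k1 = mapper.apply(k, v);  // re-apply mapper(K, V) -> K1 to build the combo key
        K1 previous = lastForeignKey.put(k, k1);
        if (previous != null) thisStore.remove(new Combo<>(previous, k));
        thisStore.put(new Combo<>(k1, k), v);
        V1 v1 = otherStore.get(k1);  // point lookup k1 -> v1
        if (v1 != null) output.add(new SimpleEntry<>(selector.apply(k, k1), joiner.apply(v, v1)));
    }

    public static void main(String[] args) {
        // Hypothetical data: orders (k = orderId, v = customerId) and customers (k1 = customerId, v1 = name).
        FkJoinSketch<String, String, String, String, String, String> join = new FkJoinSketch<>(
                (orderId, customerId) -> customerId,                  // mapper(k, v) -> k1
                (orderId, customerId) -> orderId + "/" + customerId,  // selector(k, k1) -> k2
                (customerId, name) -> name);                          // joiner(v, v1) -> r
        join.onThisRecord("o1", "c1");      // no customer c1 yet: nothing is sent
        join.onThisRecord("o2", "c1");
        join.onOtherRecord("c1", "Alice");  // range query (c1, *) matches o1 and o2
        join.onThisRecord("o3", "c1");      // point lookup c1 -> Alice
        System.out.println(join.output);    // prints [o1/c1=Alice, o2/c1=Alice, o3/c1=Alice]
    }
}
```

A `TreeMap` keyed by the combo key was chosen because it gives the same "seek, then scan while the prefix matches" access pattern that a sorted key-value store provides, which is what step 3 needs.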

 

NOTE: When sendOldValues is turned on, so that each update is forwarded downstream as a pair <old, new>, the keys mapped from the old and new values may differ, and hence two separate records will be sent to the repartition topic, one for removal and one for addition. These two records are joined separately and independently, so their output ordering is arbitrary. As a result, a join result that tracks changes as k → {r_old, r_new} could be incorrect if k → {null, r_new} is appended before k → {r_old, null} (details can be found in KAFKA-3705). That is why the result table key needs to be a combination of both joining tables' keys, instead of just inheriting the left table's key as in a normal foreign-key join.
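The ordering problem in the note can be reproduced with a simplified model. It assumes the join output is applied to a result table as put/delete operations, that the table already holds r_old for the update's old foreign key, and that the removal and addition records produced by one update may arrive in either order. The names `ResultKeyOrdering` and `Change`, and the `k|k1` string encoding of the combined key, are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: why the result key should combine both keys when an update's two halves may be reordered. */
class ResultKeyOrdering {

    /** One output record of the join: result key and new value (null means delete). */
    static final class Change {
        final String key;
        final String value;
        Change(String key, String value) { this.key = key; this.value = value; }
    }

    /** Applies changes in the given order to an empty result table and returns its final contents. */
    static Map<String, String> apply(List<Change> changes) {
        Map<String, String> table = new HashMap<>();
        for (Change c : changes) {
            if (c.value == null) table.remove(c.key); else table.put(c.key, c.value);
        }
        return table;
    }

    /** The initial result for (k, oldK1), then the removal and addition for k moving to newK1. */
    static List<Change> update(String k, String oldK1, String newK1, boolean comboKey, boolean removalFirst) {
        String oldKey = comboKey ? k + "|" + oldK1 : k;
        String newKey = comboKey ? k + "|" + newK1 : k;
        Change removal = new Change(oldKey, null);
        Change addition = new Change(newKey, "r_new");
        List<Change> out = new ArrayList<>();
        out.add(new Change(oldKey, "r_old"));  // result already emitted before the update
        if (removalFirst) { out.add(removal); out.add(addition); } else { out.add(addition); out.add(removal); }
        return out;
    }

    public static void main(String[] args) {
        for (boolean comboKey : new boolean[] {false, true}) {
            Map<String, String> inOrder = apply(update("k", "k1a", "k1b", comboKey, true));
            Map<String, String> reordered = apply(update("k", "k1a", "k1b", comboKey, false));
            System.out.println((comboKey ? "key (k, k1): " : "key k: ") + inOrder + " vs " + reordered);
        }
        // prints:
        // key k: {k=r_new} vs {}
        // key (k, k1): {k|k1b=r_new} vs {k|k1b=r_new}
    }
}
```

With the plain key k, applying the addition before the removal deletes r_new and leaves the table empty; with the combined key, the two records touch different keys, so both orders converge to the same table.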

 

 

Now the only question left is how to do a range query on the combo key (k1, *). There are two approaches for this:

...

Note that there is one caveat: this feature is not yet available in the RocksDB JNI. I have filed a RocksDB issue (ticket), and if necessary we can contribute this feature back to RocksDB.

Update: prefix hashing has been added to the RocksDB JNI: https://github.com/facebook/rocksdb/pull/1109

 

Pros: leverages a RocksDB internal feature, which is expected to be performant.
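Whichever approach is chosen, the combo key has to be serialized so that all entries sharing k1 are adjacent in the store's byte order; otherwise (k1, *) is not a contiguous range. A minimal sketch, assuming string keys and using a `TreeMap` with unsigned lexicographic comparison as a stand-in for RocksDB's default bytewise key ordering: k1 is written with a 4-byte length prefix, so that ("ab", "c") and ("a", "bc") do not collide and a query for k1 = "a" does not also match k1 = "ab". The range query seeks to the prefix and scans until the prefix no longer matches. `ComboKeyStore` is a hypothetical name, and a fixed-length prefix extractor would additionally require k1 to be serialized at a fixed width, which this sketch does not do.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical byte-keyed store illustrating a prefix range query on the combo key (k1, k). */
class ComboKeyStore {
    // Stand-in for a sorted byte-keyed store ordered as unsigned bytes (Java 9+ Arrays.compareUnsigned).
    private final NavigableMap<byte[], String> store = new TreeMap<>(Arrays::compareUnsigned);

    /** Prefix for k1: 4-byte big-endian length of k1, then the UTF-8 bytes of k1. */
    static byte[] prefix(String k1) {
        byte[] b = k1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + b.length).putInt(b.length).put(b).array();
    }

    /** Combo key (k1, k): the k1 prefix followed by the UTF-8 bytes of k. */
    static byte[] comboKey(String k1, String k) {
        byte[] p = prefix(k1);
        byte[] kb = k.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(p.length + kb.length).put(p).put(kb).array();
    }

    void put(String k1, String k, String value) {
        store.put(comboKey(k1, k), value);
    }

    /** Range query (k1, *): seek to the prefix and scan while keys still start with it. */
    List<String> rangeQuery(String k1) {
        byte[] p = prefix(k1);
        List<String> values = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(p, true).entrySet()) {
            byte[] key = e.getKey();
            if (key.length < p.length || !Arrays.equals(key, 0, p.length, p, 0, p.length)) break;
            values.add(e.getValue());
        }
        return values;
    }

    public static void main(String[] args) {
        ComboKeyStore s = new ComboKeyStore();
        s.put("ab", "c", "v1");  // would collide with ("a", "bc") without the length prefix
        s.put("a", "bc", "v2");
        s.put("a", "x", "v3");
        System.out.println(s.rangeQuery("a"));   // prints [v2, v3]
        System.out.println(s.rangeQuery("ab"));  // prints [v1]
    }
}
```

Because every key with a given prefix sorts at or after the prefix itself, and all such keys form one contiguous run, the scan can stop at the first key that does not match.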

...