...
- The first common use case is "table join by foreign-key". Details can be found in KAFKA-3705.
- The second common use case is WKTable-KTable join or WKTable-WKTable join (WKTable denoting a windowed KTable). For example, think of "subscribing to a feed of log4j output and detecting any error that is at least N standard deviations higher than the same time last week (assuming a sliding window of 1 minute every 1 second)". These were considered in the original design (see this page for details), but were discarded for the first release due to API complexity.
...
```java
public interface KTable<K, V> {

    // mapper is used for mapping this table's key-value into the other table's key;
    // selector is used for combining the joining tables' keys into the result table's key.
    <K1, K2, V1, R> KTable<K2, R> join(KTable<K1, V1> other,
                                       KeyValueMapper<K, V, K1> mapper,
                                       KeyValueMapper<K, K1, K2> selector,
                                       ValueJoiner<V, V1, R> joiner);

    <K1, K2, V1, R> KTable<K2, R> leftJoin(KTable<K1, V1> other,
                                           KeyValueMapper<K, V, K1> mapper,
                                           KeyValueMapper<K, K1, K2> selector,
                                           ValueJoiner<V, V1, R> joiner);

    <K1, K2, V1, R> KTable<K2, R> outerJoin(KTable<K1, V1> other,
                                            KeyValueMapper<K, V, K1> mapper,
                                            KeyValueMapper<K, K1, K2> selector,
                                            ValueJoiner<V, V1, R> joiner);
}
```
...
```java
KTable<KA, VA> tableA = builder.table("topicA");
KTable<KB, VB> tableB = builder.table("topicB");

// assuming topicB's message value payload contains a field of type "KA"
KTable<KB, VR> tableR = tableB.join(tableA,
    (keyB, valueB) -> valueB.get("fieldKA"),   // mapper
    (keyB, keyA) -> keyB,                      // selector
    (valueA, valueB) -> joined);               // joiner, with return value type "VR"
```
Non-window to Window Table Join
...
```java
KTable<KA, VA> tableA = builder.table("topicA");
KTable<Windowed<KA>, VB> tableB = stream.aggregate(...);

KTable<Windowed<KA>, VR> tableR = tableB.join(tableA,
    (windowKeyB, valueB) -> windowKeyB.key,                        // mapper
    (windowKeyB, keyA) -> windowed(combine(windowKeyB.key, keyA)), // selector
    (valueA, valueB) -> joined);                                   // joiner, with return value type "VR"
```
...
```java
KTable<Windowed<KA>, VA> tableA = stream.aggregate(...);

KTable<Windowed<KA>, VR> tableR = tableA.join(tableA,
    (windowKeyA, valueA) -> windowKeyA.shiftLeft(1000),   // mapper
    (windowKeyA, windowKeyB) -> windowKeyB,               // selector
    (valueA, valueB) -> joined);                          // joiner
```
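Note that `shiftLeft` above is a hypothetical helper on the windowed key (Kafka Streams' `Windowed` class does not define it). A minimal sketch of the intended semantics, assuming `shiftLeft(ms)` returns the window moved `ms` milliseconds earlier so that a window can be joined against an earlier window of the same key:

```java
// Hypothetical windowed key supporting the shiftLeft(ms) helper used in the
// example above; field and class names are illustrative, not Streams API.
class WindowedKey<K> {
    final K key;
    final long startMs;
    final long endMs;

    WindowedKey(K key, long startMs, long endMs) {
        this.key = key;
        this.startMs = startMs;
        this.endMs = endMs;
    }

    // Shift the window boundaries back by ms milliseconds, keeping the key.
    WindowedKey<K> shiftLeft(long ms) {
        return new WindowedKey<>(key, startMs - ms, endMs - ms);
    }
}
```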
...
- First of all, we will repartition this KTable's stream on the key computed from the mapper(K, V) → K1, so that it is co-partitioned with the other table by the same key. The co-partition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
- After re-partitioning, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized with the combo key (K1, K) and value V (note that we need to apply the mapper(K, V) → K1 again).
- When a record (k1, v1) is received from the other table's stream, update its materialized store, then make a range query on this table's materialized store as (k1, *); for each matched record (k1, k) → v apply the joiner(v, v1) → r, and send (k2, r) where k2 = selector(k, k1).
- When a record (k, v) is received from this table's repartitioned stream, update its materialized store by applying the mapper, then make a get query on the other table's materialized store with the mapped key mapper(k, v) → k1; for the single matched record k1 → v1 apply the reversed joiner(v1, v) → r, and send (k2, r) where k2 = selector(k, k1).
NOTE: When the mapping is instead from the other table's key-value pair to this table's key, the implementation is just the reverse, i.e. the roles of the two tables in steps 3) / 4) above are swapped.
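The steps above can be sketched with in-memory stores. This is not the actual Streams implementation: the String-based combo-key encoding, class names, and method names are illustrative assumptions, and a real store would hold serialized bytes in RocksDB. The point is the asymmetry: a range query on the combo-key store for records from the other table, and a single get query on the other table's store for records from this table.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

// In-memory sketch of the foreign-key join steps (illustrative only).
class FkJoinSketch {
    // The other table's store: k1 -> v1.
    final TreeMap<String, String> otherStore = new TreeMap<>();
    // This table's store, combo key (k1, k) -> v, encoded as "k1|k" so that
    // a range scan on the "k1|" prefix finds all records for a given k1.
    final TreeMap<String, String> thisStore = new TreeMap<>();

    final BiFunction<String, String, String> joiner;   // (v, v1) -> r
    final BiFunction<String, String, String> selector; // (k, k1) -> k2
    final List<Map.Entry<String, String>> output = new ArrayList<>();

    FkJoinSketch(BiFunction<String, String, String> joiner,
                 BiFunction<String, String, String> selector) {
        this.joiner = joiner;
        this.selector = selector;
    }

    // Step 3: a record (k1, v1) arrives on the other table's stream.
    void onOtherRecord(String k1, String v1) {
        otherStore.put(k1, v1);
        // Range query (k1, *) on this table's combo-key store.
        for (Map.Entry<String, String> e :
                thisStore.subMap(k1 + "|", k1 + "|\uffff").entrySet()) {
            String k = e.getKey().substring(k1.length() + 1);
            output.add(new SimpleEntry<>(selector.apply(k, k1),
                                         joiner.apply(e.getValue(), v1)));
        }
    }

    // Step 4: a record (k, v) arrives on this table's repartitioned stream;
    // k1 = mapper(k, v) was already computed for repartitioning.
    void onThisRecord(String k, String v, String k1) {
        thisStore.put(k1 + "|" + k, v);
        String v1 = otherStore.get(k1); // single get query
        if (v1 != null) {
            output.add(new SimpleEntry<>(selector.apply(k, k1),
                                         joiner.apply(v, v1)));
        }
    }
}
```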
When sendOldValues is turned on, a pair <old, new> is passed into the KTable's stream; since the old and new values may map to different foreign keys, two separate records will be sent to the re-partition topic, one for removal and one for addition. These two records are joined separately and independently, so their output ordering is arbitrary; a consumer of the joining results that keeps track of the changes as k -> {r_old, r_new} could therefore be incorrect if it applies k -> {null, r_new} before k -> {r_old, null} (details can be found in KAFKA-3705). That is why we need the result table's key to be a combination of both joining tables' keys, instead of just inheriting the left table's key as in a normal foreign-key join.
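The ordering hazard can be illustrated with a small sketch (not Streams code; the key encoding and names are assumptions for illustration). With a result key of just k, a late-arriving removal can wipe out the newer value; with the (k, k1) combination, the removal and addition target different keys, so their relative order no longer matters:

```java
import java.util.HashMap;
import java.util.Map;

// Demonstrates why the result key must combine both joining tables' keys.
class ComboKeyDemo {
    // Apply one change record to a downstream table: null value means removal.
    static void apply(Map<String, String> table, String key, String value) {
        if (value == null) table.remove(key); else table.put(key, value);
    }

    public static void main(String[] args) {
        // Result keyed only by k: the addition for the new foreign key arrives
        // first, the removal for the old foreign key second; the newer value is lost.
        Map<String, String> plain = new HashMap<>();
        apply(plain, "k", "r_new");
        apply(plain, "k", null);
        assert plain.get("k") == null;

        // Result keyed by the (k, k1) combination: the two records are
        // independent, so any delivery order yields the correct state.
        Map<String, String> combo = new HashMap<>();
        apply(combo, "k|k1_new", "r_new");
        apply(combo, "k|k1_old", null);
        assert "r_new".equals(combo.get("k|k1_new"));
    }
}
```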
Now the only remaining question is how to do a range query on the combo key as (k1, *). There are two approaches for this:
...
Note that there is one caveat: this feature is not available in RocksDB JNI yet; I have filed a RocksDB issue (ticket), and if necessary we can contribute this feature back to RocksDB.
Update: prefix hashing has been added to JNI: https://github.com/facebook/rocksdb/pull/1109
Pros: leverages an internal RocksDB feature, which should be performant.
...