...
Code Block |
---|
KTable<KA, VA> tableA = builder.table("topicA"); |
KTable<KB, VB> tableB = builder.table("topicB"); // assuming topicB's message value payload contains a field "fieldKA" of type KA |
KTable<KR, VR> tableR = tableB.join(tableA, |
(keyB, valueB) -> valueB.get("fieldKA"), |
(keyB, keyA) -> combine(keyB, keyA), |
(valueA, valueB) -> joined); // join return type is KTable<KR, VR> |
Non-window to Window Table Join
...
- First, repartition this KTable's stream by the key computed from mapper(K, V) → K1, so that it is co-partitioned with the other table on the same key. The co-partition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
- After re-partitioning the other table, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized under the combo key (K1, K) with value V (note that this requires applying mapper(K, V) → K1 again).
- When a record (k1, v1) is received from the other table's stream, update the other table's materialized table, then make a range query on this table's materialized table with the prefix (k1, *). For each matched record (k1, k) → v, apply joiner(v, v1) → r and send (k2, r), where select(k, k1) → k2.
- When a record (k, v) is received from this table's repartitioned stream, apply the mapper to obtain mapper(k, v) → k1 and update this table's materialized table under the combo key (k1, k). Then make a get query on the other table's materialized table by the mapped key k1. For the single matched record k1 → v1, apply the reversed joiner(v1, v) → r and return (k2, r), where select(k, k1) → k2.
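The two update paths above can be sketched in memory as follows. This is only an illustration under stated assumptions, not Kafka Streams code: the class name `NonKeyJoinSketch`, the string-typed keys and values, the `"k1:payload"` value layout used by `mapper`, and the NUL separator in the combo key are all hypothetical choices made for the example. A sorted map stands in for the materialized store so that the (k1, *) lookup becomes a contiguous range scan. Removing a stale combo key when a record's mapped key changes is not covered by the steps above and is left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory sketch of the two join paths; all names here are hypothetical. */
final class NonKeyJoinSketch {
    // Assumption: keys never contain the NUL character, so it can separate (k1, k).
    private static final char SEP = '\u0000';

    // Other table, materialized as K1 -> V1.
    private final Map<String, String> otherStore = new HashMap<>();
    // This table, materialized as combo key (K1, K) -> V; sorted for range scans.
    private final TreeMap<String, String> thisStore = new TreeMap<>();

    // mapper(K, V) -> K1. Assumption: values look like "k1:payload".
    static String mapper(String k, String v) {
        return v.substring(0, v.indexOf(':'));
    }

    // select(K, K1) -> K2.
    static String select(String k, String k1) {
        return k + "@" + k1;
    }

    // joiner(V, V1) -> R.
    static String joiner(String v, String v1) {
        return v + "+" + v1;
    }

    // Reversed joiner(V1, V) -> R: same result, arguments swapped.
    static String reversedJoiner(String v1, String v) {
        return joiner(v, v1);
    }

    /** A record (k1, v1) arrives from the other table's stream. */
    List<String> onOtherRecord(String k1, String v1) {
        otherStore.put(k1, v1);
        String from = k1 + SEP;              // smallest key with prefix (k1, *)
        String to = k1 + (char) (SEP + 1);   // first key past that prefix
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : thisStore.subMap(from, true, to, false).entrySet()) {
            String k = e.getKey().substring(from.length());
            out.add(select(k, k1) + "=" + joiner(e.getValue(), v1));
        }
        return out;
    }

    /** A record (k, v) arrives from this table's repartitioned stream. */
    List<String> onThisRecord(String k, String v) {
        String k1 = mapper(k, v);            // the mapper is applied again here
        thisStore.put(k1 + SEP + k, v);
        List<String> out = new ArrayList<>();
        String v1 = otherStore.get(k1);      // single get on the other store
        if (v1 != null) {
            out.add(select(k, k1) + "=" + reversedJoiner(v1, v));
        }
        return out;
    }
}
```

Each call returns the join results it would emit, formatted as `k2=r`. An update to the other table fans out to every record of this table that maps to the same k1, while an update to this table produces at most one result.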
...