Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<KB, VB> tableB = builder.table("topicB");    // assuming topicB's message value payload contains field of "KA"

KTbale<KB, VR> tableR = tableB.join(tableA, (keyB, valueB) -> valueB.get("fieldKA"), (keyB, keyA) -> combine(keyB, keyA), (valueA, valueB) -> joined);   // join return type "KRVR"

 

Non-window to Window Table Join

...

  1. First of all, we will repartition this KTable's stream, by key computed from the mapper(K, V) → K1, so that it is co-partitioned by the same key. The co-partition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
  2. After re-partitioning the other table, materialize both streams. The other table is materialized as K1 → V1, and this table is materialized as combo key (K1, K) with value V (note that we need to apply the mapper(K, V) → K1 again).
  3. When a record (k1, v1) is received from the other table's stream, update its materialized table, and then make a range query on the other materialized table as (k1, *), and for each matched record (k1, k)  v apply the joiner(v, v1) → r, and send (k2, r) where select(k, k1) -> k2.
  4. When a record (k, v) is received from this table's repartitioned stream, update its materialized table by applying the mapper, and then make a get query on the this materialized table by mapped key mapper(k, v) -> k1, and for the single matched record k1 → v1 apply reversed joiner(v1, v) → r, and return (kk2, r) where select(k, k1) -> k2.

...