...

Code Block
public interface KTable<K, V> {

    // mapper is used for mapping this table's key-value into the other table's key;
    // note that we do not support the other direction where the returned KTable has type <K1, R>,
    // since for a primary-key / foreign-key join the returned primary key should be the one that contains the foreign key.

    <K1, V1, R> KTable<K, R> join(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);

    <K1, V1, R> KTable<K, R> leftJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);

    <K1, V1, R> KTable<K, R> outerJoin(KTable<K1, V1> other, KeyValueMapper<K, V, K1> mapper, ValueJoiner<V, V1, R> joiner);
}

...

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<KB, VB> tableB = builder.table("topicB");    // assuming topicB's message value payload contains field of "KA"
 
KTable<KB, VR> tableR = tableB.join(tableA, (keyB, valueB) -> valueB.get("fieldKA"), (valueB, valueA) -> joined);   // joined value type "VR"

...

Code Block
KTable<KA, VA> tableA = builder.table("topicA");
 
KTable<Windowed<KA>, VB> tableB = stream.aggregate(...);
 
KTable<Windowed<KA>, VR> tableR = tableB.join(tableA, (windowKeyA, valueB) -> windowKeyA.key(), (valueB, valueA) -> joined);   // joined value type "VR"

...

  1. First of all, we will repartition the other KTable's stream, by key computed from the mapper(K1, V1) → K, so that it is co-partitioned by the same key. The co-partition topic is partitioned on the new key, but the message key and value are unchanged, and log compaction is turned off.
  2. When sendOldValues is turned on, where a pair <old, new> is passed into the other KTable's stream, then the mapped key may be different, and hence two separate records will be sent to the re-partition topic, one for removal and one for addition.
  3. After re-partitioning the other table, materialize both streams. This table is materialized as K → V, the other table is materialized as combo key (K, K1) with value V1 (note that we need to apply the mapper(K1, V1) → K again).
  4. When a record (k, v) is received from this table's stream, update its materialized table, and then make a range query on the other materialized table as (k, *); for each matched record (K, K1) → V1, apply the joiner(V, V1) → R and return (K, R). If the mapping is the other way around, the returned message should be (K1, R).
  5. When a record (k1, v1) is received from the other table's stream, update its materialized table by applying the mapper, and then make a get query on this table's materialized store by the mapped key k; for the single matched record, apply the reversed joiner(V1, V) → R and return (K, R). If the mapping is the other way around, the returned message should be (K1, R).
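
The steps above can be sketched with plain in-memory maps. This is only an illustration under stated assumptions, not Kafka Streams code: the class `ForeignKeyJoinSketch` and its methods `onThisRecord` / `onOtherRecord` are hypothetical names, re-partitioning (step 1) is skipped because everything runs in one process, the nested map merely stands in for a store that can answer the (k, *) range query, and nothing is emitted when an old value is removed because the steps do not say what should be sent in that case.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of steps 2-5; all names are illustrative.
final class ForeignKeyJoinSketch<K, V, K1, V1, R> {
    private final BiFunction<K1, V1, K> mapper; // mapper(K1, V1) -> K
    private final BiFunction<V, V1, R> joiner;  // joiner(V, V1) -> R

    // This table, materialized as K -> V.
    private final Map<K, V> thisStore = new HashMap<>();
    // Other table, materialized under the combo key (K, K1) -> V1.
    // The outer key is K, so one lookup returns every (k, *) entry.
    private final Map<K, Map<K1, V1>> otherStore = new HashMap<>();

    ForeignKeyJoinSketch(BiFunction<K1, V1, K> mapper, BiFunction<V, V1, R> joiner) {
        this.mapper = mapper;
        this.joiner = joiner;
    }

    // Step 4: a record (k, v) arrives from this table (v is assumed non-null).
    List<Map.Entry<K, R>> onThisRecord(K k, V v) {
        thisStore.put(k, v);
        List<Map.Entry<K, R>> out = new ArrayList<>();
        Map<K1, V1> matches = otherStore.getOrDefault(k, Collections.emptyMap());
        for (V1 v1 : matches.values()) {
            out.add(new SimpleEntry<>(k, joiner.apply(v, v1)));
        }
        return out;
    }

    // Steps 2 and 5: a change <old, new> for key k1 arrives from the other table.
    // The old and new values may map to different keys, so the removal and the
    // addition are handled separately. Pass null for a missing old or new value.
    List<Map.Entry<K, R>> onOtherRecord(K1 k1, V1 oldV1, V1 newV1) {
        List<Map.Entry<K, R>> out = new ArrayList<>();
        if (oldV1 != null) {
            Map<K1, V1> bucket = otherStore.get(mapper.apply(k1, oldV1));
            if (bucket != null) {
                bucket.remove(k1); // removal under the old mapped key
            }
        }
        if (newV1 != null) {
            K k = mapper.apply(k1, newV1);
            otherStore.computeIfAbsent(k, x -> new LinkedHashMap<>()).put(k1, newV1);
            V v = thisStore.get(k); // get query on this table by the mapped key
            if (v != null) {
                out.add(new SimpleEntry<>(k, joiner.apply(v, newV1)));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Other-table values look like "<foreignKey>:<payload>" in this demo.
        ForeignKeyJoinSketch<String, String, String, String, String> join =
            new ForeignKeyJoinSketch<>((k1, v1) -> v1.split(":")[0], (v, v1) -> v + "+" + v1);
        System.out.println(join.onThisRecord("a", "A1"));
        System.out.println(join.onOtherRecord("o1", null, "a:x"));
        System.out.println(join.onOtherRecord("o1", "a:x", "b:x"));
        System.out.println(join.onThisRecord("a", "A2"));
    }
}
```

In the demo, the third call moves record o1 from mapped key "a" to mapped key "b", which is the case where the old and new values yield different mapped keys and two separate updates are needed.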

 


If the mapping is from this table's key-value pair to the other table's key, the implementation is just the reverse, except that in steps 4) / 5) above the return value is (K1, R), not (K, R).

 

Now the only question left is how we can do a range query on the combo key (key, *). There are two approaches for this:

...