...

The more intrusive version makes it clearer to the user that the resulting KTable is keyed not only by the other table's key but also by this table's key, so the user will be less surprised if, in a later aggregation, the same key from the other KTable shows up twice. On the other hand, the less intrusive method does not need to introduce this wrapper class; it leaves it to the user to make sure that both tables' keys are present in the output key. This might lead to a deeper understanding on the user's side, and user-provided serdes might be able to pack the data more densely. An additional benefit is that the user can stick with their default serde, or their standard way of serializing, when sinking the data into another topic (for example with to()), whereas the CombinedKey would first require an additional mapping to the form the less intrusive method already produces.
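To make the less intrusive option concrete, here is a minimal sketch of an output key a user might build themselves. It assumes both tables are keyed by longs (a customer id on this table, an order id on the other); `CustomerOrderKey`, `toBytes` and `fromBytes` are hypothetical names, not part of the proposal or of Kafka Streams. Because the layout is fixed, both keys fit into 16 bytes with no type tags or length fields, which is the kind of dense packing a generic wrapper serde would not necessarily achieve.

```java
import java.nio.ByteBuffer;
import java.util.Objects;

/**
 * Hypothetical user-defined output key for the less intrusive variant.
 * Assumption: both tables are keyed by longs, so the key packs into a
 * fixed 16 bytes (this table's key first, then the other table's key).
 */
final class CustomerOrderKey {
    final long customerId; // key of this KTable (the "one" side)
    final long orderId;    // key of the other KTable (the "n" side)

    CustomerOrderKey(long customerId, long orderId) {
        this.customerId = customerId;
        this.orderId = orderId;
    }

    /** Dense encoding: 8 bytes per key, no separators or length fields. */
    byte[] toBytes() {
        return ByteBuffer.allocate(16).putLong(customerId).putLong(orderId).array();
    }

    /** Inverse of toBytes(). */
    static CustomerOrderKey fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long customerId = buf.getLong();
        long orderId = buf.getLong();
        return new CustomerOrderKey(customerId, orderId);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomerOrderKey)) return false;
        CustomerOrderKey other = (CustomerOrderKey) o;
        return customerId == other.customerId && orderId == other.orderId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(customerId, orderId);
    }

    @Override
    public String toString() {
        return customerId + "/" + orderId;
    }
}
```

Such a key can be written with an ordinary user serde when sinking with to(), without first mapping away from a CombinedKey.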

From mailing list discussion

Back and forth mapper

This is a proposal to get rid of the type CombinedKey in the return type. Internally we would still use a CombinedKey and a CombinedKey serde, and apply the mappers only at the processing boundaries (ValueGetterSupplier, context.forward). The data would still be serialized for repartitioning in a format that is specific to Kafka, which might prevent users from using their default tooling.

KTable.java:

```java
    /**
     * Joins one record of this KTable to n records of the other KTable.
     * An update in this KTable will update all n matching records; an update
     * in the other KTable will update only the one matching record.
     *
     * @param other             the table containing n records for each K of this table
     * @param keyExtractor      a {@code ValueMapper} returning the key of this table from the other table's value
     * @param customCombinedKey a {@code ValueMapper} allowing the CombinedKey to be wrapped in a custom object
     * @param combinedKey       a {@code ValueMapper} allowing the custom object to be unwrapped again
     * @param joiner            a {@code ValueJoiner} computing the result value from both tables' values
     * @param keyOtherSerde     serde for the other table's key
     * @param valueOtherSerde   serde for the other table's value
     * @param joinValueSerde    serde for the resulting table's value
     * @param <KO>              the resulting table's key type
     * @param <VO>              the resulting table's value type
     * @param <K1>              the other table's key type
     * @param <V1>              the other table's value type
     * @return a KTable keyed by the custom key type KO holding the joined values
     */
    <KO, VO, K1, V1> KTable<KO, VO> oneToManyJoin(KTable<K1, V1> other,
            ValueMapper<V1, K> keyExtractor,
            ValueMapper<CombinedKey<K1, K>, KO> customCombinedKey,
            ValueMapper<KO, CombinedKey<K1, K>> combinedKey,
            ValueJoiner<V, V1, VO> joiner,
            Serde<K1> keyOtherSerde,
            Serde<V1> valueOtherSerde,
            Serde<VO> joinValueSerde);
```
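The following sketch illustrates the round trip the two mappers have to support. It is an illustration under assumptions, not the proposed implementation: `CombinedKey` here is a minimal stand-in class, `OrderKey` is a hypothetical user key type (KO), and `java.util.function.Function` replaces Kafka's `ValueMapper` so the block runs without Kafka on the classpath. The property it demonstrates is that unwrapping a wrapped key yields an equal CombinedKey.

```java
import java.util.Objects;
import java.util.function.Function;

/** Minimal stand-in for the proposal's internal CombinedKey<K1, K>. */
final class CombinedKey<K1, K> {
    final K1 otherKey; // key of the other (n-side) table
    final K thisKey;   // key of this (1-side) table

    CombinedKey(K1 otherKey, K thisKey) {
        this.otherKey = otherKey;
        this.thisKey = thisKey;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CombinedKey)) return false;
        CombinedKey<?, ?> c = (CombinedKey<?, ?>) o;
        return Objects.equals(otherKey, c.otherKey) && Objects.equals(thisKey, c.thisKey);
    }

    @Override
    public int hashCode() {
        return Objects.hash(otherKey, thisKey);
    }
}

/** Hypothetical user-facing key type (KO in the signature above). */
final class OrderKey {
    final String orderId;    // the other table's key
    final String customerId; // this table's key

    OrderKey(String orderId, String customerId) {
        this.orderId = orderId;
        this.customerId = customerId;
    }
}

final class OrderKeyMappers {
    // Role of customCombinedKey: applied where results leave the join
    // (e.g. before context.forward), so downstream only sees OrderKey.
    static final Function<CombinedKey<String, String>, OrderKey> WRAP =
            ck -> new OrderKey(ck.otherKey, ck.thisKey);

    // Role of combinedKey: applied where a user key re-enters the join
    // (e.g. a value getter lookup), recovering the internal CombinedKey.
    static final Function<OrderKey, CombinedKey<String, String>> UNWRAP =
            ko -> new CombinedKey<>(ko.orderId, ko.customerId);
}
```

With KO = OrderKey, a caller would pass the same two lambdas as customCombinedKey and combinedKey; everything between those boundaries keeps working on CombinedKey.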

...

Custom Serde

This approach introduces an additional new Serde. It is the counterpart to having a back-and-forth mapper.

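```java
package org.apache.kafka.streams;

import org.apache.kafka.common.serialization.Serde;

// Serde is an interface, so the new serde is declared as an interface extending it.
public interface CombinedKeySerde<K, K1> extends Serde<CombinedKey<K, K1>> {

    // Serialized bytes of only the K part (prefixKey) of a CombinedKey<K, K1>.
    byte[] getPartialKeybytes(K prefixKey);
}
```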
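One plausible reading of `getPartialKeybytes` is that it returns the serialized form of the K part alone, so that every serialized CombinedKey with the same K starts with those bytes and can be found with a prefix scan. The sketch below assumes String keys and a hypothetical layout of a 4-byte length, K's bytes, then K1's bytes; `PrefixKeyLayout` and its methods are illustrative only. The length field matters: without it, the bytes of "A1" would also be a prefix of every key whose K is "A10".

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical byte layout for a CombinedKey<K, K1> with String keys:
 *   [4-byte length of K][K bytes][K1 bytes]
 * K (this table's key) comes first, so all combined keys for one K share
 * the bytes returned by partialKeyBytes(K).
 */
final class PrefixKeyLayout {

    static byte[] serialize(String k, String k1) {
        byte[] kBytes = k.getBytes(StandardCharsets.UTF_8);
        byte[] k1Bytes = k1.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + kBytes.length + k1Bytes.length)
                .putInt(kBytes.length)
                .put(kBytes)
                .put(k1Bytes)
                .array();
    }

    /** Counterpart of getPartialKeybytes: K alone, length field included. */
    static byte[] partialKeyBytes(String k) {
        byte[] kBytes = k.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + kBytes.length)
                .putInt(kBytes.length)
                .put(kBytes)
                .array();
    }

    /** True if bytes begins with prefix, i.e. a prefix scan would return it. */
    static boolean startsWith(byte[] bytes, byte[] prefix) {
        if (prefix.length > bytes.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (bytes[i] != prefix[i]) return false;
        }
        return true;
    }
}
```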
KTable.java:

```java
    /**
     * Joins one record of this KTable to n records of the other KTable.
     * An update in this KTable will update all n matching records; an update
     * in the other KTable will update only the one matching record.
     *
     * @param other            the table containing n records for each K of this table
     * @param keyExtractor     a {@code ValueMapper} returning the key of this table from the other table's value
     * @param joiner           a {@code ValueJoiner} computing the result value from both tables' values
     * @param keyOtherSerde    serde for the other table's key
     * @param valueOtherSerde  serde for the other table's value
     * @param joinValueSerde   serde for the resulting table's value
     * @param combinedKeySerde serde for the resulting {@code CombinedKey<K, K1>}
     * @param <VO>             the resulting table's value type
     * @param <K1>             the other table's key type
     * @param <V1>             the other table's value type
     * @return a KTable keyed by {@code CombinedKey<K, K1>} holding the joined values
     */
    <VO, K1, V1> KTable<CombinedKey<K, K1>, VO> oneToManyJoin(KTable<K1, V1> other,
            ValueMapper<V1, K> keyExtractor,
            ValueJoiner<V, V1, VO> joiner,
            Serde<K1> keyOtherSerde,
            Serde<V1> valueOtherSerde,
            Serde<VO> joinValueSerde,
            CombinedKeySerde<K, K1> combinedKeySerde);
```

  

Tradeoffs

Proposed Changes

Goal

Consider two relations A and B, where there is exactly one A for each B and there may be many Bs for each A. A is represented by the KTable on which the method described above is invoked, while B is represented by that method's first argument. We want to implement a set of processors that allows a user to create a new KTable in which A and B are joined based on the reference to A in B. A and B are both represented as KTables, with B partitioned by B's key and A partitioned by A's key.
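As a reference point, the result the processors should maintain can be written as a plain, non-streaming computation over two maps. This is a hypothetical model, not part of the proposal: it assumes inner-join semantics (a B whose referenced A does not exist contributes nothing, matching the "invoked but nothing found" cases in the walkthrough), and it renders the combined key as the A key followed by the B key, like "A0B3". `OneToManyJoinModel` is an illustrative name.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

/**
 * Hypothetical reference model of the one-to-many join: every B that
 * references an existing A yields one result, keyed by A key + B key.
 * One A can therefore appear in many results.
 */
final class OneToManyJoinModel {

    static <KA, VA, KB, VB, VO> Map<String, VO> join(
            Map<KA, VA> a,                  // table A, keyed by A's key
            Map<KB, VB> b,                  // table B, keyed by B's key
            Function<VB, KA> keyExtractor,  // the reference to A inside a B value
            BiFunction<VA, VB, VO> joiner) {
        Map<String, VO> result = new TreeMap<>();
        for (Map.Entry<KB, VB> eb : b.entrySet()) {
            KA aKey = keyExtractor.apply(eb.getValue());
            VA aValue = a.get(aKey);
            if (aValue != null) { // inner join: a B without its A produces nothing
                result.put(String.valueOf(aKey) + eb.getKey(), joiner.apply(aValue, eb.getValue()));
            }
        }
        return result;
    }
}
```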

...

| Topology input A | Topology input B | State A materialized | State B materialized | Intermediate records produced | State B (other task) | Output A source / input range processor | Output range processor | Output lookup processor |
|---|---|---|---|---|---|---|---|---|
| key: A0 value: [A0 ...] | | key: A0 value: [A0 ...] | | | | Change<null,[A0 ...]> | invoked but nothing found; nothing forwarded | |
| key: A1 value: [A1 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | | | | Change<null,[A1 ...]> | invoked but nothing found; nothing forwarded | |
| | key: B0 value: [A2,B0 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...] | partition key: A2, key: A2B0, value: [A2,B0 ...] | key: A2B0 value: [A2,B0 ...] | | | invoked but nothing found; nothing forwarded |
| | key: B1 value: [A2,B1 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...]; key: B1 value: [A2,B1 ...] | partition key: A2, key: A2B1, value: [A2,B1 ...] | key: A2B0 value: [A2,B0 ...]; key: A2B1 value: [A2,B1 ...] | | | invoked but nothing found; nothing forwarded |
| key: A2 value: [A2 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...]; key: A2 value: [A2 ...] | key: B0 value: [A2,B0 ...]; key: B1 value: [A2,B1 ...] | | key: A2B0 value: [A2,B0 ...]; key: A2B1 value: [A2,B1 ...] | Change<null,[A2 ...]> | key: A2B0 value: Change<null,join([A2 ...],[A2,B0 ...])>; key: A2B1 value: Change<null,join([A2 ...],[A2,B1 ...])> | |
| | key: B1 value: null | | key: B0 value: [A2,B0 ...] | partition key: A2, key: A2B1, value: null | key: A2B0 value: [A2,B0 ...] | | | key: A2B1 value: Change<join([A2 ...],[A2,B1 ...]),null> |
| | key: B3 value: [A0,B3 ...] | | key: B0 value: [A2,B0 ...]; key: B3 value: [A0,B3 ...] | partition key: A0, key: A0B3, value: [A0,B3 ...] | key: A2B0 value: [A2,B0 ...]; key: A0B3 value: [A0,B3 ...] | | | key: A0B3 value: Change<null,join([A0 ...],[A0,B3 ...])> |
| key: A2 value: null | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 value: [A2,B0 ...]; key: B3 value: [A0,B3 ...] | | key: A2B0 value: [A2,B0 ...]; key: A0B3 value: [A0,B3 ...] | Change<[A2 ...],null> | key: A2B0 value: Change<join([A2 ...],[A2,B0 ...]),null> | |
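The walkthrough table can be replayed with a small single-process simulation. It is a sketch under stated assumptions, not the proposed processor code: String keys and values, a B value that begins with the A key it references, a B record that never switches to a different A, and a nested map standing in for the repartitioned "State B other task" store (the real design would use a prefix scan over one store). `OneToManySimulation`, `onA` and `onB` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical simulation of the range and lookup processors.
 * Assumptions: a B value starts with its A key ("A2,b0"), a B never changes
 * which A it references, and combined keys render as A key + B key ("A2B0").
 * onA/onB return what would be forwarded; an empty list corresponds to
 * "invoked but nothing found; nothing forwarded".
 */
final class OneToManySimulation {
    private final Map<String, String> stateA = new TreeMap<>(); // State A materialized
    private final Map<String, String> stateB = new TreeMap<>(); // State B materialized
    // State B (other task): B records repartitioned by A key, then by B key.
    private final Map<String, Map<String, String>> stateBOther = new TreeMap<>();

    private static String aKeyOf(String bValue) {
        return bValue.split(",")[0];
    }

    private static String join(String a, String b) {
        return "join(" + a + "," + b + ")";
    }

    private static String change(String key, String oldV, String newV) {
        return key + " -> Change<" + oldV + "," + newV + ">";
    }

    /** Input A (null value = delete): update state A, then the range processor visits every B of this A. */
    List<String> onA(String aKey, String newA) {
        String oldA = newA == null ? stateA.remove(aKey) : stateA.put(aKey, newA);
        List<String> out = new ArrayList<>();
        if (oldA == null && newA == null) {
            return out; // deleting an unknown A: nothing to do
        }
        for (Map.Entry<String, String> e : stateBOther.getOrDefault(aKey, Map.of()).entrySet()) {
            String oldJoin = oldA == null ? null : join(oldA, e.getValue());
            String newJoin = newA == null ? null : join(newA, e.getValue());
            out.add(change(aKey + e.getKey(), oldJoin, newJoin));
        }
        return out;
    }

    /** Input B (null value = delete): update state B, repartition by A key, then the lookup processor reads A. */
    List<String> onB(String bKey, String newB) {
        String oldB = newB == null ? stateB.remove(bKey) : stateB.put(bKey, newB);
        String ref = newB != null ? newB : oldB; // a delete is routed via the old value
        List<String> out = new ArrayList<>();
        if (ref == null) {
            return out; // deleting an unknown B: nothing to do
        }
        String aKey = aKeyOf(ref);
        Map<String, String> bs = stateBOther.computeIfAbsent(aKey, k -> new TreeMap<>());
        String oldOther = newB == null ? bs.remove(bKey) : bs.put(bKey, newB);
        String a = stateA.get(aKey);
        if (a != null) {
            String oldJoin = oldOther == null ? null : join(a, oldOther);
            String newJoin = newB == null ? null : join(a, newB);
            out.add(change(aKey + bKey, oldJoin, newJoin));
        }
        return out;
    }
}
```

Feeding it the table's eight input records in order, with A values "a0", "a1", "a2" and B values such as "A2,b0", produces nothing for the first four records and then the range and lookup outputs of the last four rows, for example `A2B1 -> Change<join(a2,A2,b1),null>` for the deletion of B1.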

...