...

  • Enable sending old values (sendOldValues) on the sources marked with "*"
  • Register a child processor of B
    • extract A's key and B's key as the key, and B as the value
      • forward(key, null) for the old value
      • forward(key, b) for the new value
      • skip the old value if A's key didn't change (it ends up in the same partition)
  • Register a sink for the internal repartition topic (number of partitions equal to A's; if A is internal, prefer B over A when deciding the number of partitions)
    • in the sink, use only A's key to determine the partition
  • Register a source for the intermediate topic
    • co-partition with A's sources
    • materialize
    • the serde for RocksDB needs to serialize A's key before B's key; ideally the same serde is also used for the topic
  • Register a processor after the above source
    • on event, extract A's key from the key
    • look up A by its key
    • perform the join (as usual)
  • Register a processor after A's processor
    • on event, use A's key to perform a range scan on B's materialization
    • for every row retrieved, perform the join as usual
  • Register a merger
    • forward join results
    • on lookup, use the full key to look up B, extract A's key from the key, and look up A; then perform the join
  • The merger is wrapped into a KTable and returned to the user.
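A minimal sketch of the B-side steps above (the child processor of B plus the repartition sink), in Python for illustration only. Assumptions: `on_b_change` and `partition_for` are hypothetical names, a B value is modeled as a tuple whose first field is A's key (as in the "[A2,B0 ...]" notation), and `zlib.crc32` stands in for whatever hash partitions A's topic. Real co-partitioning requires the same hash and key serialization that A's topic uses. Records whose A-key is None are dropped, in line with the behavior described later in this section.

```python
import zlib
from typing import Callable, List, Optional, Tuple

# Hypothetical model: a B value is a tuple whose first field is A's key,
# mirroring the "[A2,B0 ...]" notation used in the walkthrough.
BValue = Tuple[Optional[str], ...]
CombinedKey = Tuple[str, str]  # (A's key, B's key): A's key comes first
Record = Tuple[CombinedKey, Optional[BValue]]


def on_b_change(
    b_key: str,
    old: Optional[BValue],
    new: Optional[BValue],
    extract_a_key: Callable[[BValue], Optional[str]],
) -> List[Record]:
    """Turn one Change<old, new> on B into records for the repartition topic."""
    old_a = extract_a_key(old) if old is not None else None
    new_a = extract_a_key(new) if new is not None else None
    out: List[Record] = []
    # forward(key, null) for the old value, skipped when A's key did not
    # change: the new record then lands in the same partition and overwrites it.
    if old_a is not None and old_a != new_a:
        out.append(((old_a, b_key), None))
    # forward(key, b) for the new value; a null A-key means the record is dropped.
    if new_a is not None:
        out.append(((new_a, b_key), new))
    return out


def partition_for(key: CombinedKey, num_partitions: int) -> int:
    """Sink partitioner: only A's key decides the partition (co-partitioning)."""
    a_key, _b_key = key
    # crc32 is a stand-in; it must match the hash that partitions A's topic.
    return zlib.crc32(a_key.encode("utf-8")) % num_partitions


# Event "key: B0 value: [A2,B0 ...]" becomes one record keyed (A2, B0).
print(on_b_change("B0", None, ("A2", "B0"), lambda v: v[0]))
# prints [(('A2', 'B0'), ('A2', 'B0'))]
```

Because `partition_for` ignores B's key, every (A2, Bx) record lands in the same partition as A2, which is what lets the downstream processors join locally.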

Step by Step

______THIS SECTION IS WORK IN PROGRESS_________________

______WILL REMOVE THIS MARKER WHEN SURE IT'S CORRECT__

______FIRST VALUE OF B => A's KEY_________________________

This was only the example of records being found. Next are: update in A, delete in A, update in B, delete in B.
 
  • Input A: key: A0 value: [A0 ...]
    • State A materialized: key: A0 value: [A0 ...]
    • Output of A's source / input to range processor: Change<null,[A0 ...]>
    • Output of range processor: invoked but nothing found. Whether to implement a (right?) outer join is not yet decided. Nothing forwarded.
  • Input A: key: A1 value: [A1 ...]
    • State A materialized: key: A0 value: [A0 ...]; key: A1 value: [A1 ...]
    • Output of A's source / input to range processor: Change<null,[A1 ...]>
    • Output of range processor: invoked but nothing found (outer join not yet decided). Nothing forwarded.
  • Input B: key: B0 value: [A2,B0 ...]
    • State A materialized: key: A0 value: [A0 ...]; key: A1 value: [A1 ...]
    • State B materialized: key: B0 value: [A2,B0 ...]
    • Intermediate record produced: partition key: A2, key: A2B0, value: [A2,B0 ...]
    • State B (other task): key: A2B0 value: [A2,B0 ...]
    • Output of lookup processor: invoked but nothing found. Nothing forwarded.
  • Input B: key: B1 value: [A2,B1 ...]
    • State A materialized: key: A0 value: [A0 ...]; key: A1 value: [A1 ...]
    • State B materialized: key: B0 value: [A2,B0 ...]; key: B1 value: [A2,B1 ...]
    • Intermediate record produced: partition key: A2, key: A2B1, value: [A2,B1 ...]
    • State B (other task): key: A2B0 value: [A2,B0 ...]; key: A2B1 value: [A2,B1 ...]
    • Output of lookup processor: invoked but nothing found. Nothing forwarded.
  • Input A: key: A2 value: [A2 ...]
    • State A materialized: key: A0 value: [A0 ...]; key: A1 value: [A1 ...]; key: A2 value: [A2 ...]
    • Output of A's source / input to range processor: Change<null,[A2 ...]>
    • Output of range processor:
      • key: A2B0 value: Change<null,join([A2 ...],[A2,B0 ...])>
      • key: A2B1 value: Change<null,join([A2 ...],[A2,B1 ...])>
  • Input B: key: B1 value: null
    • State B materialized: key: B0 value: [A2,B0 ...]
    • Intermediate record produced: partition key: A2, key: A2B1, value: null
    • State B (other task): key: A2B0 value: [A2,B0 ...]
    • Output of lookup processor: key: A2B1 value: Change<join([A2 ...],[A2,B1 ...]),null>
  • Input B: key: B3 value: [A0,B3 ...]
    • State B materialized: key: B0 value: [A2,B0 ...]; key: B3 value: [A0,B3 ...]
    • Intermediate record produced: partition key: A0, key: A0B3, value: [A0,B3 ...]
    • State B (other task): key: A2B0 value: [A2,B0 ...]; key: A0B3 value: [A0,B3 ...]
    • Output of lookup processor: key: A0B3 value: Change<null,join([A0 ...],[A0,B3 ...])>
  • Input A: key: A2 value: null
    • State A materialized: key: A0 value: [A0 ...]; key: A1 value: [A1 ...]
    • Output of A's source / input to range processor: Change<[A2 ...],null>
    • Output of range processor: key: A2B0 value: Change<join([A2 ...],[A2,B0 ...]),null>
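To sanity-check the walkthrough, here is a hedged, single-task Python model of the lookup processor (after the intermediate source) and the range processor (after A's processor) that replays the events above. It is not Kafka Streams code: `FkJoinSketch`, `on_a`, `on_intermediate`, and the pairing `join` are hypothetical, both materializations are plain dicts, and the range scan is modeled as a filtered sort over (A-key, B-key) tuples. Only inner-join behavior is modeled, since the outer join is still an open question.

```python
from typing import Dict, List, Optional, Tuple

Value = Tuple[str, ...]
Key = Tuple[str, str]                                # (A's key, B's key)
Joined = Tuple[Value, Value]
Change = Tuple[Optional[Joined], Optional[Joined]]  # (old, new)


def join(a: Value, b: Value) -> Joined:
    # Hypothetical joiner: simply pairs the A value with the B value.
    return (a, b)


class FkJoinSketch:
    """Toy model of one task holding A's state and the repartitioned B state."""

    def __init__(self) -> None:
        self.state_a: Dict[str, Value] = {}
        self.state_b: Dict[Key, Value] = {}  # "State B (other task)"

    def on_a(self, a_key: str, new: Optional[Value]) -> List[Tuple[Key, Change]]:
        """Range processor: re-join every B stored under A's key."""
        old = self.state_a.get(a_key)
        if new is None:
            self.state_a.pop(a_key, None)
        else:
            self.state_a[a_key] = new
        if old is None and new is None:
            return []
        out: List[Tuple[Key, Change]] = []
        # Stand-in for the range scan on B's materialization by A's key.
        for key in sorted(k for k in self.state_b if k[0] == a_key):
            b = self.state_b[key]
            out.append((key, (join(old, b) if old is not None else None,
                              join(new, b) if new is not None else None)))
        return out

    def on_intermediate(self, key: Key, new_b: Optional[Value]) -> List[Tuple[Key, Change]]:
        """Lookup processor: extract A's key from the key and look up A."""
        old_b = self.state_b.get(key)
        if new_b is None:
            self.state_b.pop(key, None)
        else:
            self.state_b[key] = new_b
        a = self.state_a.get(key[0])
        if a is None or (old_b is None and new_b is None):
            return []  # invoked but nothing found: nothing forwarded
        return [(key, (join(a, old_b) if old_b is not None else None,
                       join(a, new_b) if new_b is not None else None))]


# Replay the first events of the walkthrough, then insert A2.
j = FkJoinSketch()
j.on_a("A0", ("A0",))
j.on_a("A1", ("A1",))
j.on_intermediate(("A2", "B0"), ("A2", "B0"))
j.on_intermediate(("A2", "B1"), ("A2", "B1"))
print(j.on_a("A2", ("A2",)))  # one Change<null, join(...)> each for A2B0 and A2B1
```

Keeping the previous B value in the materialized intermediate state is what lets the lookup processor emit a proper Change<old,new> on a B delete.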

 

...

 

 

Range lookup

It is fairly straightforward to flush all changes that happened before the range lookup into RocksDB and let RocksDB handle the range scan. Merging RocksDB's result iterator with the current in-heap caches might not be in scope for this initial KIP. At trivago, we currently do not see RocksDB flushes as a performance problem; usually, the number of emitted records is the harder problem to deal with in the first place.
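A minimal sketch of the flush-before-range-scan approach, combined with a key layout that puts A's key before B's key so that all B rows for one A share a byte prefix. Assumptions: `encode_key`, `a_prefix`, and `CachedSortedStore` are hypothetical; a sorted Python list with `bisect` stands in for RocksDB's byte-ordered keys, and a dict stands in for the in-heap cache. The 4-byte length prefix on A's key is one way to keep the prefix unambiguous (without it, a scan for "A2" would also match "A20"); it is not necessarily the serde this KIP will use.

```python
import bisect
import struct
from typing import Dict, List, Optional, Tuple


def a_prefix(a_key: bytes) -> bytes:
    # Fixed-width big-endian length, then A's key bytes.
    return struct.pack(">I", len(a_key)) + a_key


def encode_key(a_key: bytes, b_key: bytes) -> bytes:
    # A's key (length-prefixed) before B's key: all rows for one A are contiguous.
    return a_prefix(a_key) + b_key


class CachedSortedStore:
    """Toy stand-in for a RocksDB store fronted by an in-heap write cache."""

    def __init__(self) -> None:
        self._keys: List[bytes] = []                     # kept sorted by bytes
        self._values: Dict[bytes, bytes] = {}
        self._cache: Dict[bytes, Optional[bytes]] = {}   # None = pending delete

    def put(self, key: bytes, value: Optional[bytes]) -> None:
        self._cache[key] = value

    def flush(self) -> None:
        """Apply every cached change to the sorted store, then clear the cache."""
        for key, value in self._cache.items():
            present = key in self._values
            if value is None:
                if present:
                    del self._values[key]
                    self._keys.pop(bisect.bisect_left(self._keys, key))
            else:
                if not present:
                    bisect.insort(self._keys, key)
                self._values[key] = value
        self._cache.clear()

    def prefix_scan(self, prefix: bytes) -> List[Tuple[bytes, bytes]]:
        # Flush first, so the scan reads only the sorted store and never has
        # to merge its iterator with the cache.
        self.flush()
        out = []
        i = bisect.bisect_left(self._keys, prefix)
        while i < len(self._keys) and self._keys[i].startswith(prefix):
            out.append((self._keys[i], self._values[self._keys[i]]))
            i += 1
        return out


store = CachedSortedStore()
store.put(encode_key(b"A2", b"B0"), b"[A2,B0 ...]")
store.put(encode_key(b"A20", b"B5"), b"[A20,B5 ...]")
print([v for _, v in store.prefix_scan(a_prefix(b"A2"))])  # only the A2 row
```

The trade-off mirrors the paragraph above: flushing costs a write before every range lookup, but the scan itself stays a simple seek-and-iterate over one sorted source.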

...

B records with a 'null' A-key value would be silently dropped.

Compatibility, Deprecation, and Migration Plan

...