...

  • Enable sendOldValues() on sources marked with "*"
  • Register a child of B
    • extract A's key and B's key as the key, and B as the value.
      • forward(key, null) for old
      • forward(key, b) for new
      • skip old if A's key didn't change (ends up in same partition)
  • Register sink for internal repartition topic (number of partitions equal to A; if A is internal, prefer B over A for deciding the number of partitions)
    • in the sink, only use A's key to determine partition
  • Register source for intermediate topic
    • co-partition with A's sources
    • materialize
    • the serde for RocksDB needs to serialize A before B; ideally we use the same serde for the topic as well
  • Register processor after above source.
    • On event extract A's key from the key
    • look up A by its key
    • perform the join (as usual)
  • Register processor after A's processor
    • On event use A's key to perform a range scan on B's materialization
    • For every row retrieved perform join as usual
  • Register merger
    • Forward join results
    • On lookup, use the full key to look up B, extract A's key from the key, and look up A. Then perform the join.
  • Merger wrapped into KTable and returned to the user.
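
The key handling in the steps above can be sketched in plain Java. This is a minimal illustration, not the KIP's implementation: the class name, the string-based combined key with a `|` separator, and the `TreeMap` standing in for B's RocksDB materialization are all assumptions made for the example, and `hashCode()` is only a stand-in for whatever partitioner the sink actually uses. It assumes A's keys never contain the separator.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: none of these names come from Kafka Streams.
class ForeignKeyJoinSketch {
    // Assumed separator; A's key must not contain it.
    static final char SEP = '|';

    // Combined key with A's key first, so all B rows for one A sort together.
    static String combinedKey(String aKey, String bKey) {
        return aKey + SEP + bKey;
    }

    // Recover A's key from the combined key (used by the lookup processor).
    static String extractAKey(String combined) {
        return combined.substring(0, combined.indexOf(SEP));
    }

    // The sink partitions on A's key only, so records land next to A.
    // hashCode() is a stand-in for the real partitioner.
    static int partitionFor(String combined, int numPartitions) {
        return Math.floorMod(extractAKey(combined).hashCode(), numPartitions);
    }

    // Records forwarded to the repartition topic when a B row changes:
    // a null value for the old combined key (skipped if A's key did not
    // change), then the new value under the new combined key.
    static List<Map.Entry<String, String>> repartitionRecords(
            String oldAKey, String newAKey, String bKey, String bValue) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        if (oldAKey != null && !oldAKey.equals(newAKey)) {
            out.add(new AbstractMap.SimpleEntry<>(combinedKey(oldAKey, bKey), null));
        }
        if (newAKey != null) {
            out.add(new AbstractMap.SimpleEntry<>(combinedKey(newAKey, bKey), bValue));
        }
        return out;
    }

    // Range scan over B's materialization for every row that refers to aKey.
    static List<String> rangeScan(NavigableMap<String, String> bStore, String aKey) {
        String from = aKey + SEP;               // inclusive lower bound
        String to = aKey + (char) (SEP + 1);    // exclusive upper bound
        return new ArrayList<>(bStore.subMap(from, true, to, false).values());
    }
}
```

Putting A's key first in the combined key is what makes the range scan a contiguous prefix scan, which matches the requirement that the serde serializes A before B.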

Step by Step

______THIS SECTION IS WORK IN PROGRESS_________________

______WILL REMOVE THIS MARKER WHEN SURE IT'S CORRECT__

______FIRST VALUE OF B => A's KEY_________________________

Columns: TOPOLOGY INPUT A | TOPOLOGY INPUT B | STATE A MATERIALIZED | STATE B MATERIALIZED | INTERMEDIATE RECORDS PRODUCED | STATE B OTHER TASK | OUTPUT RANGE PROCESSOR | OUTPUT LOOKUP PROCESSOR

Row 1: TOPOLOGY INPUT A: key: A0 value: [A0 ...]
Row 2: TOPOLOGY INPUT B: key: B0 value: [A2,B0 ...] partition: A2

Range lookup

It is pretty straightforward to completely flush all changes that happened before the range lookup into RocksDB and let it handle the range scan. Merging RocksDB's result iterator with the current in-heap caches might not be in scope for this initial KIP. Currently we at trivago cannot identify the RocksDB flushes as a performance problem. Usually the number of emitted records is the harder problem to deal with in the first place.
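
The flush-then-scan approach can be sketched as follows. This is a toy model under stated assumptions, not Kafka Streams code: the class and method names are hypothetical, a `LinkedHashMap` stands in for the in-heap cache, and a `TreeMap` stands in for RocksDB. The point is only that flushing first lets the sorted store answer the range query alone, so no iterator merging is needed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical write-back cache in front of a sorted store.
class FlushingRangeStore {
    // Pending writes; a null value marks a pending delete.
    private final Map<String, String> cache = new LinkedHashMap<>();
    // Stand-in for the RocksDB-backed store (sorted by key).
    private final NavigableMap<String, String> store = new TreeMap<>();
    int flushCount = 0;

    void put(String key, String value) {
        cache.put(key, value);
    }

    // Point lookups can be served from the cache without flushing.
    String get(String key) {
        return cache.containsKey(key) ? cache.get(key) : store.get(key);
    }

    // Write every pending change through to the sorted store.
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            if (e.getValue() == null) {
                store.remove(e.getKey());
            } else {
                store.put(e.getKey(), e.getValue());
            }
        }
        cache.clear();
        flushCount++;
    }

    // Flush first, then let the sorted store handle the whole range scan.
    List<String> range(String fromInclusive, String toExclusive) {
        flush();
        return new ArrayList<>(store.subMap(fromInclusive, true, toExclusive, false).values());
    }
}
```

The cost of this design is one flush per range lookup, which is the trade-off the paragraph above accepts.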

...