Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

With the two relations A,B and there is one A for each B and there may be many B's for each A. A is represented by the KTable the method described above gets invoked on, while B is represented by that methods first argument. We want to implement a Set of processors that allows a user to create a new KTable where A and B are joined based on the reference to A in B. A and B are represented as KTable B being partitioned by B's key and A being partitioned by A's key.

Algorithm

Image Added

ascii_raw_KIP-213

 Image Removed

  • Call enable sendOldValues() on sources with "*"
  • Register a child of B
    • extract A's key and B's key as key and B as value.
      • forward(key, null) for old
      • forward(key, b) for new
      • skip old if A's key didn't change (ends up in same partition)
  • Register sink for internal repartition topic (number of partitions equal to A, if a is internal prefer B over A for deciding number of partitions)
    • in the sink, only use A's key to determine partition
  • Register source for intermediate topic
    • co-partition with A's sources
    • materialize
    • serde for rocks needs to serialize A before B. ideally we use the same serde also for the topic
  • Register processor after above source.
    • On event extract A's key from the key
    • look up A by it's key
    • perform the join (as usual)
  • Register processor after A's processor
    • On event uses A's key to perform a Range scan on B's materialization
    • For every row retrieved perform join as usual
  • Register merger
    • Forward join Results
    • On lookup use full key to lookup B and extract A's key from the key and lookup A. Then perform join.
  • Merger wrapped into KTable and returned to the user.

 

Range lookup

It is pretty straight forward to completely flush all changes that happened before the range lookup into rocksb and let it handle a the range scan. Merging rocksdb's result iterator with current in-heap caches might be not in scope of this initial KIP. Currently we at trivago can not identify the rocksDb flushes to be a performance problem. Usually the amount of emitted records is the harder problem to deal with in the first place.

...