...
- Enable sendOldValues() on the sources marked with "*"
- Register a child of B
  - Extract A's key and B's key into a combined key, with B as the value.
  - forward(key, null) for the old value
  - forward(key, b) for the new value
  - Skip the old value if A's key didn't change (it ends up in the same partition)
- Register a sink for the internal repartition topic (number of partitions equal to A's; if A is internal, prefer B over A when deciding the number of partitions)
  - In the sink, use only A's key to determine the partition
- Register a source for the intermediate topic
  - Co-partition with A's sources
  - Materialize
  - The serde for RocksDB needs to serialize A's key before B's key; ideally the same serde is also used for the topic
- Register a processor after the above source
  - On each event, extract A's key from the combined key
  - Look up A by its key
  - Perform the join (as usual)
- Register a processor after A's processor
  - On each event, use A's key to perform a range scan on B's materialization
  - For every row retrieved, perform the join as usual
- Register a merger
  - Forward join results
  - On lookup, use the full key to look up B, extract A's key from the key and look up A. Then perform the join.
- Wrap the merger into a KTable and return it to the user.
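The B-side steps (combined key, tombstone/skip logic, partitioning by A's key only, serde ordering) can be sketched in plain Java 16+ without any Kafka dependency. All names here (`ForeignKeyRepartitionSketch`, `CombinedKey`, `Forwarded`, `onBChange`, `partitionFor`) are hypothetical, keys are assumed to be UTF-8 strings, and B's value is modeled as a `String[]` whose first element is A's key. The hash in `partitionFor` is only a stand-in: a real sink would have to use the same partitioner as A's source topic so that the records are actually co-partitioned with A.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ForeignKeyRepartitionSketch {

    /** Hypothetical combined key: A's key (the foreign key) plus B's own key. */
    record CombinedKey(String aKey, String bKey) {}

    /** A record sent to the repartition topic; value == null is a tombstone. */
    record Forwarded<V>(CombinedKey key, V value) {}

    /** Serializes A's key first (length-prefixed) so all entries of one A share a byte prefix. */
    static byte[] serialize(CombinedKey key) {
        byte[] a = key.aKey().getBytes(StandardCharsets.UTF_8);
        byte[] b = key.bKey().getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + a.length + b.length)
                .putInt(a.length).put(a).put(b).array();
    }

    /** Stand-in partitioner that looks only at A's key (not Kafka's default partitioner). */
    static int partitionFor(CombinedKey key, int numPartitions) {
        byte[] a = key.aKey().getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(a), numPartitions);
    }

    /** Turns one change of B (old -> new) into the records forwarded to the repartition topic. */
    static <B> List<Forwarded<B>> onBChange(String bKey, B oldB, B newB, Function<B, String> foreignKey) {
        List<Forwarded<B>> out = new ArrayList<>();
        String oldA = oldB == null ? null : foreignKey.apply(oldB);
        String newA = newB == null ? null : foreignKey.apply(newB);
        // Skip the tombstone if A's key did not change: the new record has the same
        // combined key, lands in the same partition and overwrites the old entry.
        if (oldA != null && !oldA.equals(newA)) {
            out.add(new Forwarded<>(new CombinedKey(oldA, bKey), null)); // forward(key, null) for old
        }
        if (newA != null) {
            out.add(new Forwarded<>(new CombinedKey(newA, bKey), newB)); // forward(key, b) for new
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumption: the first field of B's value is A's key.
        Function<String[], String> fk = v -> v[0];
        String[] before = {"A1", "x"};
        String[] after = {"A2", "y"};
        System.out.println(onBChange("B0", before, after, fk).size()); // 2: tombstone + new
        System.out.println(onBChange("B0", before, new String[]{"A1", "z"}, fk).size()); // 1: old skipped
    }
}
```

Writing the length of A's key before its bytes keeps all rows of one A contiguous in the sorted store and prevents a key such as "A1" from acting as a prefix of "A10".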
Step by Step
______THIS SECTION IS WORK IN PROGRESS_________________
______WILL REMOVE THIS MARKER WHEN SURE IT'S CORRECT__
______FIRST VALUE OF B => A's KEY_________________________
TOPOLOGY INPUT A | TOPOLOGY INPUT B | STATE A MATERIALIZED | STATE B MATERIALIZED | INTERMEDIATE RECORDS PRODUCED | STATE B OTHER TASK | OUTPUT RANGE PROCESSOR | OUTPUT LOOKUP PROCESSOR |
---|---|---|---|---|---|---|---|
key: A0 value: [A0 ...] | |||||||
key: B0 value: [A2,B0 ...] | partition: A2 | ||||||
Range lookup
It is fairly straightforward to flush all changes that happened before the range lookup into RocksDB and let RocksDB handle the range scan. Merging RocksDB's result iterator with the current in-heap caches may be out of scope for this initial KIP. At trivago we currently have not found the RocksDB flushes to be a performance problem. Usually the number of emitted records is the harder problem to deal with in the first place.
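The flush-then-scan approach can be sketched with a `TreeMap` standing in for B's RocksDB materialization and a `HashMap` standing in for the in-heap cache. This is a hypothetical illustration, not the Kafka Streams state store API (in Kafka Streams the scan would go through the store's `range(from, to)` call). The class and method names, the `"aKey|bKey"` string encoding of the combined key and the `BiFunction` joiner are all assumptions; the encoding also assumes keys never contain `'|'`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

public class RangeLookupSketch {

    private static final char SEP = '|'; // assumption: keys never contain '|'

    // Sorted map standing in for B's RocksDB materialization (keys: "aKey|bKey").
    final TreeMap<String, String> store = new TreeMap<>();
    // In-heap cache of changes not yet flushed to the store; a null value marks a delete.
    final Map<String, String> cache = new HashMap<>();

    /** Buffers a change of B in the cache; bValue == null deletes the row. */
    void putB(String aKey, String bKey, String bValue) {
        cache.put(aKey + SEP + bKey, bValue);
    }

    /** Flushes every cached change into the sorted store, applying deletes. */
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            if (e.getValue() == null) store.remove(e.getKey());
            else store.put(e.getKey(), e.getValue());
        }
        cache.clear();
    }

    /** On an update of A: flush first, then range-scan all B rows whose key starts with A's key. */
    List<String> onAUpdate(String aKey, String aValue, BiFunction<String, String, String> joiner) {
        flush(); // the store now reflects every change made before this lookup
        String prefix = aKey + SEP;
        List<String> results = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) break; // left the range of this A key
            results.add(joiner.apply(aValue, e.getValue()));
        }
        return results;
    }

    public static void main(String[] args) {
        RangeLookupSketch s = new RangeLookupSketch();
        s.putB("A1", "B0", "b0");
        s.putB("A1", "B1", "b1");
        s.putB("A10", "B2", "b2"); // a different A key that starts with the same characters
        System.out.println(s.onAUpdate("A1", "a1", (a, b) -> a + "+" + b)); // [a1+b0, a1+b1]
    }
}
```

Because everything is flushed before the scan, the scan only has to iterate over the sorted store; merging a store iterator with the unflushed cache entries is the more involved alternative that the paragraph above leaves out of scope.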
...