...
Given two relations A and B, there is exactly one A for each B, and there may be many B's for each A. A is represented by the KTable on which the method described above is invoked, while B is represented by that method's first argument. We want to implement a set of processors that allows a user to create a new KTable in which A and B are joined based on the reference to A in B. A and B are both represented as KTables: B is partitioned by B's key and A is partitioned by A's key.
Algorithm
- Materialize B first
- Call enable sendOldValues() on sources with "*"
- Register a child of B
  - Extract A's key and B's key as the key, and B as the value.
    - forward(key, null) for the old value
    - forward(key, b) for the new value
    - Skip the old value if A's key didn't change (it ends up in the same partition)
- Register a sink for the internal repartition topic (number of partitions equal to A's; if A is internal, prefer B over A for deciding the number of partitions)
  - In the sink, use only A's key to determine the partition
- Register a source for the intermediate topic
  - Co-partition with A's sources
  - Materialize
  - The serde for RocksDB needs to serialize A's key before B's key, so that all rows sharing A's key are adjacent for the range scan. Ideally we use the same serde for the topic as well.
- Register a processor after the above source
  - On an event, extract A's key from the key
  - Look up A by its key
  - Perform the join (as usual)
- Register a processor after A's processor
  - On an event, use A's key to perform a range scan on B's materialization
  - For every row retrieved, perform the join as usual
- Register a merger
  - Forward join results
  - On lookup, use the full key to look up B, extract A's key from the key, and look up A. Then perform the join.
- The merger is wrapped into a KTable and returned to the user.
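The data flow of the steps above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the proposed implementation: it uses no Kafka Streams classes, a `TreeMap` stands in for the RocksDB materialization, and the class `ForeignKeyJoinSketch`, its methods, the `\u0000` separator, and the string-concatenating join are all hypothetical names and choices made for illustration. Partitioning and the repartition topic are left out entirely; the child of B calls the downstream processor directly.

```java
import java.util.Map;
import java.util.TreeMap;

class ForeignKeyJoinSketch {
    /** A row of B: a reference to A's key plus some payload (hypothetical shape). */
    static final class B {
        final String aKey;
        final String payload;
        B(String aKey, String payload) { this.aKey = aKey; this.payload = payload; }
    }

    // Assumed separator; A's key is written first so rows with the same A are adjacent.
    private static final char SEP = '\u0000';

    final Map<String, String> aStore = new TreeMap<>();   // A's key -> A
    final TreeMap<String, B> bStore = new TreeMap<>();    // combined key -> B
    final Map<String, String> result = new TreeMap<>();   // combined key -> joined row

    static String combinedKey(String aKey, String bKey) { return aKey + SEP + bKey; }

    static String aKeyOf(String combinedKey) {
        return combinedKey.substring(0, combinedKey.indexOf(SEP));
    }

    /** Child of B: re-key by (A's key, B's key); emit null for old, b for new. */
    void onB(String bKey, B oldB, B newB) {
        boolean sameA = oldB != null && newB != null && oldB.aKey.equals(newB.aKey);
        if (oldB != null && !sameA) {
            onRepartitioned(combinedKey(oldB.aKey, bKey), null); // retract the old row
        }
        if (newB != null) {
            onRepartitioned(combinedKey(newB.aKey, bKey), newB);
        }
    }

    /** Processor after the intermediate source: materialize B, look up A, join. */
    void onRepartitioned(String key, B b) {
        if (b == null) bStore.remove(key); else bStore.put(key, b);
        merge(key, aStore.get(aKeyOf(key)), b);
    }

    /** Processor after A: range scan over all B rows that share A's key. */
    void onA(String aKey, String newA) {
        if (newA == null) aStore.remove(aKey); else aStore.put(aKey, newA);
        String from = aKey + SEP;               // smallest key with this prefix
        String to = aKey + (char) (SEP + 1);    // exclusive upper bound of the prefix
        for (Map.Entry<String, B> e : bStore.subMap(from, true, to, false).entrySet()) {
            merge(e.getKey(), newA, e.getValue());
        }
    }

    /** Merger: keep a joined row only while both sides exist (inner join). */
    void merge(String key, String a, B b) {
        if (a == null || b == null) result.remove(key);
        else result.put(key, a + "+" + b.payload);
    }
}
```

In this model, updating A re-joins every matching B through one prefix scan, and moving a B row from one A to another removes the row stored under the old combined key before the new one is written.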
...