...
- Call enable sendOldValues() on sources with "*"
- Register a child of B
- extract A's key and B's key as key and B as value.
- forward(key, null) for old
- forward(key, b) for new
- skip old if A's key didn't change (ends up in same partition)
- extract A's key and B's key as key and B as value.
- Register sink for internal repartition topic (number of partitions equal to A, if a is internal prefer B over A for deciding number of partitions)
- in the sink, only use A's key to determine partition
- Register source for intermediate topic
- co-partition with A's sources
- materialize
- serde for rocks needs to serialize A before B. ideally we use the same serde also for the topic
- Register processor after above source.
- On event extract A's key from the key
- look up A by it's key
- perform the join (as usual)
- Register processor after A's processor
- On event uses A's key to perform a Range scan on B's materialization
- For every row retrieved perform join as usual
- Register merger
- Forward join Results
- On lookup use full key to lookup B and extract A's key from the key and lookup A. Then perform join.
- Merger wrapped into KTable and returned to the user.
Step by Step
TOPOLOGY INPUT A | TOPOLOGY INPUT B | STATE A MATERIALZED | STATE B MATERIALIZE | INTERMEDIATE RECORDS PRODUCED | STATE B OTHER TASK | Output A Source / Input Range Proccesor | OUTPUT RANGE PROCESSOR | OUTPUT LOOKUP PROCESSOR |
---|---|---|---|---|---|---|---|---|
key: A0 value: [A0 ...] | key: A0 value: [A0 ...] | Change<null,[A0 ...]> | invoked but nothing found. Nothing forwarded | |||||
key: A1 value: [A1 ...] | key: A0 value: [A0 ...] key: A1 value: [A1 ...] | Change<null,[A1 ...]> | invoked but nothing found. Nothing forwarded | |||||
key: B0 : value [A2,B0 ...] | key: A0 value: [A0 ...] key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...] | partition key: A2 key: A2B0 value: [A2,B0 ...] | key: A2B0 : value [A2,B0 ...] | invoked but nothing found Nothing forwarded | |||
key: B1 : value [A2,B1 ...] | key: A0 value: [A0 ...] key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...] key: B1 : value [A2,B1 ...] | partition key: A2 key: A2B1 value [A2,B1 ...] | key: A2B0 : value [A2,B0 ...] key: A2B1 : value [A2,B1 ...] | invoked but nothing found Nothing forwarded | |||
key: A2 value: [A2 ...] | key: A0 value: [A0 ...] key: A1 value: [A1 ...] key: A2 value: [A2 ...] | key: B0 : value [A2,B0 ...] key: B1 : value [A2,B1 ...] | key: A2B0 : value [A2,B0 ...] key: A2B1 : value [A2,B1 ...] | Change<null,[A2 ...]> | key A2B0 value: Change<null,join([A2 ...],[A2,B0 ...]) key A2B1 value: Change<null,join([A2 ...],[A2,B1...]) | |||
key: B1 : value null | key: B0 : value [A2,B0 ...] | partition key: A2 key: A2B1 value:null | key: A2B0 : value [A2,B0 ...] | key A2B1 value: Change<join([A2 ...],[A2,B1...],null) | ||||
key: B3 : value [A0,B3 ...] | key: B0 : value [A2,B0 ...] key: B3 : value [A0,B3 ...] | partition key: A0 key: A0B3 value:[A0,B3 ...] | key: A2B0 : value [A2,B0 ...] key: A0B3 : value [A0,B3 ...] | key A0B3 value: Change<join(null,[A0 ...],[A0,B3...]) | ||||
key: A2 value: null | key: A0 value: [A0 ...] key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...] key: B3 : value [A0,B3 ...] | key: A2B0 : value [A2,B0 ...] key: A0B3 : value [A0,B3 ...] | Change<[A2 ...],null> | key A2B0 value: Change<join([A2 ...],[A2,B0 ...],null) |
Range lookup
It is pretty straight forward to completely flush all changes that happened before the range lookup into rocksb and let it handle a the range scan. Merging rocksdb's result iterator with current in-heap caches might be not in scope of this initial KIP. Currently we at trivago can not identify the rocksDb flushes to be a performance problem. Usually the amount of emitted records is the harder problem to deal with in the first place.
...
Gliffy Diagram name HighwaterMarkUsage pagePin 3
Final Steps - Materializing
Since the final out-of-order data is sourced from a topic, the only way to ensure that downstream KTables have the means to query their parent's ValueGetter is to materialize the final state store. There is no way to get specific values directly from a topic source - a Materialized store is required. In this case, it would mean that a user-provided Materialized store is mandatory. The work flow would look like this:
Gliffy Diagram | ||||||
---|---|---|---|---|---|---|
|
Compatibility, Deprecation, and Migration Plan
- There is no impact to existing users.
Rejected Alternatives
- None