Status

Current state: Accepted
Discussion thread: here
JIRA: KAFKA-3705
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Public Interfaces

Code Block
    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed inner join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> join(final KTable<KO, VO> other,
                                    final Function<V, KO> foreignKeyExtractor,
                                    final ValueJoiner<V, VO, VR> joiner,
                                    final Named named,
                                    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

    /**
     * Join records of this {@code KTable} with another {@code KTable} using non-windowed left join.
     * <p>
     * This is a foreign key join, where the joining key is determined by the {@code foreignKeyExtractor}.
     *
     * @param other               the other {@code KTable} to be joined with this {@code KTable}. Keyed by KO.
     * @param foreignKeyExtractor a {@link Function} that extracts the key (KO) from this table's value (V). If the
     *                            result is null, the update is ignored as invalid.
     * @param joiner              a {@link ValueJoiner} that computes the join result for a pair of matching records
     * @param named               a {@link Named} config used to name the processor in the topology
     * @param materialized        a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable}
     *                            should be materialized. Cannot be {@code null}
     * @param <VR>                the value type of the result {@code KTable}
     * @param <KO>                the key type of the other {@code KTable}
     * @param <VO>                the value type of the other {@code KTable}
     * @return a {@code KTable} that contains the result of joining this table with {@code other}
     */
    <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
                                        final Function<V, KO> foreignKeyExtractor,
                                        final ValueJoiner<V, VO, VR> joiner,
                                        final Named named,
                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
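
The semantics of these overloads can be illustrated without a running Kafka Streams application. The following is a minimal in-memory sketch, assuming both tables are fully materialized as plain maps. `FkJoinSketch` and its `join` helper are hypothetical names used only for illustration, and `BiFunction` stands in for `ValueJoiner`; the real implementation is distributed and incremental.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical in-memory model of the join semantics; not the Kafka Streams implementation.
final class FkJoinSketch {

    // leftJoin = false models join(...); leftJoin = true models leftJoin(...).
    static <K, V, KO, VO, VR> Map<K, VR> join(Map<K, V> self,
                                              Map<KO, VO> other,
                                              Function<V, KO> foreignKeyExtractor,
                                              BiFunction<V, VO, VR> joiner,
                                              boolean leftJoin) {
        Map<K, VR> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> row : self.entrySet()) {
            KO foreignKey = foreignKeyExtractor.apply(row.getValue());
            if (foreignKey == null) {
                continue; // a null foreign key is ignored as invalid
            }
            VO otherValue = other.get(foreignKey);
            // Inner join needs a match; left join passes null to the joiner when there is none.
            if (otherValue != null || leftJoin) {
                result.put(row.getKey(), joiner.apply(row.getValue(), otherValue));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new LinkedHashMap<>();
        orders.put("o1", "c1:book");
        orders.put("o2", "c2:pen");
        Map<String, String> customers = new LinkedHashMap<>();
        customers.put("c1", "Alice");

        Function<String, String> customerIdOf = order -> order.split(":")[0];
        BiFunction<String, String, String> joiner = (order, customer) -> order + "/" + customer;

        System.out.println(join(orders, customers, customerIdOf, joiner, false));
        System.out.println(join(orders, customers, customerIdOf, joiner, true));
    }
}
```

In this sketch the inner join drops `o2` because no customer `c2` exists, while the left join keeps it and hands `null` to the joiner.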

Workflow

The overall process is outlined below.

Gliffy Diagram: UpdatedOverview

Gliffy Diagram: JohnWorkflow

  • One possible optimization is to materialize the data provided to the original node at Step 6. Changes to an event in This KTable that do not change the foreign key could then skip sending an event to the Other node. The tradeoff is the need to maintain a materialized state store.


More Detailed Implementation Details

Prefix-Scanning using a CombinedKey

CombinedKey is a simple tuple wrapper that stores the primary key and the foreign key together. The CombinedKey serde requires the usage of both the primary key and foreign key serdes. The CombinedKey is serialized as follows:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}{primaryKeySerialized}

The 4-byte length prefix can represent up to MAXINT bytes per key, which far exceeds the realistic size of messages in Kafka.

When performing the prefix scan on the RocksDB instance, we simply drop the primary key component, such that the serialized combined key looks like:

Code Block
{4-byte foreignKeyLength}{foreignKeySerialized}
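
The layout above can be sketched as follows. This is a minimal illustration that assumes both keys have already been serialized to bytes by their own serdes; `CombinedKeySketch` and its methods are hypothetical names, not the classes used in the implementation. It also shows why the length prefix matters: without it, foreign key `1` would be a byte prefix of foreign key `10`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper illustrating the {length}{foreignKey}{primaryKey} layout.
final class CombinedKeySketch {

    // Full key: 4-byte big-endian foreign key length, foreign key bytes, primary key bytes.
    static byte[] serialize(byte[] foreignKey, byte[] primaryKey) {
        return ByteBuffer.allocate(Integer.BYTES + foreignKey.length + primaryKey.length)
                .putInt(foreignKey.length)
                .put(foreignKey)
                .put(primaryKey)
                .array();
    }

    // Scan prefix: the same layout with the primary key component dropped.
    static byte[] prefix(byte[] foreignKey) {
        return serialize(foreignKey, new byte[0]);
    }

    static boolean hasPrefix(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] scanPrefix = prefix(utf8("1"));
        // Same foreign key: the combined key matches the prefix.
        System.out.println(hasPrefix(serialize(utf8("1"), utf8("k")), scanPrefix));
        // Foreign key "10": the length field differs, so it does not match prefix "1".
        System.out.println(hasPrefix(serialize(utf8("10"), utf8("k")), scanPrefix));
    }
}
```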

Other/Entity PrefixScan Processor Behaviour

  • Requires the specific serialized format detailed above
  • Requires a RocksDB instance storing all of the This/Event data

Gliffy Diagram: RangeScanCombinedKeyUsage
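
A prefix scan over such keys might look like the sketch below. It is an illustration under stated assumptions: a `TreeMap` ordered by unsigned byte comparison stands in for the RocksDB instance, string values stand in for the stored This/Event data, and `PrefixScanSketch` is a hypothetical name. It requires Java 9 or later for `Arrays.compareUnsigned`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the RocksDB store that holds the This/Event data.
final class PrefixScanSketch {
    // Unsigned lexicographic byte order (assumed to match the store's key ordering).
    private static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;
    private final TreeMap<byte[], String> store = new TreeMap<>(BYTEWISE);

    static byte[] combinedKey(String foreignKey, String primaryKey) {
        byte[] fk = foreignKey.getBytes(StandardCharsets.UTF_8);
        byte[] pk = primaryKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + fk.length + pk.length)
                .putInt(fk.length).put(fk).put(pk).array();
    }

    void put(String foreignKey, String primaryKey, String value) {
        store.put(combinedKey(foreignKey, primaryKey), value);
    }

    // Returns every value whose combined key starts with {length}{foreignKey}.
    List<String> scan(String foreignKey) {
        byte[] prefix = combinedKey(foreignKey, ""); // primary key component dropped
        List<String> hits = new ArrayList<>();
        for (Map.Entry<byte[], String> e : store.tailMap(prefix, true).entrySet()) {
            if (!startsWith(e.getKey(), prefix)) {
                break; // keys are sorted, so the first non-match ends the range
            }
            hits.add(e.getValue());
        }
        return hits;
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
    }

    public static void main(String[] args) {
        PrefixScanSketch events = new PrefixScanSketch();
        events.put("1", "k1", "a");
        events.put("1", "k2", "b");
        events.put("10", "k3", "c");
        System.out.println(events.scan("1"));
    }
}
```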

Joiner Propagation of Stale Data

Rapid changes to a foreign key (perhaps due to capturing changes from a relational database undergoing such operations) can cause a race condition. The Other/Entity foreign-keyed table may be heavily congested with subscription requests on one node and lightly loaded on another, depending on the key distribution. As such, there is no guarantee on the order in which the subscription updates arrive in the post-subscribe repartition topic. For this reason, we must compare against the current state of the primary key event to ensure that the data is still consistent with the extracted foreign key.

While it is possible for a stale event to be propagated (i.e., it matches the foreign key but is stale), the up-to-date event will be propagated when it eventually arrives. Entity changes do not cause the same race conditions that event changes do, and so are not a concern.

Original Value Hash

A hash value is computed for each message propagated from This KTable. It is passed through the entire process and returns to the co-located KTable in the final step. Before joining, a lookup is done to validate that the hash matches the hash of the current value. If it does not, the event has since changed, even if the primary and foreign keys remain the same, and the response must be discarded, lest a duplicate result be emitted.

Example:

Code Block
Start with two empty tables:
LHS is This KTable
RHS is Other KTable
1- RHS is updated to Y|bar
2- LHS is updated to A|Y,1
   -> sends Y|A+ subscription message to RHS
3- LHS is updated to A|Y,2
   -> sends Y|A- unsubscribe message to RHS
   -> sends Y|A+ subscription to RHS
4- RHS processes first Y|A+ message
   -> sends A|Y,bar back
5- LHS processes A|Y,bar and produces result record A|Y,2,bar.
6- RHS processes Y|A- unsubscribe message (update store only)
7- RHS processes second Y|A+ subscribe message
   -> sends A|Y,bar back
8- LHS processes A|Y,bar and produces result record A|Y,2,bar
Thus, the first result record, that should have been `A|Y,1,bar`, is now
`A|Y,2,bar`. Furthermore, we update result table from `A|Y,2,bar` to
`A|Y,2,bar`.
By using a hash function to compare in step 5, we can instead see that the
message is stale and should be discarded, lest we produce double output
(or more, in the case of a rapidly changing set of events).
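
The comparison in step 5 could be sketched as below. This is an illustration only: the text above does not name a hash function, so SHA-256 is an assumption, and `ValueHashSketch`, `update`, and `resolve` are hypothetical names. The join result is modeled as simple string concatenation to mirror the `A|Y,2,bar` notation of the example.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the LHS side: remembers current values and rejects stale responses.
final class ValueHashSketch {
    private final Map<String, String> lhsTable = new HashMap<>();

    // Assumption: SHA-256 stands in for whatever hash the implementation uses.
    static byte[] hash(String value) {
        try {
            return MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Stores the new LHS value and returns the hash that travels with the subscription.
    byte[] update(String key, String value) {
        lhsTable.put(key, value);
        return hash(value);
    }

    // Joins only if the response was computed for the value currently in the LHS table.
    Optional<String> resolve(String key, byte[] responseHash, String rhsValue) {
        String current = lhsTable.get(key);
        if (current == null || !Arrays.equals(hash(current), responseHash)) {
            return Optional.empty(); // stale response: discard
        }
        return Optional.of(current + "," + rhsValue);
    }

    public static void main(String[] args) {
        ValueHashSketch lhs = new ValueHashSketch();
        byte[] first = lhs.update("A", "Y,1");
        byte[] second = lhs.update("A", "Y,2");
        System.out.println(lhs.resolve("A", first, "bar"));  // response for the old value
        System.out.println(lhs.resolve("A", second, "bar")); // response for the current value
    }
}
```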

Tombstones & Foreign Key Changes

In the event of a deletion on the LHS, a tombstone record is sent to the RHS. The RHS deletes the corresponding state from its state store, and the null is propagated back to the LHS via the Post-Subscribe Repartition Topic.

Code Block
1) LHS sends (FK-Key, null) to RHS
2) RHS deletes Key from state store
3) RHS sends (Key, null) back to LHS
4) LHS validates Key is still null in LHS-state store, propagates tombstone downstream



Each event from the LHS has a set of instructions sent with it to the RHS. These instructions are used to help manage the resolution of state.

Changing LHS (Key, FK-1) to (Key, FK-2).

Code Block
1) LHS sends (CombinedKey(FK-1,Key), SubscriptionWrapper(value=null, instruction=DELETE_KEY_NO_PROPAGATE)) to RHS-1
2) LHS sends (CombinedKey(FK-2,Key), SubscriptionWrapper(value=NewValueHash, instruction=PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE)) to RHS-2
3) RHS-1 deletes CombinedKey(FK-1,Key) from state store, but does not propagate any other event.
4) RHS-2 updates the local Key-Value store with (CombinedKey(FK-2,Key), NewValueHash).
5) RHS-2 looks up FK-2 in the RHS-2 materialized state store:
   a) If a non-null result is obtained, RHS-2 propagates to LHS: (Key, SubscriptionResponseWrapper(NewValueHash, RHS-Result, propagateIfNull=false))
   b) If a null result is obtained, RHS-2 propagates to LHS: (Key, SubscriptionResponseWrapper(NewValueHash, null, propagateIfNull=true))
      - This is done to ensure that the old join results are wiped out, since that join result is now stale/incorrect.
6) LHS validates that the NewValueHash from the SubscriptionResponseWrapper matches the hash of the current key in the LHS table.
   a) If the hash doesn't match, discard the event and return.
   b) If the hash does match, proceed to next step.
7) LHS checks if RHS-result == null and propagateIfNull is true.
   a) If yes, then propagate out a (Key, null) and return
   b) If no, proceed to next step.
8) Reminder: RHS-result is not null, and NewValueHash is valid and current. LHS performs the join logic on (LHS-Value and RHS-Result), and outputs the resultant event (Key, joinResult(LHS-Value, RHS-Result))
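
The RHS side of these steps can be sketched as a dispatch on the instruction. The four instruction names are taken from this document; everything else is an assumption made for illustration: `RhsSubscriptionSketch`, the `Response` class, the string-based stores, and in particular the behaviour of `PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE` and the `propagateIfNull` flag for `DELETE_KEY_AND_PROPAGATE`, which are inferred from the names and the tombstone flow above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of how the RHS reacts to a subscription message.
final class RhsSubscriptionSketch {
    enum Instruction {
        DELETE_KEY_NO_PROPAGATE,
        DELETE_KEY_AND_PROPAGATE,
        PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE,
        PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE
    }

    static final class Response {
        final String valueHash;
        final String rhsValue;
        final boolean propagateIfNull;

        Response(String valueHash, String rhsValue, boolean propagateIfNull) {
            this.valueHash = valueHash;
            this.rhsValue = rhsValue;
            this.propagateIfNull = propagateIfNull;
        }
    }

    final Map<String, String> rhsTable = new HashMap<>();      // foreign key -> RHS value
    final Map<String, String> subscriptions = new HashMap<>(); // "fk|key" -> value hash

    // Returns the response to send back to the LHS, or empty if nothing is propagated.
    Optional<Response> process(String fk, String key, String valueHash, Instruction instruction) {
        String combinedKey = fk + "|" + key; // simplified stand-in for CombinedKey(fk, key)
        switch (instruction) {
            case DELETE_KEY_NO_PROPAGATE:
                subscriptions.remove(combinedKey);
                return Optional.empty();
            case DELETE_KEY_AND_PROPAGATE:
                subscriptions.remove(combinedKey);
                return Optional.of(new Response(valueHash, null, true));
            case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE: {
                subscriptions.put(combinedKey, valueHash);
                String rhsValue = rhsTable.get(fk);
                return Optional.of(new Response(valueHash, rhsValue, rhsValue == null));
            }
            case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE: {
                subscriptions.put(combinedKey, valueHash);
                String rhsValue = rhsTable.get(fk);
                return rhsValue == null
                        ? Optional.empty()
                        : Optional.of(new Response(valueHash, rhsValue, false));
            }
            default:
                throw new IllegalArgumentException("Unknown instruction: " + instruction);
        }
    }
}
```

In this sketch the RHS never joins anything itself. It only maintains subscriptions and reports the current RHS value, leaving hash validation and the join to the LHS as in steps 6 to 8.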


The table below traces how LHS-generated changes translate into outputs, for both INNER and LEFT joins. Each step is cumulative with the previous step.


| Action | LHS Event (key, extracted fk) | To which RHS-partition? | RHS-0 State | RHS-1 State | Inner Join Output | Left Join Output | Execute Join Logic? | Notes | Inner-Join SubscriptionWrapper Instruction |
|---|---|---|---|---|---|---|---|---|---|
| Publish new event | (k,1) | RHS-0 | (1,foo) | | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event | to RHS-0: PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE |
| Publish update to event by changing fk | (k,1) → (k,2) | RHS-1 | (1,foo) | | (k,null) | (k,2,null) | LEFT | Must indicate a delete because there is currently no (fk,value) in RHS with key=2, and (k,1,foo) is no longer valid output. | to RHS-0: DELETE_KEY_NO_PROPAGATE; to RHS-1: PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE |
| Publish update to event by changing fk | (k,2) → (k,3) | RHS-0 | (1,foo) | | (k,null) | (k,3,null) | LEFT | Ideally would not publish a delete with Inner Join, but we do not maintain sufficient state to know that the (k,2) update resulted in a null output and we don't need to do it again. | to RHS-0: DELETE_KEY_NO_PROPAGATE; to RHS-1: PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE |
| Publish a value to RHS-0 | - | - | (1,foo); (3,bar) | | (k,3,bar) | (k,3,bar) | Inner/Left | Performs prefix scan join | - |
| Delete k | (k,3) → (k,null) | RHS-0 | (1,foo); (3,bar) | | (k,null) | (k,null,null) | LEFT | Propagate null/delete through the sub-topology | to RHS-0: DELETE_KEY_AND_PROPAGATE |
| Publish original event again | (k,null) → (k,1) | RHS-0 | (1,foo); (3,bar) | | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event | to RHS-0: PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE |
| Publish to LHS | (q,10) | RHS-1 | (1,foo); (3,bar) | | Nothing | (q,null,10) | LEFT | Significant difference between Inner and Outer | - |
| Publish a value to RHS-1 | - | - | (1,foo); (3,bar) | (q,baz) | (q,10,baz) | (q,10,baz) | Inner/Left | Normal fk-join induced by LHS event | to RHS-1: PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE |

Compatibility, Deprecation, and Migration Plan

...

Gliffy Diagram: OutOfOrderResolution-RecordHeaders


Rejected because it requires an additional materialized table, and does not provide any significant benefit:

Input Tables generated by:

Code Block
stream.flatMapValues().groupByKey().aggregate()

In this case, multiple KTable updates have the same input record and thus the same offset. Hence, there is no guarantee that offsets are unique and thus we cannot use them to resolve update conflicts.
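
The problem can be illustrated with a small simulation. This is a plain-Java sketch, not Kafka Streams code: `OffsetReuseSketch` is a hypothetical name, splitting on commas stands in for `flatMapValues()`, and string concatenation stands in for `aggregate()`. Each table update is tagged with the offset of the input record that caused it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: one input record fans out into several table updates.
final class OffsetReuseSketch {

    // Returns "offset:aggregate" for every update produced from a single input record.
    static List<String> updatesFor(long inputOffset, String inputValue) {
        List<String> updates = new ArrayList<>();
        StringBuilder aggregate = new StringBuilder();
        for (String part : inputValue.split(",")) { // flatMapValues: one record, many values
            aggregate.append(part);                 // aggregate: one table update per value
            updates.add(inputOffset + ":" + aggregate);
        }
        return updates;
    }

    public static void main(String[] args) {
        // All updates carry the same source offset, so the offset cannot order them.
        System.out.println(updatesFor(7L, "a,b,c"));
    }
}
```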


Solution B - User-Managed GroupBy (Jan's)

...

Implementing a CombinedKeySerde depends on the specific serialization framework; JSON illustrates the difficulty. A full key would look like "{ "a": { "key": "a1" }, "b": { "key": "b5" } }". To generate a true prefix, one would have to emit "{ "a": { "key": "a1"", which is not valid JSON. This invalid JSON never leaves the JVM, but implementing a serializer that generates it may be tricky. We could provide users with a utility method to verify that their serde satisfies our invariants.

 

 



Proposed Changes

Goal

Consider two relations A and B, where each B references exactly one A and each A may be referenced by many B's. A is represented by the KTable on which the method described above is invoked, while B is represented by that method's first argument. We want to implement a set of processors that allows a user to create a new KTable in which A and B are joined based on the reference to A in B. Both A and B are represented as KTables, with B partitioned by B's key and A partitioned by A's key.

Algorithm


ascii_raw_KIP-213 


  • Enable sendOldValues() on sources marked with "*"
  • Register a child of B
    • Extract A's key and B's key as the key, and B as the value.
      • forward(key, null) for old
      • forward(key, b) for new
      • skip old if A's key didn't change (ends up in the same partition)
  • Register a sink for the internal repartition topic (number of partitions equal to A; if A is internal, prefer B over A for deciding the number of partitions)
    • In the sink, use only A's key to determine the partition
  • Register a source for the intermediate topic
    • Co-partition with A's sources
    • Materialize
    • The serde for RocksDB needs to serialize A's key before B's key. Ideally the same serde is also used for the topic
  • Register a processor after the above source.
    • On event, extract A's key from the key
    • Look up A by its key
    • Perform the join (as usual)
  • Register a processor after A's processor
    • On event, use A's key to perform a range scan on B's materialization
    • For every row retrieved, perform the join as usual
  • Register a merger
    • Forward join results
    • On lookup, use the full key to look up B, extract A's key from the key, and look up A. Then perform the join.
  • The merger is wrapped into a KTable and returned to the user.
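
The re-keying step performed by the child of B can be sketched as follows. This is an illustration under assumptions: keys and values are plain strings, the combined key is modeled as the concatenation of A's key and B's key, and `RekeySketch`, `Forwarded`, and `rekey` are hypothetical names. The old value is available only because old values are sent along with each change.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of the processor that re-keys changes of B for the repartition topic.
final class RekeySketch {

    static final class Forwarded {
        final String partitionKey; // A's key only: decides the partition
        final String key;          // A's key followed by B's key
        final String value;        // B, or null to retract

        Forwarded(String partitionKey, String key, String value) {
            this.partitionKey = partitionKey;
            this.key = key;
            this.value = value;
        }
    }

    // Turns one change (oldB -> newB) of the row bKey into records for the repartition topic.
    static List<Forwarded> rekey(String bKey, String oldB, String newB,
                                 Function<String, String> aKeyOf) {
        String oldA = oldB == null ? null : aKeyOf.apply(oldB);
        String newA = newB == null ? null : aKeyOf.apply(newB);
        List<Forwarded> out = new ArrayList<>();
        // Retract the old entry unless A's key is unchanged (the new record overwrites it).
        if (oldA != null && !oldA.equals(newA)) {
            out.add(new Forwarded(oldA, oldA + bKey, null));
        }
        if (newA != null) {
            out.add(new Forwarded(newA, newA + bKey, newB));
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, String> aKeyOf = b -> b.split(",")[0];
        for (Forwarded f : rekey("B0", "A2,B0", "A3,B0", aKeyOf)) {
            System.out.println(f.partitionKey + " " + f.key + " " + f.value);
        }
    }
}
```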

...

        
| TOPOLOGY INPUT A | TOPOLOGY INPUT B | STATE A MATERIALIZED | STATE B MATERIALIZED | INTERMEDIATE RECORDS PRODUCED | STATE B OTHER TASK | OUTPUT A SOURCE / INPUT RANGE PROCESSOR | OUTPUT RANGE PROCESSOR | OUTPUT LOOKUP PROCESSOR |
|---|---|---|---|---|---|---|---|---|
| key: A0 value: [A0 ...] | | key: A0 value: [A0 ...] | | | | Change<null,[A0 ...]> | invoked but nothing found. Nothing forwarded | |
| key: A1 value: [A1 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | | | | Change<null,[A1 ...]> | invoked but nothing found. Nothing forwarded | |
| | key: B0 : value [A2,B0 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...] | partition key: A2 key: A2B0 value: [A2,B0 ...] | key: A2B0 : value [A2,B0 ...] | | | invoked but nothing found. Nothing forwarded |
| | key: B1 : value [A2,B1 ...] | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...]; key: B1 : value [A2,B1 ...] | partition key: A2 key: A2B1 value: [A2,B1 ...] | key: A2B0 : value [A2,B0 ...]; key: A2B1 : value [A2,B1 ...] | | | invoked but nothing found. Nothing forwarded |
| key: A2 value: [A2 ...] | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...]; key: A2 value: [A2 ...] | key: B0 : value [A2,B0 ...]; key: B1 : value [A2,B1 ...] | | key: A2B0 : value [A2,B0 ...]; key: A2B1 : value [A2,B1 ...] | Change<null,[A2 ...]> | key A2B0 value: Change<null,join([A2 ...],[A2,B0 ...])>; key A2B1 value: Change<null,join([A2 ...],[A2,B1 ...])> | |
| | key: B1 : value null | | key: B0 : value [A2,B0 ...] | partition key: A2 key: A2B1 value: null | key: A2B0 : value [A2,B0 ...] | | | key A2B1 value: Change<join([A2 ...],[A2,B1 ...]),null> |
| | key: B3 : value [A0,B3 ...] | | key: B0 : value [A2,B0 ...]; key: B3 : value [A0,B3 ...] | partition key: A0 key: A0B3 value: [A0,B3 ...] | key: A2B0 : value [A2,B0 ...]; key: A0B3 : value [A0,B3 ...] | | | key A0B3 value: Change<null,join([A0 ...],[A0,B3 ...])> |
| key: A2 value: null | | key: A0 value: [A0 ...]; key: A1 value: [A1 ...] | key: B0 : value [A2,B0 ...]; key: B3 : value [A0,B3 ...] | | key: A2B0 : value [A2,B0 ...]; key: A0B3 : value [A0,B3 ...] | Change<[A2 ...],null> | key A2B0 value: Change<join([A2 ...],[A2,B0 ...]),null> | |

...



Range lookup

It is straightforward to flush all changes that happened before the range lookup into RocksDB and let it handle the range scan. Merging RocksDB's result iterator with the current in-heap caches is probably out of scope for this initial KIP. Currently, we at trivago cannot identify the RocksDB flushes as a performance problem. Usually the number of emitted records is the harder problem to deal with in the first place.
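
The flush-then-scan approach might be sketched like this. It is a simplified illustration: a `HashMap` stands in for the in-heap cache, a `TreeMap` with string keys stands in for RocksDB, and `FlushBeforeRangeSketch` is a hypothetical name. A cached `null` value represents a pending delete.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical cache-over-store model: writes are buffered, range scans flush first.
final class FlushBeforeRangeSketch {
    private final Map<String, String> cache = new HashMap<>();     // null value = pending delete
    private final TreeMap<String, String> store = new TreeMap<>(); // stand-in for RocksDB

    void put(String key, String value) {
        cache.put(key, value);
    }

    // Applies all buffered writes to the sorted store.
    void flush() {
        for (Map.Entry<String, String> e : cache.entrySet()) {
            if (e.getValue() == null) {
                store.remove(e.getKey());
            } else {
                store.put(e.getKey(), e.getValue());
            }
        }
        cache.clear();
    }

    // Flushes first so the store alone can answer the scan; no iterator merging is needed.
    List<String> range(String prefix) {
        flush();
        List<String> values = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // sorted keys: the first non-match ends the range
            }
            values.add(e.getValue());
        }
        return values;
    }

    public static void main(String[] args) {
        FlushBeforeRangeSketch b = new FlushBeforeRangeSketch();
        b.put("A2B0", "b0");
        b.put("A2B1", "b1");
        b.put("A0B3", "b3");
        System.out.println(b.range("A2"));
    }
}
```

The design trades some write batching for simplicity: every range scan pays for a flush, but the scan never has to reconcile cached and persisted entries.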

...