
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

We want to close the gap between the semantics of KTables in Kafka Streams and tables in relational databases. It is common practice to capture changes made to tables in an RDBMS into Kafka topics (JDBC-connect, Debezium, Maxwell). It is also common practice to normalize data inside the RDBMS so that entities end up in a one-to-many relationship. RDBMSs usually offer good support for resolving this relationship with a join. Kafka Streams falls short here: the workaround (group by, join, lateral view) is not well supported either and is not in line with the idea of record-based processing.

 

Public Interfaces

Less intrusive

We would introduce a new method into KTable and KTableImpl:

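KTable.java
    /**
     * Joins one record of this KTable to n records of the other KTable.
     * An update to a record in this KTable produces n consecutive output records;
     * an update to a record in the other KTable updates only the one matched output record.
     *
     * @param other the table containing n records for each key K of this table
     * @param keyExtractor a {@code ValueMapper} returning the key of this table from the other table's value
     * @param joinPrefixFaker a {@code ValueMapper} returning an output key that, when serialized, produces only
     *                        the prefix of the output key, which is identical to the serialized K
     * @param leftKeyExtractor a {@code ValueMapper} extracting the key of this table from the resulting key
     * @param joiner a {@code ValueJoiner} computing the result value from this table's value and the other table's value
     * @param keyOtherSerde serde for the other table's key
     * @param valueOtherSerde serde for the other table's value
     * @param joinKeySerde serde for the resulting table's key
     * @param joinValueSerde serde for the resulting table's value
     * @param <K0> the resulting table's key type
     * @param <V0> the resulting table's value type
     * @param <KO> the other table's key type
     * @param <VO> the other table's value type
     * @return a {@code KTable} containing the joined records
     */
    <K0, V0, KO, VO> KTable<K0, V0> oneToManyJoin(KTable<KO, VO> other,
            ValueMapper<VO, K> keyExtractor,
            ValueMapper<K, K0> joinPrefixFaker,
            ValueMapper<K0, K> leftKeyExtractor,
            ValueJoiner<V, VO, V0> joiner,
            Serde<KO> keyOtherSerde, Serde<VO> valueOtherSerde,
            Serde<K0> joinKeySerde, Serde<V0> joinValueSerde);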
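To make the three mapper parameters more concrete, here is a minimal sketch for a hypothetical customers (this table, keyed by a `long` customer id) and orders (the other table) schema. `Order`, `CustomerOrderKey`, and `OneToManyMappers` are illustrative names, not part of the proposal; the sketch uses Java 16+ records, and `java.util.function.Function` stands in for Kafka's `ValueMapper`, which has the same single `apply` method shape. The assumed output key serializes the customer id first as 8 big-endian bytes, so the key returned by `joinPrefixFaker` serializes to exactly the bytes of the customer id alone.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical value of the other table (B): an order that references its customer (A).
record Order(long orderId, long customerId, String item) {}

// Hypothetical output key K0: A's key first, then B's key; orderId == null marks a prefix-only key.
record CustomerOrderKey(long customerId, Long orderId) {

    // Fixed-width big-endian layout: 8 bytes of customerId, then 8 bytes of orderId if present.
    // A prefix-only key therefore serializes to the same 8 bytes as the customerId on its own.
    byte[] serialize() {
        ByteBuffer buf = ByteBuffer.allocate(orderId == null ? 8 : 16);
        buf.putLong(customerId);
        if (orderId != null) {
            buf.putLong(orderId);
        }
        return buf.array();
    }
}

class OneToManyMappers {
    // keyExtractor (shape of ValueMapper<VO, K>): find this table's key in the other table's value.
    static final Function<Order, Long> KEY_EXTRACTOR = Order::customerId;

    // joinPrefixFaker (shape of ValueMapper<K, K0>): an output key whose serialization is only the prefix.
    static final Function<Long, CustomerOrderKey> JOIN_PREFIX_FAKER =
            customerId -> new CustomerOrderKey(customerId, null);

    // leftKeyExtractor (shape of ValueMapper<K0, K>): recover this table's key from an output key.
    static final Function<CustomerOrderKey, Long> LEFT_KEY_EXTRACTOR = CustomerOrderKey::customerId;

    // How this table's key K would be serialized under the same assumed layout.
    static byte[] serializeCustomerId(long customerId) {
        return ByteBuffer.allocate(8).putLong(customerId).array();
    }
}
```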

More intrusive

We would introduce a new complex type, CombinedKey:

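package org.apache.kafka.streams;

/**
 * Key of a one-to-many join result: the key of this table (prefix) followed by the key of the other table (suffix).
 */
public class CombinedKey<P,S> {
    // key of this table (the "one" side); serialized first so results can be range-scanned by it
    public P prefix;
    // key of the other table (the "many" side)
    public S suffix;
}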

The method could then be rewritten as:

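KTable.java
    /**
     * Joins one record of this KTable to n records of the other KTable.
     * An update to a record in this KTable produces n consecutive output records;
     * an update to a record in the other KTable updates only the one matched output record.
     *
     * @param other the table containing n records for each key K of this table
     * @param keyExtractor a {@code ValueMapper} returning the key of this table from the other table's value
     * @param joiner a {@code ValueJoiner} computing the result value from this table's value and the other table's value
     * @param keyOtherSerde serde for the other table's key
     * @param valueOtherSerde serde for the other table's value
     * @param joinKeySerde serde for the resulting table's {@code CombinedKey}
     * @param joinValueSerde serde for the resulting table's value
     * @param <V0> the resulting table's value type
     * @param <KO> the other table's key type
     * @param <VO> the other table's value type
     * @return a {@code KTable} keyed by {@code CombinedKey<K, KO>} containing the joined records
     */
    <V0, KO, VO> KTable<CombinedKey<K, KO>, V0> oneToManyJoin(KTable<KO, VO> other,
            ValueMapper<VO, K> keyExtractor,
            ValueJoiner<V, VO, V0> joiner,
            Serde<KO> keyOtherSerde, Serde<VO> valueOtherSerde,
            Serde<CombinedKey<K, KO>> joinKeySerde, Serde<V0> joinValueSerde);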
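With `CombinedKey`, the key serde has to write the prefix before the suffix so that all results for one key of this table are adjacent in the store. The sketch below shows one possible byte layout for `CombinedKey<String, String>`, assuming UTF-8 strings and a 4-byte length header on the prefix. `CombinedKeyBytes` is a hypothetical helper, and the convenience constructor on `CombinedKey` is added only for this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Same shape as the proposed class: prefix = this table's key (A), suffix = the other table's key (B).
class CombinedKey<P, S> {
    public P prefix;
    public S suffix;

    CombinedKey(P prefix, S suffix) { // convenience constructor, only for the sketch
        this.prefix = prefix;
        this.suffix = suffix;
    }
}

// Hypothetical serializer for CombinedKey<String, String>; not part of the proposal.
class CombinedKeyBytes {

    // Layout: [4-byte prefix length][prefix UTF-8][suffix UTF-8].
    // The length header keeps prefix "ab" + suffix "c" distinct from prefix "a" + suffix "bc".
    static byte[] serialize(CombinedKey<String, String> key) {
        byte[] prefix = key.prefix.getBytes(StandardCharsets.UTF_8);
        byte[] suffix = key.suffix.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + prefix.length + suffix.length)
                .putInt(prefix.length)
                .put(prefix)
                .put(suffix)
                .array();
    }

    // Leading bytes shared by every CombinedKey with this prefix; usable as the start of a range scan.
    static byte[] serializePrefix(String prefix) {
        byte[] bytes = prefix.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + bytes.length).putInt(bytes.length).put(bytes).array();
    }

    static CombinedKey<String, String> deserialize(byte[] data) {
        int prefixLength = ByteBuffer.wrap(data).getInt();
        String prefix = new String(data, 4, prefixLength, StandardCharsets.UTF_8);
        String suffix = new String(data, 4 + prefixLength, data.length - 4 - prefixLength, StandardCharsets.UTF_8);
        return new CombinedKey<>(prefix, suffix);
    }
}
```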

Tradeoffs

The more intrusive version makes it clearer to the user that the resulting KTable is keyed not only by B's key but also by A's key (here A is this table and B is the other table). The user will therefore be less surprised if a later aggregation finds the same B key twice, joined to different A keys. On the other hand, the less intrusive version does not need to introduce this wrapper class; instead, it leaves it to the user to include both A's and B's keys in the output key. This might lead to a deeper understanding on the user's side, and user-provided serdes might be able to pack the data more densely.

Proposed Changes

Goal

Given two relations A and B, where each B references exactly one A and each A may be referenced by many Bs, we want to implement a set of processors that lets a user create a new KTable in which A and B are joined on B's reference to A. A and B are represented as KTables, with B partitioned by B's key and A partitioned by A's key.

Algorithm

  • Materialize B first.
  • Register a child processor of B that:
    • extracts A's key and B's key as the key and uses B as the value;
      • forwards (key, null) for the old value;
      • forwards (key, b) for the new value;
      • skips the old value if A's key did not change (the new record ends up in the same partition and overwrites it).
  • Register a sink for an internal repartition topic (with the same number of partitions as A; if A is itself an internal topic, prefer B over A when deciding the number of partitions).
    • In the sink, use only A's key to determine the partition.
  • Register a source for the intermediate topic.
    • Co-partition it with A's sources.
    • Materialize it.
    • The serde for RocksDB needs to serialize A's key before B's key; ideally, the same serde is also used for the topic.
  • Register a processor after the above source that, on each event:
    • extracts A's key from the key;
    • looks up A by its key;
    • performs the join (as usual).
  • Register a processor after A's processor that, on each event:
    • uses A's key to perform a range scan on B's materialization;
    • performs the join as usual for every row retrieved.
  • Register a merger that:
    • forwards the join results;
    • on lookup, uses the full key to look up B, extracts A's key from the key, looks up A, and then performs the join.
  • Wrap the merger into a KTable and return it to the user.
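The processor graph above can be simulated in a single JVM to check its update semantics. This is a minimal sketch, not the proposed implementation: it assumes one partition, replaces the repartition topic with a direct method call, uses an in-memory `TreeMap` instead of RocksDB, implements an inner join, and invents the names `OrderValue`, `JoinKey`, and `OneToManyJoinSimulation` (Java 16+ records). `partitionFor` only illustrates that the sink partitions on A's key; it uses `hashCode` rather than the murmur2 hash of Kafka's default partitioner.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical B value: an order referencing its customer (A) by key; customerId may be null.
record OrderValue(String customerId, String item) {}

// Key of the repartitioned B records and of the result: A's key first, then B's key.
record JoinKey(String aKey, String bKey) {}

// Single-partition, in-memory simulation of the processor graph; no Kafka involved.
class OneToManyJoinSimulation {
    final Map<String, String> aTable = new HashMap<>();          // A: customerId -> customer name
    final Map<String, OrderValue> bTable = new HashMap<>();      // materialized B, keyed by B's key
    final TreeMap<JoinKey, OrderValue> repartitionedB =          // materialized repartition topic
            new TreeMap<>(Comparator.comparing(JoinKey::aKey).thenComparing(JoinKey::bKey));
    final Map<JoinKey, String> result = new HashMap<>();        // the KTable returned to the user

    // The repartition sink partitions on A's key only (hashCode stands in for murmur2).
    static int partitionFor(JoinKey key, int numPartitions) {
        return Math.floorMod(key.aKey().hashCode(), numPartitions);
    }

    static String join(String a, OrderValue b) {
        return a + " bought " + b.item();
    }

    // Child of B: re-key by (A's key, B's key); forward a tombstone only if A's key changed.
    void onB(String bKey, OrderValue newB) {
        OrderValue oldB = newB == null ? bTable.remove(bKey) : bTable.put(bKey, newB);
        String oldA = oldB == null ? null : oldB.customerId();
        String newA = newB == null ? null : newB.customerId();
        if (oldA != null && !oldA.equals(newA)) {
            onRepartitioned(new JoinKey(oldA, bKey), null);
        }
        if (newA != null) { // B records with a null A key are silently dropped
            onRepartitioned(new JoinKey(newA, bKey), newB);
        }
    }

    // Processor after the repartition source: materialize, look up A by its key, join.
    void onRepartitioned(JoinKey key, OrderValue b) {
        if (b == null) {
            repartitionedB.remove(key);
            emit(key, null);
            return;
        }
        repartitionedB.put(key, b);
        String a = aTable.get(key.aKey());
        emit(key, a == null ? null : join(a, b));
    }

    // Processor after A: range-scan every B whose combined key starts with A's key.
    void onA(String aKey, String newA) {
        if (newA == null) {
            aTable.remove(aKey);
        } else {
            aTable.put(aKey, newA);
        }
        for (Map.Entry<JoinKey, OrderValue> e : repartitionedB.tailMap(new JoinKey(aKey, ""), true).entrySet()) {
            if (!e.getKey().aKey().equals(aKey)) {
                break; // left the range for this A key
            }
            emit(e.getKey(), newA == null ? null : join(newA, e.getValue()));
        }
    }

    // Merger: forwards join results into the resulting table (null deletes).
    void emit(JoinKey key, String joined) {
        if (joined == null) {
            result.remove(key);
        } else {
            result.put(key, joined);
        }
    }

    // Merger lookup: full key finds B, its A-key prefix finds A, then join.
    String get(JoinKey key) {
        OrderValue b = repartitionedB.get(key);
        String a = aTable.get(key.aKey());
        return a == null || b == null ? null : join(a, b);
    }
}
```

For example, moving order `o2` from customer `c1` to `c2` first forwards a tombstone for `(c1, o2)` and then the new record for `(c2, o2)`, while updating customer `c1` re-joins only the orders found by the range scan over the `c1` prefix.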

Range lookup

It is fairly straightforward to flush all changes that happened before the range lookup into RocksDB and let RocksDB handle the range scan. Merging RocksDB's result iterator with the current in-heap caches may be out of scope for this initial KIP. So far, we at trivago have not found the RocksDB flushes to be a performance problem; usually, the number of emitted records is the harder problem to deal with in the first place.
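A range scan by A's key reduces to a prefix scan over byte-ordered keys. The sketch below stands in for RocksDB with a `TreeMap` ordered by `Arrays.compareUnsigned` (Java 9+), which matches RocksDB's default bytewise key ordering, and computes the exclusive upper bound of a prefix by incrementing its last byte that is not `0xFF`. `PrefixScan` is a hypothetical helper; the test keys use a `|` separator only for readability, and merging with in-heap caches is left out, as the paragraph above leaves it out of scope.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper: prefix range scan over a store ordered by unsigned bytes.
class PrefixScan {

    // Smallest key greater than every key starting with prefix, or null if none exists
    // (empty or all-0xFF prefix), in which case the scan runs to the end of the store.
    static byte[] upperBound(byte[] prefix) {
        byte[] bound = Arrays.copyOf(prefix, prefix.length);
        for (int i = bound.length - 1; i >= 0; i--) {
            if (bound[i] != (byte) 0xFF) {
                bound[i]++;
                return Arrays.copyOf(bound, i + 1); // drop trailing 0xFF bytes
            }
        }
        return null;
    }

    // Values of all keys in [prefix, upperBound(prefix)).
    static List<byte[]> scan(TreeMap<byte[], byte[]> store, byte[] prefix) {
        byte[] to = upperBound(prefix);
        NavigableMap<byte[], byte[]> range = to == null
                ? store.tailMap(prefix, true)
                : store.subMap(prefix, true, to, false);
        return new ArrayList<>(range.values());
    }

    // In-memory stand-in for the RocksDB store.
    static TreeMap<byte[], byte[]> newStore() {
        Comparator<byte[]> unsignedBytes = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedBytes);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```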

Missing reference to A

B records with a null A key would be silently dropped.

Compatibility, Deprecation, and Migration Plan

  • There is no impact to existing users.

Rejected Alternatives

