Status

Current state: Accepted (vote thread: [VOTE] KIP-77: Improve Kafka Streams Join Semantics)

Discussion thread: [DISCUSS] KIP-76: Improve Kafka Streams Join Semantics (this should have been KIP-77 – there was some confusion about KIP numbering)

JIRA: KAFKA-4001

Released: planned for 0.10.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation 

Currently, many users find it hard to reason about the expected results of Kafka Streams joins. Stream-based join semantics (as used in Kafka Streams) cannot be fully consistent with RDBMS SQL join semantics. Even so, we observed that our current join semantics can still be made more intuitive.

...

  • Our current definitions of the join types make inconsistent choices across these three dimensions. For example, in KStream-KStream joins, input records with null values trigger a key lookup (i.e., they participate in the join as the triggering record). However, they are not added to the window state, so later records from the other stream can never join with them.
  • There are good reasons why KStream-KTable joins do not offer outer join semantics (see the discussion at the end, “Rejected alternatives”). There is, however, no reason not to support a KStream-KTable inner join.

Public Interfaces (Proposed Additions) 

Adding a KStream-KTable inner join adds the following methods to KStream:

  • KStream<K,V1>.join(KTable<K,V2> other, ValueJoiner<V1, V2, VR> joiner)

  • KStream<K,V1>.join(KTable<K,V2> other, ValueJoiner<V1, V2, VR> joiner, Serde<K> keySerde, Serde<V1> thisValueSerde, Serde<V2> otherValueSerde)

Proposed Changes

First, we describe the current join semantics for all join variants with regard to the underlying concepts:

...

Afterwards, we describe the suggested (new) semantics.

 

KStream-KStream (result KStream): 

This join is a symmetric window join. The basic semantics can be expressed via the following SQL statement:

...

 

ts | left | right | innerJoin                    | leftJoin                     | outerJoin
1  | null |       |                              | null - null                  | null - null
2  |      | null  |                              |                              | null - null
3  | A    |       |                              | A - null                     | A - null
4  |      | a     | A - a                        |                              | A - a
5  | B    |       | B - a                        | B - a                        | B - a
6  |      | b     | A - b, B - b                 |                              | A - b, B - b
7  | null |       | null - a, null - b           | null - a, null - b           | null - a, null - b
8  |      | null  | A - null, B - null           |                              | A - null, B - null
9  | C    |       | C - a, C - b                 | C - a, C - b                 | C - a, C - b
10 |      | c     | A - c, B - c, C - c          |                              | A - c, B - c, C - c
11 |      | null  | A - null, B - null, C - null |                              | A - null, B - null, C - null
12 | null |       | null - a, null - b, null - c | null - a, null - b, null - c | null - a, null - b, null - c
13 |      | null  | A - null, B - null, C - null |                              | A - null, B - null, C - null
14 |      | d     | A - d, B - d, C - d          |                              | A - d, B - d, C - d
15 | D    |       | D - a, D - b, D - c, D - d   | D - a, D - b, D - c, D - d   | D - a, D - b, D - c, D - d
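The following minimal Java model makes the rules behind the KStream-KStream table concrete. It is a hypothetical sketch, not Kafka Streams code: the class name, the method name, and the "inner"/"left"/"outer" strings are made up for illustration. It assumes, as the example does, that all records share one key and that a single window covers all 15 records.

The model encodes the two current rules:

  • Every record triggers a lookup into the other stream's window state, including a record with a null value. The one exception is the left join, where only left records trigger.
  • A record with a null value is never added to the window state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the CURRENT KStream-KStream join semantics (not Kafka code).
// Assumptions: all records share one key, and one window contains all 15 example records.
class CurrentStreamStreamJoin {
    // Example input: side ('L' = left, 'R' = right) and value for ts = 1..15.
    static final String SIDES = "LRLRLRLRLRRLRRL";
    static final String[] VALUES = {null, null, "A", "a", "B", "b", null, null,
                                    "C", "c", null, null, null, "d", "D"};

    // Returns the results emitted at each timestamp (index 0 holds ts = 1).
    // joinType is "inner", "left", or "outer".
    static List<List<String>> run(String joinType) {
        List<String> leftWindow = new ArrayList<>();
        List<String> rightWindow = new ArrayList<>();
        List<List<String>> results = new ArrayList<>();
        for (int i = 0; i < VALUES.length; i++) {
            boolean isLeft = SIDES.charAt(i) == 'L';
            String value = VALUES[i];
            List<String> emitted = new ArrayList<>();
            // Left joins are triggered by left records only; inner and outer joins by
            // records from both sides. Records with a null value trigger as well.
            if (isLeft || !joinType.equals("left")) {
                List<String> other = isLeft ? rightWindow : leftWindow;
                for (String o : other) {
                    emitted.add(isLeft ? value + " - " + o : o + " - " + value);
                }
                // Left and outer joins emit a result even if the other window is empty.
                if (other.isEmpty() && !joinType.equals("inner")) {
                    emitted.add(isLeft ? value + " - null" : "null - " + value);
                }
            }
            // Records with a null value are never added to the window state.
            if (value != null) {
                (isLeft ? leftWindow : rightWindow).add(value);
            }
            results.add(emitted);
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<String>> inner = run("inner");
        for (int ts = 1; ts <= inner.size(); ts++) {
            System.out.println(ts + ": " + inner.get(ts - 1));
        }
    }
}
```

Running `run("inner")` reproduces the innerJoin column. The null-value record at ts 7 still produces `null - a` and `null - b`, while the null at ts 1 never appears in a later result. This is the inconsistency the KIP targets.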

 

KStream-KTable (result KStream): 

This join is an asymmetric non-window join. The basic idea is to do a KTable lookup for each KStream record (the KTable lookup is done on the current KTable state).

...

 

ts | left | right | leftJoin
1  | null |       | null - null
2  |      | null  |
3  | A    |       | A - null
4  |      | a     |
5  | B    |       | B - a
6  |      | b     |
7  | null |       | null - b
8  |      | null  |
9  | C    |       | C - null
10 |      | c     |
11 |      | null  |
12 | null |       | null - null
13 |      | null  |
14 |      | d     |
15 | D    |       | D - d

 

KTable-KTable (result KTable): 

This join is a symmetric non-window join. The basic idea is to do a KTable lookup for each KTable update: the KTable that just received an update looks up the current state of the other KTable. The main difference from the first two joins is that the result is a KTable, and this result KTable reflects the current join result. Thus, if an input KTable record gets deleted, we might need to delete a record from the result KTable via a tombstone record. In this case, we do not call the ValueJoiner but directly emit a tombstone record (i.e., a record with a null value), shown as null in the table below.

...

 

ts | left | right | innerJoin | leftJoin | outerJoin
1  | null |       | null      | null     | null
2  |      | null  | null      | null     | null
3  | A    |       | null      | A - null | A - null
4  |      | a     | A - a     | A - a    | A - a
5  | B    |       | B - a     | B - a    | B - a
6  |      | b     | B - b     | B - b    | B - b
7  | null |       | null      | null     | null - b
8  |      | null  | null      | null     | null
9  | C    |       | null      | C - null | C - null
10 |      | c     | C - c     | C - c    | C - c
11 |      | null  | null      | C - null | C - null
12 | null |       | null      | null     | null
13 |      | null  | null      | null     | null
14 |      | d     | null      | null     | null - d
15 | D    |       | D - d     | D - d    | D - d
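The KTable-KTable table can be reproduced with a small, hypothetical Java model. It is not Kafka code, and the class name, method name, and join-type strings are made up. Each table is reduced to the latest value of the single example key, with null meaning "deleted". A tombstone is modeled as a Java null in the returned list.

Every update triggers a lookup into the other table's current state. The emission rules below were read off the table above rather than taken from the Kafka implementation:

  • The inner join emits a tombstone if either side is null.
  • The left join emits a tombstone if the left side is null.
  • The outer join emits a tombstone only if both sides are null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the CURRENT KTable-KTable join semantics (not Kafka code).
// Each table keeps only the latest value of the single example key; null means "deleted".
class CurrentTableTableJoin {
    static final String SIDES = "LRLRLRLRLRRLRRL";
    static final String[] VALUES = {null, null, "A", "a", "B", "b", null, null,
                                    "C", "c", null, null, null, "d", "D"};

    // Returns the result-table update emitted at each timestamp (index 0 holds ts = 1).
    // A Java null in the returned list stands for a tombstone record.
    static List<String> run(String joinType) {
        String left = null;
        String right = null;
        List<String> results = new ArrayList<>();
        for (int i = 0; i < VALUES.length; i++) {
            // Apply the update to the table that received it.
            if (SIDES.charAt(i) == 'L') {
                left = VALUES[i];
            } else {
                right = VALUES[i];
            }
            // Every update is joined against the other table's current state.
            boolean tombstone;
            switch (joinType) {
                case "inner": tombstone = left == null || right == null; break;
                case "left":  tombstone = left == null; break;
                default:      tombstone = left == null && right == null; break; // "outer"
            }
            // Tombstones are emitted directly, without calling the ValueJoiner.
            results.add(tombstone ? null : left + " - " + right);
        }
        return results;
    }
}
```

For the inner join, this model emits 10 tombstones for 15 updates, even though most of them delete nothing from the result KTable. These are the "unnecessary tombstone messages" listed under current issues.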

 

Current issues:

  • inconsistent triggering: for KStream-KStream left joins, only left input records trigger a join computation
  • inconsistent null handling (for KStream-KStream joins): <key:null> records trigger a join computation but are not added to the window state
  • missing KStream-KTable inner join
  • unnecessary tombstone messages are sent for KTable-KTable joins

 

Suggested semantics:

  • KStream-KStream Join:
    • materialize the left stream, and let every record received from the right stream trigger the join function if a matching record can be found in the left materialized store
    • for inner / left / outer joins, do not trigger the join if either / the left / both of the matching records have a null value
  • KStream-KTable Join:
    • ignore KStream <key:null> records (i.e., do not trigger a result computation), to be consistent with the KStream-KStream <key:null> policy
    • no window state here (nothing changes)
    • keep the <key:null> (delete) changelog semantics for KTables
  • KTable-KTable Join:
    • remove all unnecessary tombstone messages

In the following, we illustrate the impact of the suggested changes on all joins, using the examples from above and showing both current and intended results. Red background color highlights changes for which result records get removed, and green background color highlights changes for which result records get added. Cells with no background color indicate no change of result records.

 

KStream-KStream (result KStream): 

Inner join: remove all null-value triggers

...

 

ts | left | right | outerJoin (current)          | outerJoin (suggested)
1  | null |       | null - null                  |
2  |      | null  | null - null                  |
3  | A    |       | A - null                     | A - null
4  |      | a     | A - a                        | A - a
5  | B    |       | B - a                        | B - a
6  |      | b     | A - b, B - b                 | A - b, B - b
7  | null |       | null - a, null - b           |
8  |      | null  | A - null, B - null           |
9  | C    |       | C - a, C - b                 | C - a, C - b
10 |      | c     | A - c, B - c, C - c          | A - c, B - c, C - c
11 |      | null  | A - null, B - null, C - null |
12 | null |       | null - a, null - b, null - c |
13 |      | null  | A - null, B - null, C - null |
14 |      | d     | A - d, B - d, C - d          | A - d, B - d, C - d
15 | D    |       | D - a, D - b, D - c, D - d   | D - a, D - b, D - c, D - d
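The "outerJoin (suggested)" column can be reproduced by the hypothetical Java model below. It is not Kafka code, and its class and method names are made up. It assumes a single key and one window that covers all example records, with both streams materialized. A record with a null value is ignored entirely: it neither triggers a lookup nor is added to the window state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the SUGGESTED KStream-KStream outer join (not Kafka code).
// Assumptions: one key, and one window that contains all 15 example records.
class SuggestedStreamStreamOuterJoin {
    static final String SIDES = "LRLRLRLRLRRLRRL";
    static final String[] VALUES = {null, null, "A", "a", "B", "b", null, null,
                                    "C", "c", null, null, null, "d", "D"};

    // Returns the results emitted at each timestamp (index 0 holds ts = 1).
    static List<List<String>> run() {
        List<String> leftWindow = new ArrayList<>();
        List<String> rightWindow = new ArrayList<>();
        List<List<String>> results = new ArrayList<>();
        for (int i = 0; i < VALUES.length; i++) {
            boolean isLeft = SIDES.charAt(i) == 'L';
            String value = VALUES[i];
            List<String> emitted = new ArrayList<>();
            results.add(emitted);
            // A record with a null value neither triggers a lookup nor enters the window.
            if (value == null) {
                continue;
            }
            // Both streams are materialized, so records from either side trigger.
            List<String> other = isLeft ? rightWindow : leftWindow;
            for (String o : other) {
                emitted.add(isLeft ? value + " - " + o : o + " - " + value);
            }
            // Outer join: emit a result with a null partner if nothing matched.
            if (other.isEmpty()) {
                emitted.add(isLeft ? value + " - null" : "null - " + value);
            }
            (isLeft ? leftWindow : rightWindow).add(value);
        }
        return results;
    }
}
```

Compared with the current model, the only change is the early `continue` for null values. That single rule removes every result at ts 1, 2, 7, 8, 11, 12, and 13, and leaves all other rows unchanged.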

 

KStream-KTable (result KStream): 

Left join: remove left null-value triggers

...

 

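ts | left | right | leftJoin (current) | leftJoin (suggested) | innerJoin (suggested to add)
1  | null |       | null - null        |                      |
2  |      | null  |                    |                      |
3  | A    |       | A - null           | A - null             |
4  |      | a     |                    |                      |
5  | B    |       | B - a              | B - a                | B - a
6  |      | b     |                    |                      |
7  | null |       | null - b           |                      |
8  |      | null  |                    |                      |
9  | C    |       | C - null           | C - null             |
10 |      | c     |                    |                      |
11 |      | null  |                    |                      |
12 | null |       | null - null        |                      |
13 |      | null  |                    |                      |
14 |      | d     |                    |                      |
15 | D    |       | D - d              | D - d                | D - d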
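The three result columns of the KStream-KTable comparison can be reproduced with one hypothetical Java model. It is not Kafka code; the class name and the mode strings "currentLeft", "suggestedLeft", and "suggestedInner" are made up.

The model works as follows:

  • The KStream is the left input.
  • The KTable is reduced to the latest value for the single example key, with null meaning "deleted".
  • KTable updates never produce output.
  • Each KStream record is looked up against the current KTable state.

The modes differ in only two checks: whether a stream record with a null value triggers, and whether a missing KTable value suppresses the result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of KStream-KTable joins (not Kafka code). The KStream is the left
// input; the KTable is the right input and holds the latest value (null = deleted).
class StreamTableJoin {
    static final String SIDES = "LRLRLRLRLRRLRRL";
    static final String[] VALUES = {null, null, "A", "a", "B", "b", null, null,
                                    "C", "c", null, null, null, "d", "D"};

    // mode is "currentLeft", "suggestedLeft", or "suggestedInner".
    // Returns the result at each timestamp (index 0 holds ts = 1); null means no output.
    static List<String> run(String mode) {
        String table = null;  // current KTable state
        List<String> results = new ArrayList<>();
        for (int i = 0; i < VALUES.length; i++) {
            String value = VALUES[i];
            String result = null;
            if (SIDES.charAt(i) == 'R') {
                table = value;  // KTable updates only change state, they never trigger
            } else if (value != null || mode.equals("currentLeft")) {
                // The current left join also triggers on null values; the suggested
                // joins ignore KStream records with a null value.
                if (table != null || !mode.equals("suggestedInner")) {
                    // Inner join: skip the result if the KTable has no value for the key.
                    result = value + " - " + table;
                }
            }
            results.add(result);
        }
        return results;
    }
}
```

For example, `run("suggestedInner")` yields output only at ts 5 (`B - a`) and ts 15 (`D - d`), matching the "innerJoin (suggested to add)" column.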

 

KTable-KTable (result KTable): 

Inner join: send a tombstone (null) if and only if the updated record has a null value and the lookup into the other KTable is not null

...

 

ts | left | right | innerJoin | leftJoin | outerJoin
1  | null |       | null      | null     | null
2  |      | null  | null      | null     | null
3  | A    |       | null      | A - null | A - null
4  |      | a     | A - a     | A - a    | A - a
5  | B    |       | B - a     | B - a    | B - a
6  |      | b     | B - b     | B - b    | B - b
7  | null |       | null      | null     | null - b
8  |      | null  | null      | null     | null
9  | C    |       | null      | C - null | C - null
10 |      | c     | C - c     | C - c    | C - c
11 |      | null  | null      | C - null | C - null
12 | null |       | null      | null     | null
13 |      | null  | null      | null     | null
14 |      | d     | null      | null     | null - d
15 | D    |       | D - d     | D - d    | D - d
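The inner-join tombstone rule stated for KTable-KTable joins can be sketched as follows. This is again a hypothetical in-memory model rather than Kafka code, and the class and method names are made up. It assumes the same single-key example input.

The model emits:

  • a join result when both tables have a value;
  • a tombstone only when the update deletes a record while the other table still has a value;
  • nothing otherwise, because in that case no join result for the key can exist.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch (not Kafka code) of the suggested KTable-KTable inner join rule:
// send a tombstone iff the update has a null value and the lookup into the other table
// finds a value; send a join result iff both sides have a value.
class SuggestedTableTableInnerJoin {
    static final String SIDES = "LRLRLRLRLRRLRRL";
    static final String[] VALUES = {null, null, "A", "a", "B", "b", null, null,
                                    "C", "c", null, null, null, "d", "D"};

    // Returns the emitted updates in order as "ts:result"; "ts:tombstone" marks a delete.
    static List<String> run() {
        String left = null;
        String right = null;
        List<String> updates = new ArrayList<>();
        for (int i = 0; i < VALUES.length; i++) {
            boolean isLeft = SIDES.charAt(i) == 'L';
            String value = VALUES[i];
            String other = isLeft ? right : left;  // lookup into the other table
            if (isLeft) {
                left = value;
            } else {
                right = value;
            }
            int ts = i + 1;
            if (value != null && other != null) {
                updates.add(ts + ":" + left + " - " + right);
            } else if (value == null && other != null) {
                updates.add(ts + ":tombstone");  // removes the join result for this key, if any
            }
            // Otherwise the other table has no value (or both are null), so no join result
            // can exist for this key and nothing is sent.
        }
        return updates;
    }
}
```

For the example input, this sketch sends 7 updates, 2 of them tombstones (at ts 7 and ts 11). The current inner join shown earlier emits 15 updates, 10 of them tombstones.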

 

Summary

With the suggested changes, we fix the following existing inconsistencies:

...

  • Add the missing join type (i.e., KStream-KTable inner join)

  • Remove unnecessary tombstone records from the result of KTable-KTable joins

Compatibility, Deprecation, and Migration Plan

This KIP introduces a semantic change (the API stays the same, except for the newly added methods) and thus is not backward compatible.
The exception is the change to KTable-KTable joins: it is only an internal optimization and is fully backward compatible.

Test Plan

The new join semantics can be unit tested using the examples in this KIP, which are exhaustive.

Rejected Alternatives

  1. Handle <key:null> records of KStream inputs as regular <key:value> records. Rejected for the following reasons:

    • in contrast to the relational model, <key:null> records have special semantics (there is actually nothing to be joined); this also relates to the KTable tombstone (i.e., delete) semantics of <key:null> records
    • for inner joins, users would not expect the ValueJoiner to be called with a null parameter
    • for left/outer joins, users cannot distinguish whether the ValueJoiner was called because no matching key was found or because a <key:null> record was found
    • alternatively, we would need to introduce different ValueJoiner classes with different methods, i.e., #join(left, right), #joinLeft(left), and #joinRight(right), but we want to keep the API simple:
      • InnerValueJoiner offering only #join()
      • LeftValueJoiner offering #join() and #joinLeft()
      • OuterValueJoiner offering all three methods
  2. Add an outer KStream-KTable join. Rejected for the following reasons:

    • we can only trigger lookups for KStream records

    • thus, an outer KStream-KTable join is essentially the same as a left KStream-KTable join
    • supporting an outer KStream-KTable join would require a windowed KStream to provide state for lookups, thus introducing a completely new join operator (which is beyond the scope of this KIP)
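The following hypothetical Java sketch shows the per-join-type joiner interfaces listed under the first rejected alternative, and why they complicate the API. None of these types exist in Kafka. The `apply` dispatch method and the `<no match>` marker are made up for illustration.

With this design, a missing record and a record with a null value reach different methods. However, a joiner with more than one abstract method can no longer be written as a lambda.

```java
// Hypothetical sketch of the rejected alternative (not part of the Kafka API):
// one joiner interface per join type.
interface InnerValueJoiner<V1, V2, VR> {
    VR join(V1 left, V2 right);  // called when both sides have a record
}

interface LeftValueJoiner<V1, V2, VR> extends InnerValueJoiner<V1, V2, VR> {
    VR joinLeft(V1 left);        // called when no right record matched
}

interface OuterValueJoiner<V1, V2, VR> extends LeftValueJoiner<V1, V2, VR> {
    VR joinRight(V2 right);      // called when no left record matched
}

class RejectedJoinerDemo {
    // With three abstract methods, an outer joiner must be an (anonymous) class.
    static final OuterValueJoiner<String, String, String> JOINER =
        new OuterValueJoiner<String, String, String>() {
            @Override public String join(String left, String right) { return left + " - " + right; }
            @Override public String joinLeft(String left) { return left + " - <no match>"; }
            @Override public String joinRight(String right) { return "<no match> - " + right; }
        };

    // Hypothetical dispatch: a missing record selects a different method, while a
    // present record with a null value is still passed to join().
    static String apply(boolean leftFound, String left, boolean rightFound, String right) {
        if (leftFound && rightFound) {
            return JOINER.join(left, right);
        }
        if (leftFound) {
            return JOINER.joinLeft(left);
        }
        if (rightFound) {
            return JOINER.joinRight(right);
        }
        throw new IllegalArgumentException("at least one side must have a record");
    }
}
```

With this sketch, `apply(true, null, true, "a")` returns `null - a`, while `apply(true, "A", false, null)` returns `A - <no match>`. A single `ValueJoiner` cannot tell these two cases apart.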