
Kafka Streams offers three different types of join operators: inner join, left join, and outer join. The following table shows which types are available for each combination of KStream and KTable. Compared to 0.10.1.x and older, current trunk adds the KStream-KTable inner join.


| | inner join | left join | outer join |
|---|---|---|---|
| KStream-KStream | yes | yes | yes |
| KStream-KTable | yes | yes | no |
| KTable-KTable | yes | yes | yes |

KStream-KStream Join

This is a sliding window join, i.e., all tuples that are "close" to each other in time (i.e., whose timestamps differ by at most the window size) are joined. The result is a KStream. The table below shows the output of all three join variants for each processed input record. Note that some input records do not produce any output records.

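| ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin | outerJoin |
|---|---|---|---|---|---|
| 1 | null | | | | |
| 2 | | null | | | |
| 3 | A | | | A - null | A - null |
| 4 | | a | A - a | A - a | A - a |
| 5 | B | | B - a | B - a | B - a |
| 6 | | b | A - b, B - b | A - b, B - b | A - b, B - b |
| 7 | null | | | | |
| 8 | | null | | | |
| 9 | C | | C - a, C - b | C - a, C - b | C - a, C - b |
| 10 | | c | A - c, B - c, C - c | A - c, B - c, C - c | A - c, B - c, C - c |
| 11 | | null | | | |
| 12 | null | | | | |
| 13 | | null | | | |
| 14 | | d | A - d, B - d, C - d | A - d, B - d, C - d | A - d, B - d, C - d |
| 15 | D | | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d |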
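To make the sliding-window semantics concrete, here is a minimal plain-Java sketch that replays the input records from the table above and reproduces its innerJoin column. It does not use the Kafka Streams API (a real application would call KStream#join with a JoinWindows); the class WindowedInnerJoinSketch, the single shared key, dropping null-valued records, and the absence of any window retention or grace period are simplifying assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, single-key simulation of a sliding-window KStream-KStream inner join.
// This is NOT the Kafka Streams implementation; it only models the semantics in the table.
final class WindowedInnerJoinSketch {

    // One input record: timestamp, which stream it came from, and its value (null allowed).
    record Event(long ts, boolean left, String value) {}

    // Returns, for every input record, the join results it triggers (possibly none).
    // Buffers are never purged (no retention), which is fine for this tiny example.
    static List<List<String>> innerJoin(List<Event> events, long windowSize) {
        List<Event> leftBuffer = new ArrayList<>();
        List<Event> rightBuffer = new ArrayList<>();
        List<List<String>> output = new ArrayList<>();
        for (Event e : events) {
            List<String> results = new ArrayList<>();
            if (e.value() != null) { // records with a null value are dropped
                List<Event> other = e.left() ? rightBuffer : leftBuffer;
                for (Event o : other) {
                    // Join partners must be "close" in time: |ts difference| <= window size.
                    if (Math.abs(e.ts() - o.ts()) <= windowSize) {
                        String leftValue = e.left() ? e.value() : o.value();
                        String rightValue = e.left() ? o.value() : e.value();
                        results.add(leftValue + " - " + rightValue);
                    }
                }
                // Buffer the record so later records from the other stream can join with it.
                (e.left() ? leftBuffer : rightBuffer).add(e);
            }
            output.add(results);
        }
        return output;
    }

    // The input records from the table above (all records share the same key).
    static List<Event> exampleInput() {
        return List.of(
            new Event(1, true, null), new Event(2, false, null),
            new Event(3, true, "A"), new Event(4, false, "a"),
            new Event(5, true, "B"), new Event(6, false, "b"),
            new Event(7, true, null), new Event(8, false, null),
            new Event(9, true, "C"), new Event(10, false, "c"),
            new Event(11, false, null), new Event(12, true, null),
            new Event(13, false, null), new Event(14, false, "d"),
            new Event(15, true, "D"));
    }

    public static void main(String[] args) {
        List<Event> input = exampleInput();
        // A window of 20 time units covers every timestamp in the example.
        List<List<String>> output = innerJoin(input, 20);
        for (int i = 0; i < input.size(); i++) {
            System.out.println("ts=" + input.get(i).ts() + " -> " + output.get(i));
        }
    }
}
```

With a window of 20 every pair of non-null records is close enough to join, matching the innerJoin column; shrinking the window to 2 leaves D - d as the only result at ts 15, which illustrates how the window size bounds the join.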

KStream-KTable Join

This is an asymmetric, non-windowed join. Semantically, each KStream record triggers a lookup into the KTable, while each KTable input record only updates the current KTable state and never yields a result record. The result is a KStream. Note that the KTable lookup is done against the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort). The table below shows the output of both offered join variants for each processed input record. Note that some input records do not produce any output records.

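| ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin |
|---|---|---|---|---|
| 1 | null | | | |
| 2 | | null | | |
| 3 | A | | | A - null |
| 4 | | a | | |
| 5 | B | | B - a | B - a |
| 6 | | b | | |
| 7 | null | | | |
| 8 | | null | | |
| 9 | C | | | C - null |
| 10 | | c | | |
| 11 | | null | | |
| 12 | null | | | |
| 13 | | null | | |
| 14 | | d | | |
| 15 | D | | D - d | D - d |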
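The lookup semantics can be sketched in plain Java as follows. This is a minimal simulation, not the Kafka Streams API (a real application would call KStream#join or KStream#leftJoin with a KTable and a ValueJoiner); the class StreamTableJoinSketch, the single shared key, and dropping null-valued stream records are assumptions made so that the output matches the table above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified, single-key simulation of a KStream-KTable join (not the Kafka Streams implementation).
final class StreamTableJoinSketch {

    // fromStream == true: a KStream record (STREAM_1); false: a KTable update (STREAM_2, null = delete).
    record Event(long ts, boolean fromStream, String value) {}

    // Returns, for every input record, the join result it emits (if any).
    static List<Optional<String>> join(List<Event> events, boolean leftJoin) {
        String tableValue = null; // current KTable value for the key; null means "no entry"
        List<Optional<String>> output = new ArrayList<>();
        for (Event e : events) {
            Optional<String> result = Optional.empty();
            if (!e.fromStream()) {
                tableValue = e.value(); // KTable input only updates (or deletes) state, never emits
            } else if (e.value() != null) { // KStream records with a null value are dropped
                if (tableValue != null) {
                    result = Optional.of(e.value() + " - " + tableValue);
                } else if (leftJoin) {
                    result = Optional.of(e.value() + " - null"); // no join partner in the table
                }
            }
            output.add(result);
        }
        return output;
    }

    // The input records from the table above.
    static List<Event> exampleInput() {
        return List.of(
            new Event(1, true, null), new Event(2, false, null),
            new Event(3, true, "A"), new Event(4, false, "a"),
            new Event(5, true, "B"), new Event(6, false, "b"),
            new Event(7, true, null), new Event(8, false, null),
            new Event(9, true, "C"), new Event(10, false, "c"),
            new Event(11, false, null), new Event(12, true, null),
            new Event(13, false, null), new Event(14, false, "d"),
            new Event(15, true, "D"));
    }

    public static void main(String[] args) {
        List<Event> input = exampleInput();
        List<Optional<String>> inner = join(input, false);
        List<Optional<String>> left = join(input, true);
        for (int i = 0; i < input.size(); i++) {
            System.out.println("ts=" + input.get(i).ts() + " inner=" + inner.get(i) + " left=" + left.get(i));
        }
    }
}
```

Because the lookup reads whatever the table holds at processing time, the same stream record C would produce C - c instead of C - null if the update at ts 10 were processed before it; this is the non-determinism mentioned above.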

KTable-KTable Join

This is a symmetric, non-windowed join. Semantically, each KTable update triggers a lookup into the "other" KTable. The result is a continuously updating KTable, i.e., a changelog stream that can contain tombstone messages of the form <key:null>. In the table below, these tombstones are shown as null, in contrast to results such as "X - null", which indicate a valid join result with only one join partner. Note that the KTable lookup is done against the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).

Warning: KTable Cache

If you want to observe the behavior described below, you will most likely need to disable the KTable deduplication cache by setting cache.max.bytes.buffering=0 in StreamsConfig. Otherwise, the deduplication cache will "swallow" many of the result records, and it will be hard to reason about the actual join behavior.
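As a minimal sketch, the following builds the configuration with the cache disabled, passing the property keys as plain strings. With the Kafka Streams library on the classpath, the same keys are available as StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, and the resulting Properties would be passed to new KafkaStreams(topology, props). The class JoinDebugConfigSketch, the application id, and the broker address are hypothetical placeholders. Newer Kafka releases may rename or deprecate this cache setting, so check the StreamsConfig documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper: Kafka Streams properties for observing raw KTable join output.
final class JoinDebugConfigSketch {

    static Properties joinDebugProperties(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // 0 bytes of record cache: every KTable update is forwarded downstream
        // instead of being deduplicated ("swallowed") by the cache.
        props.setProperty("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        Properties props = joinDebugProperties("join-semantics-demo", "localhost:9092");
        System.out.println(props);
    }
}
```

Disabling the cache is useful for understanding join behavior but typically increases downstream traffic, so it is usually reverted outside of experiments.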


| ts | left | right | innerJoin | leftJoin | outerJoin |
|---|---|---|---|---|---|
| 1 | null | | | | |
| 2 | | null | | | |
| 3 | A | | | A - null | A - null |
| 4 | | a | A - a | A - a | A - a |
| 5 | B | | B - a | B - a | B - a |
| 6 | | b | B - b | B - b | B - b |
| 7 | null | | null | null | null - b |
| 8 | | null | | | null |
| 9 | C | | | C - null | C - null |
| 10 | | c | C - c | C - c | C - c |
| 11 | | null | null | C - null | C - null |
| 12 | null | | | null | null |
| 13 | | null | | | |
| 14 | | d | | | null - d |
| 15 | D | | D - d | D - d | D - d |
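The table above can be reproduced with a small plain-Java simulation. It applies a rule inferred from the table, not taken from the Kafka Streams source: after each update, emit the new join result if there is one; otherwise emit a tombstone only if the state before the update produced a result; otherwise emit nothing. The class TableTableJoinSketch and its single shared key are hypothetical; a real application would use KTable#join, KTable#leftJoin, or KTable#outerJoin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified, single-key simulation of a KTable-KTable join (not the Kafka Streams implementation).
final class TableTableJoinSketch {

    enum JoinType { INNER, LEFT, OUTER }

    // leftSide == true: update to the left KTable; false: update to the right KTable. null value = delete.
    record Update(long ts, boolean leftSide, String value) {}

    // Join result for the current pair of values, or null if this join type produces no row.
    static String joinResult(JoinType type, String left, String right) {
        boolean present = switch (type) {
            case INNER -> left != null && right != null;
            case LEFT -> left != null;
            case OUTER -> left != null || right != null;
        };
        return present ? left + " - " + right : null;
    }

    // For each update: the new join result, a tombstone ("null") retracting an earlier result, or nothing.
    static List<Optional<String>> join(List<Update> updates, JoinType type) {
        String left = null;
        String right = null;
        List<Optional<String>> output = new ArrayList<>();
        for (Update u : updates) {
            String before = joinResult(type, left, right);
            if (u.leftSide()) {
                left = u.value();
            } else {
                right = u.value();
            }
            String after = joinResult(type, left, right);
            if (after != null) {
                output.add(Optional.of(after));
            } else if (before != null) {
                output.add(Optional.of("null")); // tombstone <key:null>, printed as null like in the table
            } else {
                output.add(Optional.empty()); // nothing to retract: no output
            }
        }
        return output;
    }

    // The input records from the table above.
    static List<Update> exampleInput() {
        return List.of(
            new Update(1, true, null), new Update(2, false, null),
            new Update(3, true, "A"), new Update(4, false, "a"),
            new Update(5, true, "B"), new Update(6, false, "b"),
            new Update(7, true, null), new Update(8, false, null),
            new Update(9, true, "C"), new Update(10, false, "c"),
            new Update(11, false, null), new Update(12, true, null),
            new Update(13, false, null), new Update(14, false, "d"),
            new Update(15, true, "D"));
    }

    public static void main(String[] args) {
        List<Update> input = exampleInput();
        for (JoinType type : JoinType.values()) {
            System.out.println(type + ": " + join(input, type));
        }
    }
}
```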

KTable-KTable Foreign-Key Join

This is a symmetric, non-windowed join. Two streams are involved in this join, the left stream and the right stream, each of which is usually keyed on a different key type. The left stream is keyed on the primary key, whereas the right stream is keyed on the foreign key. A foreign-key extractor function is applied to each element of the left stream to extract its foreign key, and the resulting left event is then joined with the right event keyed on that foreign key. Updates to a right event also trigger joins with all left events containing that foreign key. It can be helpful to think of the left-hand materialized stream as events that contain a foreign key, and of the right-hand materialized stream as entities keyed on that foreign key.

KTable lookups are done against the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).


The table below shows how changes published to the left-hand side (LHS) and right-hand side (RHS) propagate to the join outputs. Each step builds on the state resulting from the previous step. Only INNER and LEFT joins are supported, and the outputs of both are shown.

| ts | Action | LHS-Stream (K, extracted-FK) | RHS-Stream State (FK,V) | Inner Join Output | Left Join Output | Execute Join Logic? | Notes |
|---|---|---|---|---|---|---|---|
| 1 | Publish new event | (k,1) | (1,foo) | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event |
| 2 | Publish update to event by changing fk | (k,1) → (k,2) | (1,foo) | (k,null) | (k,2,null) | LEFT | Must indicate a delete because there is currently no (fk,value) in RHS with key=2, and (k,1,foo) is no longer valid output. |
| 3 | Publish update to event by changing fk | (k,2) → (k,3) | (1,foo) | (k,null) | (k,3,null) | LEFT | Ideally, the Inner Join would not publish another delete, but we do not maintain sufficient state to know that the (k,2) update already resulted in a null output and that we don't need to publish it again. |
| 4 | Publish a value to RHS | -0- | (1,foo), (3,bar) | (k,3,bar) | (k,3,bar) | Inner/Left | Performs prefix scan join |
| 5 | Delete k | (k,3) → (k,null) | (1,foo), (3,bar) | (k,null) | (k,null,null) | LEFT | Propagate null/delete through the sub-topology |
| 6 | Publish original event again | (k,null) → (k,1) | (1,foo), (3,bar) | (k,1,foo) | (k,1,foo) | Inner/Left | Normal fk-join induced by LHS event |
| 7 | Publish to LHS | (q,10) | (1,foo), (3,bar) | Nothing | (q,10,null) | LEFT | Significant difference between Inner and Left |
| 8 | Publish a value to RHS | -1- | (1,foo), (3,bar), (10,baz) | (q,10,baz) | (q,10,baz) | Inner/Left | Performs prefix scan join |
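The workflow above can be replayed with a small in-memory model. This is a sketch under stated assumptions, not the Kafka Streams implementation (a real application would call KTable#join or KTable#leftJoin with a foreign-key extractor function and a ValueJoiner): the class FkJoinSketch is hypothetical, a linear scan over the LHS stands in for the prefix scan, and deletes are printed as (key,null) for both join types, whereas the table writes the left-join delete in step 5 as (k,null,null). As in the table, the LHS value is the foreign key itself, so the extractor is the identity function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Simplified in-memory model of a KTable-KTable foreign-key join (not the Kafka Streams implementation).
final class FkJoinSketch {
    private final Map<String, String> lhs = new TreeMap<>(); // primary key -> LHS value (sorted for stable output)
    private final Map<String, String> rhs = new HashMap<>(); // foreign key -> RHS value
    private final Function<String, String> fkExtractor;      // derives the FK from an LHS value
    private final boolean leftJoin;

    FkJoinSketch(Function<String, String> fkExtractor, boolean leftJoin) {
        this.fkExtractor = fkExtractor;
        this.leftJoin = leftJoin;
    }

    private static String tombstone(String key) {
        return "(" + key + ",null)";
    }

    // LHS update or delete: extract the FK and look it up in the RHS table.
    List<String> updateLeft(String key, String value) {
        String previous = (value == null) ? lhs.remove(key) : lhs.put(key, value);
        if (value == null) {
            // Propagate the delete, but only for a key that actually existed.
            return previous != null ? List.of(tombstone(key)) : List.of();
        }
        String fk = fkExtractor.apply(value);
        String match = rhs.get(fk);
        if (match != null || leftJoin) {
            return List.of("(" + key + "," + fk + "," + match + ")");
        }
        // Inner join without a match: retract any earlier result for an existing key (steps 2 and 3).
        return previous != null ? List.of(tombstone(key)) : List.of();
    }

    // RHS update or delete: re-join every LHS row whose extracted FK matches.
    List<String> updateRight(String fk, String value) {
        if (value == null) {
            rhs.remove(fk);
        } else {
            rhs.put(fk, value);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> row : lhs.entrySet()) {
            if (!fk.equals(fkExtractor.apply(row.getValue()))) {
                continue;
            }
            out.add(value != null || leftJoin
                    ? "(" + row.getKey() + "," + fk + "," + value + ")"
                    : tombstone(row.getKey()));
        }
        return out;
    }

    public static void main(String[] args) {
        for (boolean leftJoin : new boolean[] {false, true}) {
            FkJoinSketch join = new FkJoinSketch(Function.identity(), leftJoin);
            join.updateRight("1", "foo"); // initial RHS state (1,foo)
            System.out.println(leftJoin ? "LEFT JOIN" : "INNER JOIN");
            System.out.println("1: " + join.updateLeft("k", "1"));
            System.out.println("2: " + join.updateLeft("k", "2"));
            System.out.println("3: " + join.updateLeft("k", "3"));
            System.out.println("4: " + join.updateRight("3", "bar"));
            System.out.println("5: " + join.updateLeft("k", null));
            System.out.println("6: " + join.updateLeft("k", "1"));
            System.out.println("7: " + join.updateLeft("q", "10"));
            System.out.println("8: " + join.updateRight("10", "baz"));
        }
    }
}
```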









Join Semantics in Kafka Streams 0.10.1.x and Older

Kafka Streams 0.10.1.x (and older) offers the following join operators:

 

| | inner join | left join | outer join |
|---|---|---|---|
| KStream-KStream | yes | yes | yes |
| KStream-KTable | no | yes | no |
| KTable-KTable | yes | yes | yes |

KStream-KStream Join

This is a sliding window join, i.e., all tuples that are "close" to each other in time (i.e., whose timestamps differ by at most the window size) are joined. The result is a KStream. The table below shows the output of all three join variants for each processed input record. Note that some input records do not produce any output records.

| ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin | outerJoin |
|---|---|---|---|---|---|
| 1 | null | | | null - null | null - null |
| 2 | | null | | | null - null |
| 3 | A | | | A - null | A - null |
| 4 | | a | A - a | | A - a |
| 5 | B | | B - a | B - a | B - a |
| 6 | | b | A - b, B - b | | A - b, B - b |
| 7 | null | | null - a, null - b | null - a, null - b | null - a, null - b |
| 8 | | null | A - null, B - null | | A - null, B - null |
| 9 | C | | C - a, C - b | C - a, C - b | C - a, C - b |
| 10 | | c | A - c, B - c, C - c | | A - c, B - c, C - c |
| 11 | | null | A - null, B - null, C - null | | A - null, B - null, C - null |
| 12 | null | | null - a, null - b, null - c | null - a, null - b, null - c | null - a, null - b, null - c |
| 13 | | null | A - null, B - null, C - null | | A - null, B - null, C - null |
| 14 | | d | A - d, B - d, C - d | | A - d, B - d, C - d |
| 15 | D | | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d |
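A notable difference from the current semantics is the leftJoin column: only STREAM_1 records produce left-join output, and null-valued left records still join. The plain-Java sketch below reproduces that column under two assumptions inferred from the table rather than from the 0.10.1.x source code: right records with a null value are not buffered, and records are never expired from the buffer. The class LegacyStreamStreamLeftJoinSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, single-key model of the 0.10.1.x KStream-KStream leftJoin column (not the actual implementation).
final class LegacyStreamStreamLeftJoinSketch {

    // One input record: timestamp, which stream it came from, and its value (null allowed).
    record Event(long ts, boolean left, String value) {}

    static List<List<String>> leftJoin(List<Event> events, long windowSize) {
        List<Event> rightBuffer = new ArrayList<>();
        List<List<String>> output = new ArrayList<>();
        for (Event e : events) {
            List<String> results = new ArrayList<>();
            if (e.left()) {
                // Every left record (even one with a null value) probes the buffered right records.
                for (Event r : rightBuffer) {
                    if (Math.abs(e.ts() - r.ts()) <= windowSize) {
                        results.add(e.value() + " - " + r.value());
                    }
                }
                if (results.isEmpty()) {
                    results.add(e.value() + " - null"); // no partner within the window
                }
            } else if (e.value() != null) {
                // Right records are only buffered; they never emit a leftJoin result here.
                rightBuffer.add(e);
            }
            output.add(results);
        }
        return output;
    }

    // The input records from the table above.
    static List<Event> exampleInput() {
        return List.of(
            new Event(1, true, null), new Event(2, false, null),
            new Event(3, true, "A"), new Event(4, false, "a"),
            new Event(5, true, "B"), new Event(6, false, "b"),
            new Event(7, true, null), new Event(8, false, null),
            new Event(9, true, "C"), new Event(10, false, "c"),
            new Event(11, false, null), new Event(12, true, null),
            new Event(13, false, null), new Event(14, false, "d"),
            new Event(15, true, "D"));
    }

    public static void main(String[] args) {
        List<Event> input = exampleInput();
        List<List<String>> output = leftJoin(input, 20);
        for (int i = 0; i < input.size(); i++) {
            System.out.println("ts=" + input.get(i).ts() + " -> " + output.get(i));
        }
    }
}
```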

KStream-KTable Join

This is an asymmetric, non-windowed join. Semantically, each KStream record triggers a lookup into the KTable. The result is a KStream. Note that the KTable lookup is done against the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort). The table below shows the output of the only offered join variant, leftJoin, for each processed input record. Note that some input records do not produce any output records.

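| ts | STREAM_1 (left) | STREAM_2 (right) | leftJoin |
|---|---|---|---|
| 1 | null | | null - null |
| 2 | | null | |
| 3 | A | | A - null |
| 4 | | a | |
| 5 | B | | B - a |
| 6 | | b | |
| 7 | null | | null - b |
| 8 | | null | |
| 9 | C | | C - null |
| 10 | | c | |
| 11 | | null | |
| 12 | null | | null - null |
| 13 | | null | |
| 14 | | d | |
| 15 | D | | D - d |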

KTable-KTable Join

This is a symmetric, non-windowed join. Semantically, each KTable update triggers a lookup into the "other" KTable. The result is a continuously updating KTable, i.e., a changelog stream that can contain tombstone messages of the form <key:null>. In the table below, these tombstones are shown as null, in contrast to results such as "X - null", which indicate a valid join result with only one join partner. Note that the KTable lookup is done against the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in practice Kafka Streams does not guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).

Warning: KTable Cache

If you want to observe the behavior described below with Kafka 0.10.1.x, you will most likely need to disable the KTable deduplication cache by setting cache.max.bytes.buffering=0 in StreamsConfig. Otherwise, the deduplication cache will "swallow" many of the result records, and it will be hard to reason about the actual join behavior.


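| ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin | leftJoin | outerJoin |
|---|---|---|---|---|---|
| 1 | null | | null | null | null |
| 2 | | null | null | null | null |
| 3 | A | | null | A - null | A - null |
| 4 | | a | A - a | A - a | A - a |
| 5 | B | | B - a | B - a | B - a |
| 6 | | b | B - b | B - b | B - b |
| 7 | null | | null | null | null - b |
| 8 | | null | null | null | null |
| 9 | C | | null | C - null | C - null |
| 10 | | c | C - c | C - c | C - c |
| 11 | | null | null | C - null | C - null |
| 12 | null | | null | null | null |
| 13 | | null | null | null | null |
| 14 | | d | null | null | null - d |
| 15 | D | | D - d | D - d | D - d |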