...
Note that both streams are used as examples for KStream (i.e., record stream) and KTable (i.e., changelog stream), which have different semantics. For KTable, so-called tombstone records of the form key:null are of special interest, as they delete a key (those records are shown as null in all examples to highlight tombstone semantics). Finally, in Kafka Streams each join is "customized" by the user with a ValueJoiner function that computes the actual result. Hence, we show output records as "X - Y", with X and Y being the left and right value, respectively, passed to the ValueJoiner. If the output is shown as null (i.e., a tombstone message), the ValueJoiner is not called, because the result record is deleted.
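To make the "X - Y" notation concrete, here is a minimal Java sketch. It uses java.util.function.BiFunction as a stand-in for Kafka Streams' ValueJoiner (which likewise exposes a single apply(value1, value2) method), so it runs without Kafka on the classpath. The helper leftJoinResult is hypothetical and only models the rule described above for a simplified KTable left-join update: a tombstone result is produced without calling the joiner, while a missing right value is handed to the joiner as null.

```java
import java.util.function.BiFunction;

class ValueJoinerSketch {
    // Stand-in for Kafka Streams' ValueJoiner<V1, V2, VR>, which likewise has a
    // single apply(value1, value2) method; formats results as "X - Y".
    static final BiFunction<String, String, String> JOINER =
            (left, right) -> left + " - " + right;

    // Hypothetical, simplified helper for one KTable left-join update: a deleted
    // (null) left value yields a tombstone without calling the joiner, while a
    // missing right value is passed to the joiner as null.
    static String leftJoinResult(String leftValue, String rightValue) {
        if (leftValue == null) {
            return null; // tombstone: the joiner is not called
        }
        return JOINER.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        System.out.println(leftJoinResult("A", "a"));  // A - a
        System.out.println(leftJoinResult("C", null)); // C - null
        System.out.println(leftJoinResult(null, "b")); // null (tombstone)
    }
}
```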
...
Improved Left/Outer Stream-Stream Join (v3.1.x and newer)
Warning: Prior to version … (See Jira …)
...
This is a sliding window join, i.e., all tuples that are "close" to each other with regard to time (i.e., their time difference is at most the window size) are joined. The result is a KStream. The table below shows the output for each processed input record. Only the left and outer join behavior changed; the inner join column is the same as in older versions, as inner joins are not subject to spurious join results. Note that some input records do not produce any output records, and that left/outer join results are emitted with some "delay" (i.e., only after the grace period has passed). Also note that the new behavior effectively requires setting a grace period in the window definition, via ofTimeDifferenceNoGrace(...) or ofTimeDifferenceWithGrace(...), to specify when left/outer join results should be emitted. Setting the grace period using the old and now deprecated API, JoinWindows.of(...).grace(...), does not result in this new behavior; it produces the same results as older releases (0.10.2.x to 3.0.x).
In contrast to the later examples, we assume a window size of 15 and a grace period of 5.
...
ts | STREAM_1 (left) | STREAM_2 (right) | innerJoin (same as in older versions) | leftJoin | outerJoin |
---|---|---|---|---|---|
1 | null | | | | |
2 | | null | | | |
3 | A | | | | |
4 | | a | A - a | A - a | A - a |
5 | B | | B - a | B - a | B - a |
6 | | b | A - b, B - b | A - b, B - b | A - b, B - b |
7 | null | | | | |
8 | | null | | | |
9 | C | | C - a, C - b | C - a, C - b | C - a, C - b |
10 | | c | A - c, B - c, C - c | A - c, B - c, C - c | A - c, B - c, C - c |
11 | | null | | | |
12 | null | | | | |
13 | | null | | | |
14 | | d | A - d, B - d, C - d | A - d, B - d, C - d | A - d, B - d, C - d |
15 | D | | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d | D - a, D - b, D - c, D - d |
... | | | | | |
40 | E | | | | |
... | | | | | |
60 | F | | | E - null | E - null |
... | | | | | |
80 | | f | | F - null | F - null |
... | | | | | |
100 | G | | | | null - f |
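The emission behavior in the table above can be reproduced with a small, simplified Java simulation of the left join. It is not the Kafka Streams implementation; it assumes a symmetric join window of 15 and a grace period of 5, skips records with a null value, and emits an unmatched left record as "X - null" once stream time (the largest timestamp seen so far) reaches the record's timestamp plus window size plus grace period. Late and out-of-order records are not handled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified simulation of the v3.1.x+ left stream-stream join
// emission rule. It is NOT the Kafka Streams implementation.
class LeftJoinSketch {
    // One input record: timestamp, input side, and value.
    record Rec(long ts, boolean isLeft, String value) {}

    static final long WINDOW = 15; // assumed max time difference for a join
    static final long GRACE = 5;   // assumed grace period

    static List<String> leftJoin(List<Rec> input) {
        List<Rec> lefts = new ArrayList<>();
        List<Rec> rights = new ArrayList<>();
        List<Rec> unmatched = new ArrayList<>(); // left records still waiting for a partner
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (Rec r : input) {
            if (r.value() == null) {
                continue; // this sketch skips null-value records entirely
            }
            streamTime = Math.max(streamTime, r.ts());
            if (r.isLeft()) {
                boolean matched = false;
                for (Rec other : rights) {
                    if (Math.abs(other.ts() - r.ts()) <= WINDOW) {
                        out.add(r.value() + " - " + other.value());
                        matched = true;
                    }
                }
                lefts.add(r);
                if (!matched) {
                    unmatched.add(r);
                }
            } else {
                for (Rec other : lefts) {
                    if (Math.abs(other.ts() - r.ts()) <= WINDOW) {
                        out.add(other.value() + " - " + r.value());
                        unmatched.remove(other); // joined, so no "X - null" later
                    }
                }
                rights.add(r);
            }
            // Emit "X - null" only once stream time reaches window end plus grace period.
            Iterator<Rec> it = unmatched.iterator();
            while (it.hasNext()) {
                Rec u = it.next();
                if (u.ts() + WINDOW + GRACE <= streamTime) {
                    out.add(u.value() + " - null");
                    it.remove();
                }
            }
        }
        return out;
    }

    // The input rows of the table above (uppercase = left, lowercase = right).
    static List<Rec> tableInput() {
        return List.of(
            new Rec(1, true, null), new Rec(2, false, null),
            new Rec(3, true, "A"), new Rec(4, false, "a"),
            new Rec(5, true, "B"), new Rec(6, false, "b"),
            new Rec(7, true, null), new Rec(8, false, null),
            new Rec(9, true, "C"), new Rec(10, false, "c"),
            new Rec(11, false, null), new Rec(12, true, null),
            new Rec(13, false, null), new Rec(14, false, "d"),
            new Rec(15, true, "D"), new Rec(40, true, "E"),
            new Rec(60, true, "F"), new Rec(80, false, "f"),
            new Rec(100, true, "G"));
    }

    public static void main(String[] args) {
        System.out.println(leftJoin(tableInput()));
    }
}
```

Under these assumptions the output matches the leftJoin column, including E - null at ts 60 and F - null at ts 80. In an actual application, the corresponding window would be declared with JoinWindows.ofTimeDifferenceWithGrace(Duration.ofMillis(15), Duration.ofMillis(5)) (treating the table's timestamps as milliseconds) and passed to KStream#leftJoin.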
...
New Join Semantics (v0.10.2.x and newer)
Warning: This section describes the new join semantics as of version … (see KIP-77: Improve Kafka Streams Join Semantics)
...
The table below marks in bold face the so-called "spurious" left/outer join results that are produced by versions 0.10.2.x to 3.0.x. Compare with the improved left/outer stream-stream join above, for version 3.1.x and newer, which avoids spurious results.
...
2.1.x and newer: improvements in processing order, and introduction of the max.task.idle.ms config to allow for partial blocking if one input is empty (cf. KIP-353: Improve Kafka Streams Timestamp Synchronization)
3.0.x and newer: stronger synchronization guarantees to avoid race conditions due to unpredictable consumer fetch behavior (cf. KIP-695: Further Improve Kafka Streams Timestamp Synchronization)
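As a minimal configuration sketch, max.task.idle.ms is an ordinary Kafka Streams property. The snippet below only builds a java.util.Properties object so that it runs without Kafka; the application id, broker address, and the 100 ms value are placeholders, not recommendations. In a real application these properties would be passed to the KafkaStreams constructor, and StreamsConfig.MAX_TASK_IDLE_MS_CONFIG is the constant for the same key.

```java
import java.util.Properties;

class IdleConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put("application.id", "join-semantics-demo"); // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        // Let a task stay idle for up to 100 ms (arbitrary example value) when only
        // some of its input partitions have data, instead of processing immediately.
        props.put("max.task.idle.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("max.task.idle.ms"));
    }
}
```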
...
2.1.x and newer: improvements in processing order, and introduction of the max.task.idle.ms config to allow for partial blocking if one input is empty (cf. KIP-353: Improve Kafka Streams Timestamp Synchronization)
3.0.x and newer: stronger synchronization guarantees to avoid race conditions due to unpredictable consumer fetch behavior (cf. KIP-695: Further Improve Kafka Streams Timestamp Synchronization)
...
ts | left | right | innerJoin | leftJoin | outerJoin |
---|---|---|---|---|---|
1 | null | | | | |
2 | | null | | | |
3 | A | | | A - null | A - null |
4 | | a | A - a | A - a | A - a |
5 | B | | B - a | B - a | B - a |
6 | | b | B - b | B - b | B - b |
7 | null | | null | null | null - b |
8 | | null | | | null |
9 | C | | | C - null | C - null |
10 | | c | C - c | C - c | C - c |
11 | | null | null | C - null | C - null |
12 | null | | | null | null |
13 | | null | | | |
14 | | d | | | null - d |
15 | D | | D - d | D - d | D - d |
16 | | | | | |
17 | | d | D - d | D - d | D - d |
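The rows above can be replayed with a hypothetical single-key Java simulation. It assumes these rows describe a KTable-KTable join (updates on either side trigger output, and only the latest value per side is joined, e.g., only B - b at ts 6), that the current result is re-emitted on every update, and that a tombstone (null) is emitted only when a previously existing result disappears. It is a sketch of the observable results, not of how Kafka Streams implements the join.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical single-key simulation of KTable-KTable join results; not the
// Kafka Streams implementation.
class KTableJoinSketch {
    enum Mode { INNER, LEFT, OUTER }

    // Stand-in for a ValueJoiner producing the "X - Y" notation.
    static final BiFunction<String, String, String> JOINER = (l, r) -> l + " - " + r;

    private final Mode mode;
    private String left;  // current left table value (null = absent)
    private String right; // current right table value (null = absent)

    KTableJoinSketch(Mode mode) {
        this.mode = mode;
    }

    // Does the join have a result for the given left/right state?
    private boolean hasResult(String l, String r) {
        switch (mode) {
            case INNER:
                return l != null && r != null;
            case LEFT:
                return l != null;
            default: // OUTER
                return l != null || r != null;
        }
    }

    // Applies one update (value == null is a tombstone) and returns the emitted
    // records: empty for no output, or one element (a null element is a tombstone).
    List<String> update(boolean isLeft, String value) {
        boolean hadResult = hasResult(left, right);
        if (isLeft) {
            left = value;
        } else {
            right = value;
        }
        List<String> out = new ArrayList<>();
        if (hasResult(left, right)) {
            out.add(JOINER.apply(left, right));
        } else if (hadResult) {
            out.add(null); // the previous result is deleted
        }
        return out;
    }

    public static void main(String[] args) {
        KTableJoinSketch outer = new KTableJoinSketch(Mode.OUTER);
        System.out.println(outer.update(true, "A"));  // [A - null]
        System.out.println(outer.update(false, "a")); // [A - a]
        System.out.println(outer.update(true, null)); // [null - a]
    }
}
```

Feeding the table's updates (skipping the empty ts 16 row) through INNER, LEFT, and OUTER instances yields the innerJoin, leftJoin, and outerJoin columns, respectively.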
KTable-KTable Foreign-Key Join (v2.4.x and newer)
This is a symmetric non-window join. There are two streams involved in this join, the left stream and the right stream, each of which is usually keyed on a different key type. The left stream is keyed on the primary key, whereas the right stream is keyed on the foreign key. A foreign-key extractor function is applied to each element in the left stream to extract the foreign key. The resulting left event is then joined with the right event keyed on the corresponding foreign key. Updates to a right event also trigger joins with all left events containing that foreign key. It can be helpful to think of the left-hand materialized stream as events containing a foreign key, and the right-hand materialized stream as entities keyed on the foreign key.
KTable lookups are done on the current KTable state; thus, out-of-order records can yield non-deterministic results. Furthermore, in older versions of Kafka Streams there is no guarantee that all records are processed in timestamp order (processing records in timestamp order is the goal, but it is only best effort).
...
2.1.x and newer: improvements in processing order, and introduction of the max.task.idle.ms config to allow for partial blocking if one input is empty (cf. KIP-353: Improve Kafka Streams Timestamp Synchronization)
3.0.x and newer: stronger synchronization guarantees to avoid race conditions due to unpredictable consumer fetch behavior (cf. KIP-695: Further Improve Kafka Streams Timestamp Synchronization)
...
ts | Action | LHS-Stream (K, extracted-FK) | RHS-Stream State (FK,V) | Inner-Join Output | Left-Join Output |
---|---|---|---|---|---|
1 | Publish event to LHS | (k,1) | (1,foo) | (k,1,foo) | (k,1,foo) |
2 | Change LHS fk | (k,2) | (1,foo) | (k,null) | (k,2,null) |
3 | Change LHS fk | (k,3) | (1,foo) | (k,null) | (k,3,null) |
4 | Publish RHS entity | - | (1,foo) | (k,3,bar) | (k,3,bar) |
5 | Delete k | (k,null) | (1,foo) | (k,null) | (k,null,null) |
6 | Publish original event again | (k,1) | (1,foo) | (k,1,foo) | (k,1,foo) |
7 | Publish event to LHS | (q,10) | (1,foo) | - | (q,null,10) |
8 | Publish RHS entity | - | (1,foo) | (q,10,baz) | (q,10,baz) |
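A simplified, single-threaded Java simulation of the foreign-key join described above follows. It is not the Kafka Streams implementation (the real operator is KTable#join or KTable#leftJoin with a foreign-key extractor function); it simply keeps both tables in memory. Each LHS update is given with its already extracted foreign key, results are written as (k,fk,v) and tombstones as (k,null), RHS deletes are not modeled, and the RHS updates at ts 4 and ts 8 are assumed to be (3,bar) and (10,baz), inferred from the output columns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of a KTable-KTable foreign-key join.
// Results are written as "(k,fk,v)", tombstones as "(k,null)"; RHS deletes are not modeled.
class FkJoinSketch {
    private final boolean leftJoin;
    private final Map<String, Integer> lhs = new LinkedHashMap<>(); // K -> extracted FK
    private final Map<Integer, String> rhs = new LinkedHashMap<>(); // FK -> V
    final List<String> output = new ArrayList<>();

    FkJoinSketch(boolean leftJoin) {
        this.leftJoin = leftJoin;
    }

    // LHS update with an already extracted FK; fk == null deletes the key.
    void updateLhs(String key, Integer fk) {
        boolean existed = lhs.containsKey(key);
        if (fk == null) {
            lhs.remove(key);
            output.add("(" + key + ",null)");
            return;
        }
        lhs.put(key, fk);
        String v = rhs.get(fk); // lookup on the *current* RHS state
        if (v != null || leftJoin) {
            output.add("(" + key + "," + fk + "," + v + ")");
        } else if (existed) {
            // inner join: the key now references an FK without a match, so delete its old result
            output.add("(" + key + ",null)");
        }
    }

    // RHS update: re-join every LHS key whose extracted FK equals fk.
    void updateRhs(Integer fk, String value) {
        rhs.put(fk, value);
        for (Map.Entry<String, Integer> e : lhs.entrySet()) {
            if (e.getValue().equals(fk)) {
                output.add("(" + e.getKey() + "," + fk + "," + value + ")");
            }
        }
    }

    public static void main(String[] args) {
        FkJoinSketch inner = new FkJoinSketch(false);
        inner.updateRhs(1, "foo"); // initial RHS state
        inner.updateLhs("k", 1);   // ts 1
        inner.updateLhs("k", 2);   // ts 2
        inner.updateLhs("k", 3);   // ts 3
        inner.updateRhs(3, "bar"); // ts 4 (assumed RHS entity)
        System.out.println(inner.output); // [(k,1,foo), (k,null), (k,null), (k,3,bar)]
    }
}
```

Replaying all eight rows with leftJoin = false reproduces the Inner-Join Output column; with leftJoin = true, the first four outputs match the Left-Join Output column, while the sketch writes the later tombstone and unmatched results in its own (k,null) and (k,fk,v) notation.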
...
...
Old Join Semantics (v0.10.0.x and v0.10.1.x)
Kafka Streams 0.10.0.x and 0.10.1.x (and older) offer the following join operators:
...