DRAFT

Current state: Under Discussion

...

Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left joins, it makes sense to still accept a `null` key and to add the left-hand record with an empty right-hand side to the result.
Similarly, for outer joins it makes sense to keep the record from the side whose key is not null.
The defined semantics for left/outer joins are: keep the stream record if no matching join record was found.
Thus, the current behavior of Kafka Streams is inconsistent with the defined semantics.
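To make the intended left-join semantics concrete, here is a minimal sketch in plain Java that models a KStream-KTable left join over in-memory collections. It is not the Kafka Streams API or its implementation: `LeftJoinModel`, `Rec`, `leftJoin` and the `dropNullKeys` flag are hypothetical names, the table is a plain `Map`, and only the null-key rule is modeled (null values, windowing and timestamps are ignored).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of a KStream-KTable left join; NOT the Kafka Streams
// implementation. Only the handling of null stream keys is modeled.
public class LeftJoinModel {

    // A stream record whose key may be null.
    record Rec<K, V>(K key, V value) {}

    // dropNullKeys = true models the current behavior, false the proposed one.
    static <K, V, VT, R> List<R> leftJoin(List<Rec<K, V>> stream,
                                          Map<K, VT> table,
                                          BiFunction<V, VT, R> joiner,
                                          boolean dropNullKeys) {
        List<R> out = new ArrayList<>();
        for (Rec<K, V> rec : stream) {
            if (rec.key() == null) {
                if (dropNullKeys) {
                    continue; // current behavior: the record is silently dropped
                }
                // proposed behavior: no match is possible, so join with a null right value
                out.add(joiner.apply(rec.value(), null));
                continue;
            }
            // Map.get returns null when there is no matching table entry,
            // so unmatched records are still emitted (regular left-join semantics).
            out.add(joiner.apply(rec.value(), table.get(rec.key())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec<String, String>> stream = List.of(
                new Rec<>("a", "x"), new Rec<>(null, "y"), new Rec<>("b", "z"));
        Map<String, String> table = Map.of("a", "A");
        BiFunction<String, String, String> joiner = (lv, rv) -> lv + "+" + rv;

        System.out.println(leftJoin(stream, table, joiner, true));  // [x+A, z+null]
        System.out.println(leftJoin(stream, table, joiner, false)); // [x+A, y+null, z+null]
    }
}
```

With `dropNullKeys` set to `false`, the null-key record `y` survives with a `null` right-hand side, which is the "keep the stream record if no matching join record was found" rule applied to null keys.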

...

  • left join KStream-KStream: no longer drop left records with a null key, and call the ValueJoiner with 'null' for the right value.
  • outer join KStream-KStream: no longer drop left/right records with a null key, and call the ValueJoiner with 'null' for the right/left value.
  • left join KStream-KTable/GlobalTable: emit a record even when the right key is not found or is null.
  • left foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a null foreign key, and call the ValueJoiner with 'null' for the right value.
  • left join KStream-KTable/GlobalTable: emit a record even when the left key is null.
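The change for the left foreign-key join can be illustrated with a small in-memory model. This is a hypothetical sketch, not the Kafka Streams implementation: `FkLeftJoinModel`, `Order`, `leftFkJoin` and `dropNullForeignKeys` are illustrative names, the left table is reduced to a list of its values, and the right table is a plain `Map`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical in-memory model of a left foreign-key join between two tables;
// NOT the Kafka Streams implementation. The left table is reduced to its values.
public class FkLeftJoinModel {

    // Hypothetical left-table value whose foreign key may be null.
    record Order(String id, String customerId) {}

    // dropNullForeignKeys = true models the current behavior, false the proposed one.
    static <VL, FK, VR, R> List<R> leftFkJoin(List<VL> leftValues,
                                              Map<FK, VR> rightTable,
                                              Function<VL, FK> foreignKeyExtractor,
                                              BiFunction<VL, VR, R> joiner,
                                              boolean dropNullForeignKeys) {
        List<R> out = new ArrayList<>();
        for (VL leftValue : leftValues) {
            FK fk = foreignKeyExtractor.apply(leftValue);
            if (fk == null) {
                if (dropNullForeignKeys) {
                    continue; // current behavior: the left record is dropped
                }
                // proposed behavior: keep the left record, the right value is null
                out.add(joiner.apply(leftValue, null));
                continue;
            }
            // unmatched foreign keys already yield a null right value (left-join semantics)
            out.add(joiner.apply(leftValue, rightTable.get(fk)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order("o1", "c1"), new Order("o2", null));
        Map<String, String> customers = Map.of("c1", "Alice");
        BiFunction<Order, String, String> joiner = (o, c) -> o.id() + "/" + c;

        System.out.println(leftFkJoin(orders, customers, Order::customerId, joiner, true));  // [o1/Alice]
        System.out.println(leftFkJoin(orders, customers, Order::customerId, joiner, false)); // [o1/Alice, o2/null]
    }
}
```

The order `o2` has no foreign key; under the proposed behavior it is still emitted, joined with a `null` right value.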

Compatibility, Deprecation, and Migration Plan

Users who want to keep the current behavior can prepend a .filter() operator to the affected join operators that drops the records the join previously dropped.

E.g.


// current code
leftStream.leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

// new code, if the old behavior is needed: drop records with a null key or value on both sides first
rightStream = rightStream.filter((key, value) -> key != null && value != null);
leftStream.filter((key, value) -> key != null && value != null)
          .leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

// Both key and value need to be null-checked, because the currently defined behavior for stream-stream joins is:
...
*
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
*
...
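The filter condition from the migration example can be checked in isolation. The sketch below applies the same predicate to a plain Java list; `MigrationFilterSketch`, `Rec` and `prefilter` are hypothetical names, and a `List` stands in for a `KStream`.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical stand-alone check of the migration filter; a List stands in for a KStream.
public class MigrationFilterSketch {

    record Rec(String key, String value) {}

    // Same condition as filter((key, value) -> key != null && value != null).
    static final BiPredicate<String, String> KEEP_FOR_OLD_JOIN =
            (key, value) -> key != null && value != null;

    static List<Rec> prefilter(List<Rec> records) {
        return records.stream()
                .filter(r -> KEEP_FOR_OLD_JOIN.test(r.key(), r.value()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("k1", "v1"), new Rec(null, "v2"), new Rec("k3", null));
        // Only the record with a non-null key and a non-null value survives.
        System.out.println(prefilter(input).stream().map(Rec::key).collect(Collectors.toList())); // [k1]
    }
}
```

Both input streams need the same predicate, since the old stream-stream join ignored any input record whose key or value was `null`.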

Why not make this change opt-in?
By default, the DSL should be consistent with its defined semantics.

As part of this KIP, a section will also be added to the upgrade guide that explains in more detail how to restore the old behavior if needed.