DRAFT
Current state: Under Discussion
...
Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (such as aggregations and joins).
For left joins, it makes sense to still accept a left-hand record with a `null` key and add it to the result with an empty right-hand side.
Similarly, for outer joins it makes sense to keep records with a `null` key from either side instead of dropping them.
The defined semantics for left/outer joins are: keep the stream record if no matching join record was found.
Because a record with a `null` key can never find a matching join record, these semantics imply that it should be emitted. Dropping it, as Kafka Streams does today, is therefore inconsistent with the defined semantics.
...
- left join KStream-KStream: no longer drop left records with a `null` key; instead, call the ValueJoiner with `null` for the right value.
- outer join KStream-KStream: no longer drop left/right records with a `null` key; instead, call the ValueJoiner with `null` for the right/left value.
- left join KStream-KTable/GlobalTable: emit a record even when the right key is not found or is `null`.
- left foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a `null` foreign key; instead, call the ValueJoiner with `null` for the right value.
- left join KStream-KTable/GlobalTable: emit a record even when the left key is `null`.
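To make the KStream-KTable items above concrete, here is a minimal plain-Java sketch that models a left join of a stream against an in-memory snapshot of a table. It is not Kafka Streams code: `LeftJoinModel`, `Rec`, and the `dropNullKeys` flag are hypothetical names chosen for illustration, with `dropNullKeys = true` standing in for the current behavior and `false` for the proposed one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of a KStream-KTable left join; this is NOT Kafka Streams code.
public class LeftJoinModel {

    // Minimal stand-in for a stream record (hypothetical type).
    public record Rec<K, V>(K key, V value) {}

    // Joins each stream record against a snapshot of the table.
    // dropNullKeys = true models the current behavior, false models the proposed one.
    public static <K, V, T, R> List<R> leftJoin(List<Rec<K, V>> stream,
                                                Map<K, T> table,
                                                BiFunction<V, T, R> joiner,
                                                boolean dropNullKeys) {
        List<R> out = new ArrayList<>();
        for (Rec<K, V> rec : stream) {
            if (rec.key() == null) {
                if (!dropNullKeys) {
                    // A null key can never match a table entry, so emit with a null right value.
                    out.add(joiner.apply(rec.value(), null));
                }
                // Otherwise (current behavior) the record is silently dropped.
                continue;
            }
            // get() returns null when no table entry matches: the usual left-join result.
            out.add(joiner.apply(rec.value(), table.get(rec.key())));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("a", "A");
        List<Rec<String, String>> stream = List.of(
            new Rec<>("a", "1"), new Rec<>(null, "2"), new Rec<>("z", "3"));
        BiFunction<String, String, String> joiner = (lv, rv) -> lv + "/" + rv;
        System.out.println(leftJoin(stream, table, joiner, true));  // [1/A, 3/null]
        System.out.println(leftJoin(stream, table, joiner, false)); // [1/A, 2/null, 3/null]
    }
}
```

In this model both behaviors already emit `3/null` for the unmatched key `z`; the only difference is that the proposed behavior also emits `2/null` for the `null`-key record instead of dropping it.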
Compatibility, Deprecation, and Migration Plan
Users who want to keep the current behavior can prepend a `.filter()` operator to the inputs of the aforementioned join operators to drop the affected records.
E.g.
// current
leftStream.leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);
// new, if the old behavior is needed
rightStream = rightStream.filter((key, value) -> key != null && value != null);
leftStream.filter((key, value) -> key != null && value != null)
    .leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);
// Both key and value need to be null-checked, because the currently defined behavior for stream-stream joins is:
...
*
* If an input record key or value is {@code null} the record will not be included in the join operation and thus no
* output record will be added to the resulting {@code KStream}.
*
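The claim that a pre-filter restores the old behavior can be checked on a small model. The sketch below is plain Java, not Kafka Streams: `FilterMigrationModel`, `Rec`, and its methods are hypothetical; all records are assumed to fall into the same join window, so timestamps are ignored; and the modeled new behavior only relaxes the `null`-key rule for left records, keeping `null`-value handling as described in the quoted Javadoc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical, simplified model of a KStream-KStream left join; this is NOT Kafka Streams code.
// Assumption: all records fall into the same join window, so timestamps are ignored.
public class FilterMigrationModel {

    public record Rec(String key, String value) {}

    // Same predicate as in the migration snippet: keep only non-null keys and values.
    public static final BiPredicate<String, String> NON_NULL =
        (key, value) -> key != null && value != null;

    public static List<Rec> filter(List<Rec> in, BiPredicate<String, String> p) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : in) {
            if (p.test(r.key(), r.value())) out.add(r);
        }
        return out;
    }

    // Emits one result per matching right record, or "value/null" if nothing matched.
    // Right records with a null key or value are ignored; a null left key never matches.
    private static void joinOne(Rec l, List<Rec> right, List<String> out) {
        boolean matched = false;
        for (Rec r : right) {
            if (r.key() != null && r.value() != null && r.key().equals(l.key())) {
                out.add(l.value() + "/" + r.value());
                matched = true;
            }
        }
        if (!matched) out.add(l.value() + "/null");
    }

    // Old behavior: left records with a null key or a null value are dropped.
    public static List<String> oldLeftJoin(List<Rec> left, List<Rec> right) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            if (l.key() != null && l.value() != null) joinOne(l, right, out);
        }
        return out;
    }

    // Proposed behavior (as modeled here): left records with a null key are kept and joined
    // with a null right value. Assumption: null-value handling is unchanged in this model.
    public static List<String> newLeftJoin(List<Rec> left, List<Rec> right) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            if (l.value() != null) joinOne(l, right, out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("a", "1"), new Rec(null, "2"));
        List<Rec> right = List.of(new Rec("a", "A"), new Rec(null, "B"));
        System.out.println(oldLeftJoin(left, right)); // [1/A]
        System.out.println(newLeftJoin(left, right)); // [1/A, 2/null]
        System.out.println(newLeftJoin(filter(left, NON_NULL), filter(right, NON_NULL))); // [1/A]
    }
}
```

Because the predicate removes exactly the records the old join ignored, joining the filtered inputs with the modeled new semantics reproduces the old output.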
...
Why not make this change opt-in?
By default, we would like the DSL to be consistent with the defined semantics.
This KIP also includes adding a section to the upgrade guide that explains in more detail how to restore the old behavior if needed.