Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Current state: Under Discussion Merged

Voting thread: here

Discussion thread: here

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-12317
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-14748

...

Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left-joins, it makes sense to still accept a `null`, and add the left-hand record with an empty right-hand-side to the result.
Similarly, for outer-joins it makes sense to keep the record.
The defined semantics for left/outer join are: keep the stream record if no matching join record was found.
Thus, Kstreams Kafka Streams today's behavior is inconsistent with the defined semantics.

...

  • left join Kstream-Kstream: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
  • outer join Kstream-Kstream: no longer drop left/right records with null-key and call ValueJoiner with 'null' for right/left value.
  • left-foreign-key join Ktable-Ktable: no longer drop left records with null-foreign-key returned by the ForeignKeyExtractor and call ValueJoiner with 'null' for right value.
  • left join KStream-Ktable: no longer drop left records with null-key and call ValueJoiner with 'null' for right value.
  • left join KStream-GlobalTable: no longer drop left records with null-key when KeyValueMapper returns 'null' and call KeyValueMapper ValueJoiner with 'null' for left  key. The case where KeyValueMapper returns null is already handled in the current implementationright value.

Repartition of null-key records

...

//left join Kstream-Kstream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

//outer join Kstream-Kstream
rightStream
.filter((key, value) -> key != null);
leftStream
.filter((key, value) -> key != null)
.outerJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

//left-foreign-key join Ktable-Ktable
Function<String, String> foreignKeyExtractor = leftValue -> ...
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (lv, rv) -> join(lv, rv), Named.as("left-foreign-key-table-join"));

//left join Kstream-Ktable
leftStream
.filter((key, value) -> key != null)
.leftJoin(ktable, (k, lv, rv) -> join(lv, rv));

//left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (k, v) -> ...;
// The case where the KeyValueMapper returns null is already handled.
// What changes is that the KeyValueMapper might be called with null for the key 'k'.
leftStream
.filter((key, value) -> keyValueMapper.apply(key,value) != null)
.leftJoin(globalTable, keyValueMapper, (lv, rv) -> join(lv, rv));

...


We will hint to the DSL users via Java docs that they have now have the option to distinguish between the following scenarios within a KTable-KTable foreign- key-left-join operator:
"null-key" v.s. "not-null-key but null-value" by passing a 'ValueJoinerWithKey' instead of 'ValueJoiner' to the left join operator.
The remark will be made in the Java docs of all left join methods with a 'ValueJoiner' as a parameter.


Furthermore, as As part of this KIP the above information on how to keep the old behavior will also be documented here: https://kafka.apache.org/documentation/streams/upgrade-guide
Plus, https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics and https://kafka.apache.org/35/documentation/streams/developer-guide/dsl-api.html#joining will be updated accordingly.

Why not make this change opt in?

...