Current state: Merged

Voting thread: here

Discussion thread: here

JIRA: KAFKA-12317, KAFKA-14748

...

Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left-joins, it makes sense to still accept a `null` key and add the left-hand record with an empty right-hand side to the result.
Similarly, for outer-joins it makes sense to keep the record.
The defined semantics for left/outer join are: keep the stream record if no matching join record was found.
Thus, the current behavior of Kafka Streams is inconsistent with the defined semantics.

...

  • left join KStream-KStream: no longer drop left records with a null key; instead, call the ValueJoiner with 'null' for the right value.
  • outer join KStream-KStream: no longer drop left/right records with a null key; instead, call the ValueJoiner with 'null' for the right/left value.
  • left-foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a null foreign key; instead, call the ValueJoiner with 'null' for the right value.
  • left join KStream-KTable: no longer drop left records with a null key; instead, call the ValueJoiner with 'null' for the right value.
  • left join KStream-GlobalTable: no longer drop records when the KeyValueMapper returns 'null'; instead, call the ValueJoiner with 'null' for the right value.

Repartition of null-key records

...

//left join KStream-KStream
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

//outer join KStream-KStream
// filter() returns a new stream, so the filtered right stream must be the one passed to outerJoin.
leftStream
    .filter((key, value) -> key != null)
    .outerJoin(rightStream.filter((key, value) -> key != null), (lv, rv) -> join(lv, rv), windows);

//left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...;
leftTable
    .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
    .leftJoin(rightTable, foreignKeyExtractor, (lv, rv) -> join(lv, rv), Named.as("left-foreign-key-table-join"));

//left join KStream-KTable
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(ktable, (k, lv, rv) -> join(lv, rv));

//left join KStream-GlobalTable
KeyValueMapper<String, String, String> keyValueMapper = (k, v) -> ...;
// The case where the KeyValueMapper returns null is already handled.
// What changes is that the KeyValueMapper might be called with null for the key 'k'.
leftStream
    .filter((key, value) -> keyValueMapper.apply(key, value) != null)
    .leftJoin(globalTable, keyValueMapper, (lv, rv) -> join(lv, rv));

...


We will hint to DSL users via the Javadocs that they now have the option to distinguish between the following scenarios within a KTable-KTable foreign-key left join operator:
"null-key" vs. "not-null-key but null-value", by passing a 'ValueJoinerWithKey' instead of a 'ValueJoiner' to the left join operator.
The remark will be added to the Javadocs of all left join methods that take a 'ValueJoiner' as a parameter.
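
As a rough illustration of the distinction described above, the following sketch shows a key-aware joiner that tells a null key apart from a non-null key with no matching right value. It does not use the Kafka Streams classes: `KeyAwareJoiner` is a hypothetical stand-in that only mirrors the three-argument shape (key, left value, right value) of a 'ValueJoinerWithKey', and the class name, record values, and output strings are made up for the example.

```java
final class NullKeyJoinSketch {

    // Hypothetical stand-in (not a Kafka Streams type): same argument order as
    // ValueJoinerWithKey, so the sketch compiles without Kafka on the classpath.
    @FunctionalInterface
    interface KeyAwareJoiner<K, V1, V2, R> {
        R apply(K readOnlyKey, V1 leftValue, V2 rightValue);
    }

    // Distinguishes "null-key" from "not-null-key but null-value".
    static final KeyAwareJoiner<String, String, String, String> JOINER =
        (key, left, right) -> {
            if (key == null) {
                // No key: there was nothing to look up on the right-hand side.
                return left + " [null key]";
            }
            if (right == null) {
                // Key present, but no matching right-hand record was found.
                return left + " [no match for " + key + "]";
            }
            return left + " + " + right;
        };

    public static void main(String[] args) {
        System.out.println(JOINER.apply(null, "order-1", null));
        System.out.println(JOINER.apply("k1", "order-2", null));
        System.out.println(JOINER.apply("k1", "order-3", "customer-7"));
    }
}
```

A plain two-argument 'ValueJoiner' sees only the left and right values, so both of the first two cases would look identical to it; that is the reason for the Javadoc hint.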

...