Current state: Merged
Voting thread: here
Discussion thread: here
JIRA:
...
Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left-joins, it makes sense to still accept a `null` key and add the left-hand record with an empty right-hand side to the result.
Similarly, for outer-joins it makes sense to keep the record.
The defined semantics for left/outer join are: keep the stream record if no matching join record was found.
Thus, the current behavior of Kafka Streams is inconsistent with the defined semantics.
...
- left join KStream-KStream: no longer drop left records with a null key; instead, call the ValueJoiner with 'null' for the right value.
- outer join KStream-KStream: no longer drop left/right records with a null key; instead, call the ValueJoiner with 'null' for the right/left value.
- left-foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a null foreign key; instead, call the ValueJoiner with 'null' for the right value.
- left join KStream-KTable: no longer drop left records with a null key; instead, call the ValueJoiner with 'null' for the right value.
- left join KStream-GlobalKTable: no longer drop records when the KeyValueMapper returns 'null'; instead, call the ValueJoiner with 'null' for the right value.
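The effect of the list above on a left join can be sketched in plain Java, without any Kafka dependency. This is only an illustration under stated assumptions: the class `LeftJoinSemanticsSketch`, the `dropNullKeys` flag, and the use of a `Map` as the right-hand side are hypothetical stand-ins and not part of the Kafka Streams API. Records are modelled as `{key, value}` string pairs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class LeftJoinSemanticsSketch {

    // Hypothetical model of a left join: each left record is a {key, value} pair,
    // the right side is a simple lookup map, and the joiner plays the role of a ValueJoiner.
    static List<String> leftJoin(List<String[]> left,
                                 Map<String, String> right,
                                 BiFunction<String, String, String> joiner,
                                 boolean dropNullKeys) {
        List<String> result = new ArrayList<>();
        for (String[] record : left) {
            String key = record[0];
            String leftValue = record[1];
            if (key == null && dropNullKeys) {
                continue; // strict policy: the null-key record is silently dropped
            }
            // A null key can never match, so the right value is null in that case.
            String rightValue = (key == null) ? null : right.get(key);
            result.add(joiner.apply(leftValue, rightValue));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> left = Arrays.asList(
            new String[] {"a", "L1"},
            new String[] {null, "L2"},
            new String[] {"c", "L3"});
        Map<String, String> right = new HashMap<>();
        right.put("a", "R1");
        BiFunction<String, String, String> joiner = (lv, rv) -> lv + "," + rv;

        System.out.println(leftJoin(left, right, joiner, true));  // strict policy
        System.out.println(leftJoin(left, right, joiner, false)); // relaxed policy
    }
}
```

With the flag set to `false`, the null-key record `L2` reaches the joiner with a `null` right value, just like the unmatched record `L3`; with the flag set to `true` it never appears in the result.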
Repartition of null-key records
...
// left join KStream-KStream
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

// outer join KStream-KStream
// Note: filter() returns a new stream, so the filtered right stream must be the one passed to outerJoin().
leftStream
    .filter((key, value) -> key != null)
    .outerJoin(rightStream.filter((key, value) -> key != null), (lv, rv) -> join(lv, rv), windows);

// left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...;
leftTable
    .filter((key, value) -> foreignKeyExtractor.apply(value) != null)
    .leftJoin(rightTable, foreignKeyExtractor, (lv, rv) -> join(lv, rv), Named.as("left-foreign-key-table-join"));

// left join KStream-KTable
leftStream
    .filter((key, value) -> key != null)
    .leftJoin(ktable, (k, lv, rv) -> join(lv, rv));

// left join KStream-GlobalKTable
KeyValueMapper<String, String, String> keyValueMapper = (k, v) -> ...;
// The case where the KeyValueMapper returns null is already handled.
// What changes is that the KeyValueMapper might be called with null for the key 'k'.
leftStream
    .filter((key, value) -> keyValueMapper.apply(key, value) != null)
    .leftJoin(globalTable, keyValueMapper, (lv, rv) -> join(lv, rv));
...
We will hint to DSL users via the Java docs that they now have the option to distinguish between the following scenarios within a KTable-KTable foreign-key left-join operator:
"null-key" vs. "not-null-key but null-value", by passing a 'ValueJoinerWithKey' instead of a 'ValueJoiner' to the left join operator.
The remark will be made in the Java docs of all left join methods with a 'ValueJoiner' as a parameter.
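A minimal sketch of how a key-aware joiner could tell the two scenarios apart is shown here in plain Java so that it runs without Kafka on the classpath. The nested `ValueJoinerWithKey` interface is a hypothetical stand-in that only mirrors the shape (key, left value, right value) of the Kafka Streams interface of the same name. The sketch also assumes that the key handed to the joiner is the one used for the lookup, which the text above does not spell out. The class name, the `JOINER` constant, and the result strings are illustrative only.

```java
public class KeyAwareJoinerSketch {

    // Hypothetical stand-in mirroring the shape of
    // org.apache.kafka.streams.kstream.ValueJoinerWithKey.
    @FunctionalInterface
    interface ValueJoinerWithKey<K, V1, V2, VR> {
        VR apply(K readOnlyKey, V1 value1, V2 value2);
    }

    static final ValueJoinerWithKey<String, String, String, String> JOINER =
        (key, leftValue, rightValue) -> {
            if (key == null) {
                // Assumed scenario "null-key": no lookup was possible at all.
                return leftValue + " (null key)";
            }
            if (rightValue == null) {
                // Assumed scenario "not-null-key but null-value": the lookup found nothing.
                return leftValue + " (no match for " + key + ")";
            }
            return leftValue + "+" + rightValue;
        };

    public static void main(String[] args) {
        System.out.println(JOINER.apply(null, "L1", null));
        System.out.println(JOINER.apply("k2", "L2", null));
        System.out.println(JOINER.apply("k3", "L3", "R3"));
    }
}
```

A plain 'ValueJoiner' only sees the two values, so both of the first two calls would look identical to it; the extra key argument is what makes the distinction possible.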
...