Current state: Under Discussion

Discussion thread: here

JIRA: Unable to render Jira issues macro, execution error.

Motivation

Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left joins, however, it makes sense to accept a record with a `null` key and add the left-hand record to the result with an empty right-hand side.
Similarly, for outer joins it makes sense to keep the record.
The defined semantics for left/outer join are: keep the stream record if no matching join record was found.
Thus, the current behavior of Kafka Streams, which drops such records, is inconsistent with the defined semantics.

Public Interfaces

This KIP does not change any public interfaces. However, it changes the behavior of the following operators:

  • left join KStream-KStream
  • outer join KStream-KStream
  • left-foreign-key join KTable-KTable
  • left join KStream-KTable/GlobalKTable

Proposed Changes

  • left join KStream-KStream: no longer drop left records with a null key; call the ValueJoiner with 'null' for the right value.
  • outer join KStream-KStream: no longer drop left/right records with a null key; call the ValueJoiner with 'null' for the right/left value.
  • left-foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a null foreign key; call the ValueJoiner with 'null' for the right value.
  • left join KStream-KTable: no longer drop left records with a null key; call the ValueJoiner with 'null' for the right value.
  • left join KStream-GlobalKTable: no longer drop left records with a null key; call the KeyValueMapper with 'null' for the left key. The case where the KeyValueMapper returns null is already handled in the current implementation.
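
The rule shared by these changes can be modeled in a few lines of plain Java. This is only a sketch of the semantics, not the Kafka Streams implementation: `Rec`, `leftJoin`, `join`, and the `keepNullKeys` flag are hypothetical names introduced for illustration, the right-hand side is reduced to a `Map`, and windowing is ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the left-join rule; none of these types are Kafka Streams classes.
class NullKeyLeftJoinSketch {

    // Hypothetical stand-in for a keyed stream record.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // keepNullKeys = false models the current behavior (null-key records are dropped);
    // keepNullKeys = true models the proposed behavior (joiner is called with a null right value).
    static List<String> leftJoin(List<Rec> left,
                                 Map<String, String> right,
                                 BiFunction<String, String, String> joiner,
                                 boolean keepNullKeys) {
        List<String> out = new ArrayList<>();
        for (Rec rec : left) {
            if (rec.key == null) {
                if (keepNullKeys) {
                    out.add(joiner.apply(rec.value, null));
                }
                continue;
            }
            // A key without a match also yields null: the ordinary left-join case.
            out.add(joiner.apply(rec.value, right.get(rec.key)));
        }
        return out;
    }

    // Example joiner that tolerates a null right-hand value.
    static String join(String lv, String rv) {
        return lv + "/" + (rv == null ? "<none>" : rv);
    }

    public static void main(String[] args) {
        Map<String, String> right = new HashMap<>();
        right.put("a", "A");
        List<Rec> left = Arrays.asList(
            new Rec("a", "1"), new Rec(null, "2"), new Rec("b", "3"));

        // Current behavior: prints [1/A, 3/<none>]
        System.out.println(leftJoin(left, right, NullKeyLeftJoinSketch::join, false));
        // Proposed behavior: prints [1/A, 2/<none>, 3/<none>]
        System.out.println(leftJoin(left, right, NullKeyLeftJoinSketch::join, true));
    }
}
```

In this model the null-key record takes the same path as a record whose key has no match, which is the consistency the KIP is after. A joiner that already handles a null right value for unmatched keys needs no change.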

Compatibility, Deprecation, and Migration Plan

Users who want to keep the current behavior can prepend a .filter() operator to the aforementioned operators and filter accordingly.

For example:

//left join KStream-KStream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

//outer join KStream-KStream
// Both inputs are filtered; the filtered right stream must be the one passed to outerJoin().
leftStream
.filter((key, value) -> key != null)
.outerJoin(rightStream.filter((key, value) -> key != null), (lv, rv) -> join(lv, rv), windows);

//left-foreign-key join KTable-KTable
Function<String, String> foreignKeyExtractor = leftValue -> ...;
// The extractor is applied to left-hand values, so the filter goes on the left table.
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (lv, rv) -> join(lv, rv), Named.as("left-foreign-key-table-join"));

//left join KStream-GlobalKTable
KeyValueMapper<String, String, String> keyValueMapper = (k, v) -> ...;
// The case where the KeyValueMapper returns null is already handled.
// What changes is that the KeyValueMapper might be called with null for the key 'k'.
leftStream
.filter((key, value) -> key != null)
.leftJoin(globalTable, keyValueMapper, (lv, rv) -> join(lv, rv));

//left join KStream-KTable
leftStream
.filter((key, value) -> key != null)
.leftJoin(ktable, (k, lv, rv) -> join(lv, rv));
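
For the KStream-GlobalKTable case, an alternative to filtering is a key mapper that tolerates a null stream key. The following plain-Java sketch models that idea. `mapKey`, `leftJoin`, and the `<id>:<payload>` value layout are assumptions made up for illustration, and the global table is reduced to a `Map`. Treating a null mapped key as "no match" is likewise a modeling assumption, not a statement about the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model; the names below are illustrative and not Kafka Streams classes.
class NullTolerantKeyMapperSketch {

    // Hypothetical mapper: use the stream key if present, otherwise fall back to
    // the "<id>:<payload>" prefix of the value. Returns null when no key can be derived.
    static String mapKey(String key, String value) {
        if (key != null) {
            return key;
        }
        if (value == null) {
            return null;
        }
        int sep = value.indexOf(':');
        return sep > 0 ? value.substring(0, sep) : null;
    }

    // Model of a left join against a global table: a null mapped key is treated
    // as "no match", so the joined result carries no right-hand value.
    static String leftJoin(String key, String value, Map<String, String> globalTable) {
        String lookupKey = mapKey(key, value);
        String right = (lookupKey == null) ? null : globalTable.get(lookupKey);
        return value + " -> " + (right == null ? "<none>" : right);
    }

    public static void main(String[] args) {
        Map<String, String> globalTable = new HashMap<>();
        globalTable.put("42", "customer-42");

        System.out.println(leftJoin("42", "order", globalTable));    // order -> customer-42
        System.out.println(leftJoin(null, "42:order", globalTable)); // 42:order -> customer-42
        System.out.println(leftJoin(null, "order", globalTable));    // order -> <none>
    }
}
```

A mapper written this way never dereferences the key without a null check, so it stays correct whether or not null-key records are filtered out upstream.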


Why not make this change opt-in?

By default, the behavior of a stream defined via the DSL should be consistent with the defined semantics.
