Compatibility, Deprecation, and Migration Plan

Users who want to keep the current behavior can prepend a .filter() operator to the aforementioned operators and drop records with a null key (or a null foreign key) before the join.

For example:

//left join KStream-KStream
leftStream
.filter((key, value) -> key != null)
.leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);

//outer join KStream-KStream
//filter() returns a new stream, so the filtered right stream must be the one passed to the join
leftStream
.filter((key, value) -> key != null)
.outerJoin(rightStream.filter((key, value) -> key != null), (lv, rv) -> join(lv, rv), windows);

//left-foreign-key join KTable-KTable
//the foreign key is extracted from the left value, so the left table is the one to filter
Function<String, String> foreignKeyExtractor = leftValue -> ...;
leftTable
.filter((key, value) -> foreignKeyExtractor.apply(value) != null)
.leftJoin(rightTable, foreignKeyExtractor, (lv, rv) -> join(lv, rv), Named.as("left-foreign-key-table-join"));

//left join KStream-GlobalKTable
KeyValueMapper<String, String, String> keyValueMapper = (k, v) -> ...;
// The case where the KeyValueMapper returns null is already handled.
// What changes is that the KeyValueMapper can be called with null for the key 'k'.
leftStream
.filter((key, value) -> key != null)
.leftJoin(globalTable, keyValueMapper, (lv, rv) -> join(lv, rv));

//left join KStream-KTable (not windowed, so no JoinWindows argument)
leftStream
.filter((key, value) -> key != null)
.leftJoin(ktable, (k, lv, rv) -> join(lv, rv));
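
To make the effect of the prepended filter concrete, the following plain-Java sketch models a left join over an in-memory list. It does not use the Kafka Streams API and ignores windowing; the class NullKeyLeftJoinModel, the Rec type, and the dropNullKeys flag are hypothetical names introduced only for illustration. It assumes that, without the filter, a left record with a null key reaches the joiner with a null right value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class NullKeyLeftJoinModel {

    /** Minimal key-value record (hypothetical type); the key may be null. */
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Toy left join of a list of records against a lookup map.
     * Assumption: a left record with a null key never finds a match, so the
     * joiner is called with a null right value. With dropNullKeys set, such
     * records are removed first, which mimics the explicit filter.
     */
    static List<String> leftJoin(List<Rec> left,
                                 Map<String, String> right,
                                 BiFunction<String, String, String> joiner,
                                 boolean dropNullKeys) {
        List<String> out = new ArrayList<>();
        for (Rec r : left) {
            if (r.key == null && dropNullKeys) {
                continue; // filtered out before the join
            }
            String rightValue = (r.key == null) ? null : right.get(r.key);
            out.add(joiner.apply(r.value, rightValue));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = new ArrayList<>();
        left.add(new Rec("a", "L1"));
        left.add(new Rec(null, "L2"));

        Map<String, String> right = new HashMap<>();
        right.put("a", "R1");

        BiFunction<String, String, String> joiner = (lv, rv) -> lv + "+" + rv;

        // With the filter: the null-key record is dropped.
        System.out.println(leftJoin(left, right, joiner, true));  // [L1+R1]
        // Without the filter: the null-key record is joined with null.
        System.out.println(leftJoin(left, right, joiner, false)); // [L1+R1, L2+null]
    }
}
```

In this model, the only difference between the two calls is whether null-key records are removed up front, which is the role the .filter() step plays in the snippets above.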


Why not make this change opt in?

By default, we would like the behavior of a stream defined via the DSL to be consistent with the defined semantics.
