JIRA: KAFKA-12317, KAFKA-14748

Motivation

Kafka Streams enforces a strict non-null-key policy in the DSL across all key-dependent operations (like aggregations and joins).
For left-joins, it makes sense to still accept a `null` key and add the left-hand record with an empty right-hand side to the result.
Similarly, for outer-joins it makes sense to keep the record.
The defined semantics for left/outer join are: keep the stream record if no matching join record was found.
Thus, Kafka Streams' current behavior is inconsistent with the defined semantics.

Public Interfaces

This KIP won't change any public interfaces. However, it will change the behavior of the following operators:

  • left KStream-KTable/GlobalKTable
  • left/outer KStream-KStream
  • left-foreign-key KTable-KTable (the foreign key is allowed to be null)

Proposed Changes

  • left join KStream-KStream: no longer drop left records with a null key; call the ValueJoiner with `null` for the right value.
  • outer join KStream-KStream: no longer drop left/right records with a null key; call the ValueJoiner with `null` for the right/left value.
  • left-foreign-key join KTable-KTable: no longer drop left records for which the ForeignKeyExtractor returns a null foreign key; call the ValueJoiner with `null` for the right value.
  • left join KStream-KTable/GlobalKTable: no longer drop left records with a null key; call the ValueJoiner with `null` for the right value.
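The difference between the current and the proposed behavior can be illustrated with a small plain-Java model of a stream-table left join. This is only a sketch and does not use the Kafka Streams API: the class `NullKeyLeftJoinSketch`, the `Rec` type, the `leftJoin` helper, and the `dropNullKeys` flag are hypothetical names introduced for illustration, and a `BiFunction` stands in for the ValueJoiner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of the left-join semantics; NOT Kafka Streams code.
class NullKeyLeftJoinSketch {

    // Hypothetical stand-in for a keyed stream record.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Joins each left record against a lookup table (the "right" side).
    // dropNullKeys = true models the current behavior, false the proposed one.
    static List<String> leftJoin(List<Rec> left,
                                 Map<String, String> right,
                                 BiFunction<String, String, String> joiner,
                                 boolean dropNullKeys) {
        List<String> out = new ArrayList<>();
        for (Rec r : left) {
            if (r.key == null && dropNullKeys) {
                continue; // current behavior: the record is dropped
            }
            // A null key cannot match anything, so the right value is null.
            String rightValue = (r.key == null) ? null : right.get(r.key);
            out.add(joiner.apply(r.value, rightValue));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> left = Arrays.asList(new Rec("a", "L1"), new Rec(null, "L2"));
        Map<String, String> right = new HashMap<>();
        right.put("a", "R1");
        BiFunction<String, String, String> joiner = (lv, rv) -> lv + "+" + rv;

        System.out.println(leftJoin(left, right, joiner, true));  // [L1+R1]
        System.out.println(leftJoin(left, right, joiner, false)); // [L1+R1, L2+null]
    }
}
```

In this model the null-key record only reaches the joiner when `dropNullKeys` is false, which is why a joiner written against the proposed behavior has to tolerate a `null` right value.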

Compatibility, Deprecation, and Migration Plan

Users who want to keep the current behavior can prepend a .filter() operator to the aforementioned operators and filter accordingly.

E.g.

// left join for when old behavior is needed
leftStream.filter((key, value) -> key != null)
          .leftJoin(rightStream, (lv, rv) -> join(lv, rv), windows);


Why not make this change opt in?

By default we would like the behavior of a stream defined via DSL to be consistent with the defined semantics.

Part of this KIP is also to add a section to the upgrade guide that explains how the old behavior can be achieved if needed.