Status

Current state: Under Voting

Discussion thread: here

Voting thread: here

JIRA: KAFKA-10369

...

'Distinct' operation is common in data processing, e. g.

Hence it is natural to expect similar functionality from Kafka Streams.

Although Kafka Streams Tutorials contains an example of how distinct can be emulated, the example is complicated: it involves low-level coding with a local state store and a custom transformer. It would be much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of a KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.

...

  • A parameterless distinct() DSLOperation on:

    • TimeWindowedKStream<K,V>  DSLObject which returns KStream<Windowed<K>,V> 
    • SessionWindowedKStream<K,V> DSLObject which returns KStream<Windowed<K>,V>

The following methods are added to the corresponding interfaces:

KTable<Windowed<K>, V> distinct(final Named named);
KTable<Windowed<K>, V> distinct(final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> distinct(final Named named,
                                    final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);


The distinct operation emits only the first record for a given key that falls into a new window, and filters out all other records with that key that fall into an already existing window.

Records are considered duplicates if and only if the serialized forms of their keys are equal.
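The semantics described above can be illustrated with a minimal, self-contained sketch in plain Java. It does not use the Kafka Streams API and is not the proposed implementation. The class `WindowedDistinctSketch`, the `Rec` type, the UTF-8 key serializer, the tumbling 10-second window, and the sample records are all assumptions made for illustration only.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class WindowedDistinctSketch {

    /** Hypothetical stand-in for a stream record: key, value, timestamp in seconds. */
    static final class Rec {
        final String key;
        final String value;
        final long timestampSec;

        Rec(String key, String value, long timestampSec) {
            this.key = key;
            this.value = value;
            this.timestampSec = timestampSec;
        }

        @Override
        public String toString() {
            return key + "=" + value + "@" + timestampSec;
        }
    }

    /** Assumed key serializer: the UTF-8 bytes of the key. */
    static byte[] serializeKey(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Keeps the first record per (tumbling window, serialized key) pair and
     * drops every later record that maps to an already seen pair.
     */
    static List<Rec> distinct(List<Rec> input, long windowSizeSec) {
        Set<String> seen = new HashSet<>();
        List<Rec> output = new ArrayList<>();
        for (Rec r : input) {
            // Start of the tumbling window that contains this record.
            long windowStart = r.timestampSec - Math.floorMod(r.timestampSec, windowSizeSec);
            // Duplicates are detected on the serialized key, not on the key object.
            String id = windowStart + ":" + Base64.getEncoder().encodeToString(serializeKey(r.key));
            if (seen.add(id)) {
                output.add(r);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Rec> input = List.of(
                new Rec("a", "v1", 1),   // first "a" in window [0, 10): kept
                new Rec("a", "v2", 4),   // same key, same window: dropped
                new Rec("b", "v3", 5),   // different key: kept
                new Rec("a", "v4", 11)); // new window [10, 20): kept
        for (Rec r : distinct(input, 10)) {
            System.out.println(r);
        }
    }
}
```

With these sample records, only `v1`, `v3` and `v4` pass through. Unlike this sketch, which keeps every seen (window, key) pair in memory forever, a windowed store can discard state for expired windows, which is why the operation is windowed in the first place.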


Usage Examples

Consider the following example (record times are in seconds):

...