You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Message deduplication is a common task.

One example: we might have multiple data sources each reporting its state periodically with a relatively high frequency, their current states should be stored in a database. In case the actual change of the state occurs with a lower frequency than it is reported, in order to reduce the number of writes to the database we might want to filter out duplicated messages using Kafka Streams.

'Distinct' operation is common in data processing, e. g.

  • java.util.stream.Stream has distinct() method,
  • SQL has DISTINCT keyword.

Hence it is natural to expect the similar functionality from Kafka Streams.

Although Kafka Streams Tutorials contains an example of how distinct can be emulated, but this example is complicated: it involves low-level coding with local state store and a custom transformer. It might be much more convenient to have distinct as a first-class DSL operation.

Due to 'infinite' nature of KStream, distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.

Public Interfaces

In accordance with KStreams DSL Grammar, we introduce the following new elements:

  • distinct DSLOperation on a KStream<K, V> DSLObject which returns another KStream<K, V> DSLObject,

  • DistinctParameters DSLParameter.

Using DistinctParameters user provides the following:

  1. KeyValueMapper<K, V, VR> idExtractor — extracts a unique identifier from a record by which we de-duplicate input records. If it returns null, the record will not be considered for de-duping but forwarded as-is. If not provided, defaults to (key, value) -> KeyValue.pair(key, value), which means deduplication based on both key and value of the record.
  2. TimeWindows timeWindows — tumbling or hopping time-based window specification. Required parameter. Only the first message with a given id that falls into a window will be passed downstream.
  3. boolean isPersistent — whether the WindowStore that stores the duplicates should be persistent or not.

Proposed Changes

  1. Add the following method to KStream:
KStream<K, V> distinct(DistinctParameters<K, V> params)


2. Add and implement the following DistinctParameters class:

class DistinctParameters <K, V> implements Named<DistinctParameters <K,V>> {
    static DistinctParameters <K, V> with(TimeWindows timeWindows);
    static DistinctParameters <K, V> with(TimeWindows timeWindows, KeyValueMapper<K, V, VR> idExtractor);
    static DistinctParameters <K, V> with(TimeWindows timeWindows, KeyValueMapper<K, V, VR> idExtractor, boolean isPersistent);
}

Compatibility, Deprecation, and Migration Plan

The proposed change is backwards compatible, no deprecation or migration needed.

Rejected Alternatives

None

  • No labels