Status
Current state: Under Discussion (this KIP is a work in progress)
Discussion thread: here
JIRA:
Pull request: PR-9210
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Message deduplication is a common task.
One example: multiple data sources each report their state periodically, at a relatively high frequency, and the current state of each source should be stored in a database. If the state actually changes less frequently than it is reported, we might want to filter out the duplicated messages with Kafka Streams in order to reduce the number of writes to the database.
A 'distinct' operation is common in data processing; e.g. java.util.stream.Stream has a distinct() method, and SQL has the DISTINCT keyword. Hence it is natural to expect similar functionality from Kafka Streams.
Although the Kafka Streams Tutorials contain an example of how distinct can be emulated, this example is complicated: it involves low-level coding with a local state store and a custom transformer. It would be much more convenient to have distinct as a first-class DSL operation.
Due to the 'infinite' nature of KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.
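To illustrate the amount of low-level code currently required, a rough sketch of such an emulation could look as follows (this is not the tutorial's exact code; store and topic names are made up, and records are de-duplicated by key within a trailing five-minute window):

final String storeName = "dedup-store";
final Duration window = Duration.ofMinutes(5);

final StreamsBuilder builder = new StreamsBuilder();
// The window store that remembers already-seen ids has to be registered by hand.
builder.addStateStore(Stores.windowStoreBuilder(
        Stores.persistentWindowStore(storeName, window, window, false),
        Serdes.String(), Serdes.Long()));

final KStream<String, String> input =
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// The actual deduplication logic lives in a custom Transformer.
final KStream<String, String> deduplicated = input.transform(
        () -> new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;
            private WindowStore<String, Long> store;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
                this.store = (WindowStore<String, Long>) context.getStateStore(storeName);
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
                final long now = context.timestamp();
                // Drop the record if the same key was already seen within the window.
                try (final WindowStoreIterator<Long> seen =
                             store.fetch(key, now - window.toMillis(), now)) {
                    if (seen.hasNext()) {
                        return null;
                    }
                }
                store.put(key, now, now);
                return KeyValue.pair(key, value);
            }

            @Override
            public void close() { }
        },
        storeName);

deduplicated.to("output-topic", Produced.with(Serdes.String(), Serdes.String()));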
Public Interfaces
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- a distinct DSLOperation on a KStream<K, V> DSLObject, which returns another KStream<K, V> DSLObject;
- a DistinctParameters<K, V, I> DSLParameter. The type parameters are the key type, the value type, and the type of the record's unique identifier.
With DistinctParameters<K, V, I>, the following can be provided:
- KeyValueMapper<K, V, I> idExtractor — extracts a unique identifier from a record, by which input records are de-duplicated. If it returns null, the record is not considered for de-duplication and is forwarded as-is. If not provided, it defaults to (key, value) -> key, which means deduplication based on the record's key.
- TimeWindows timeWindows — tumbling or hopping time-based window specification. Required parameter. Only the first message with a given id that falls into a window is passed downstream.
- Serde<I> idSerde — serde for the unique identifier.
- boolean isPersistent — whether the WindowStore that stores the unique ids should be persistent or not. In many cases, a non-persistent store is preferable, since downstream consumers should in any case be ready to accept occasional duplicates.
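For illustration, the factory methods described below could be combined as follows (the window size and id extractor are chosen arbitrarily and are only examples):

// Deduplication by record key, 5-minute tumbling windows, all other defaults:
DistinctParameters<String, String, String> byKey =
        DistinctParameters.with(TimeWindows.of(Duration.ofMinutes(5)));

// Deduplication by an id extracted from the value (here: its first 8 characters),
// with an explicit serde for the id; a persistent store is assumed:
DistinctParameters<String, String, String> byIdInValue =
        DistinctParameters.with(TimeWindows.of(Duration.ofMinutes(5)),
                                (key, value) -> value.substring(0, 8),
                                Serdes.String());

// Same as above, but with a non-persistent window store:
DistinctParameters<String, String, String> nonPersistent =
        DistinctParameters.with(TimeWindows.of(Duration.ofMinutes(5)),
                                (key, value) -> value.substring(0, 8),
                                Serdes.String(),
                                false);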
Proposed Changes
1. Add the following method to the KStream interface:
<I> KStream<K, V> distinct(DistinctParameters<K, V, I> params);
Given the parameters, this method returns a new KStream with only the first occurrence of each record (by unique id) in each time window. Any subsequent occurrences within the same window are filtered out.
2. Add and implement the following DistinctParameters class:
class DistinctParameters<K, V, I> extends Named {
    /** Windowing parameters only. The {@code (k, v) -> k} id extractor is assumed,
     * and a persistent store with the key serde is used. */
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows);

    /** Windowing parameters, ID extractor, and a serde for unique IDs.
     * Store persistence is assumed. */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde);

    /** Windowing parameters, ID extractor, a serde for unique IDs, and a flag showing
     * whether the {@code WindowStore} should be persistent or not. */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                             final KeyValueMapper<K, V, I> idExtractor,
                                                             final Serde<I> idSerde,
                                                             final boolean isPersistent);
}
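Putting the two pieces together, a possible usage of the new operation could look as follows (topic names, types and the window size are chosen arbitrarily; the distinct method and DistinctParameters class are the ones proposed above and do not exist in Kafka Streams yet):

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> events =
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

// Deduplicate by record key within 10-second tumbling windows,
// using a non-persistent window store.
events.distinct(DistinctParameters.with(
                TimeWindows.of(Duration.ofSeconds(10)),
                (key, value) -> key,
                Serdes.String(),
                false))
      .to("events-deduplicated", Produced.with(Serdes.String(), Serdes.String()));

// Expected behaviour for three records with the same key "A":
//   ("A", t = 0s)  -> forwarded (first occurrence in window [0s, 10s))
//   ("A", t = 3s)  -> dropped   (duplicate within the same window)
//   ("A", t = 12s) -> forwarded (first occurrence in window [10s, 20s))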
Compatibility, Deprecation, and Migration Plan
The proposed change is backwards compatible; no deprecation or migration is needed.
Rejected Alternatives
None