Status
Current state: Voting
Discussion thread: here
Voting thread: here
JIRA:
Pull request: PR-9210
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
The 'distinct' operation is common in data processing, e.g.:
- SQL DISTINCT keyword,
- In standard libraries for programming languages:
  - .NET LINQ Distinct method,
  - Java Stream distinct(),
  - Scala Seq distinct(),
- In data processing frameworks:
  - Apache Spark's distinct(),
  - Apache Flink's distinct(),
  - Apache Beam's Distinct(),
  - Hazelcast Jet's distinct(), etc.
Hence it is natural to expect similar functionality from Kafka Streams.
Although the Kafka Streams Tutorials contain an example of how distinct can be emulated, this example is complicated: it involves low-level coding with a local state store and a custom transformer. It would be much more convenient to have distinct as a first-class DSL operation.
Due to the 'infinite' nature of KStream, the distinct operation should be windowed, similar to windowed joins and aggregations for KStreams.
...
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- distinct() parameterless DSLOperation on
  - TimeWindowedKStream<K,V> DSLObject, which returns KStream<Windowed<K>,V>
  - SessionWindowedKStream<K,V> DSLObject, which returns KStream<Windowed<K>,V>
The following methods are added to the corresponding interfaces:
KTable<Windowed<K>, V> distinct(final Named named);
KTable<Windowed<K>, V> distinct(final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
KTable<Windowed<K>, V> distinct(final Named named,
                                final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
The distinct operation returns only the first record that falls into a new window and filters out all other records that fall into an already existing window.
Records are considered duplicates iff the serialized forms of their keys are equal.
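The rule above can be modeled in plain Java for tumbling windows. This is a sketch under stated assumptions, not Kafka Streams code: KeyedEvent, WindowedKeyBytes and SerializedKeyDistinct are hypothetical names, a Function<String, byte[]> stands in for the key serde, an in-memory HashSet stands in for the window store, and grace periods and retention are ignored (Java 16+ for records). It shows that duplicates are detected on serialized bytes, which is why the key bytes are wrapped in a ByteBuffer: byte[] compares by identity, while ByteBuffer compares contents.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical event type: a key, a value and an event timestamp in milliseconds.
record KeyedEvent(String key, int value, long timestampMs) { }

// Dedup identity: serialized key bytes plus the start of the tumbling window.
// ByteBuffer.equals/hashCode compare buffer contents, unlike byte[].
record WindowedKeyBytes(ByteBuffer keyBytes, long windowStartMs) { }

final class SerializedKeyDistinct {
    private final Function<String, byte[]> keySerializer;        // stand-in for a key serde
    private final long windowSizeMs;
    private final Set<WindowedKeyBytes> seen = new HashSet<>();   // stand-in for the window store

    SerializedKeyDistinct(Function<String, byte[]> keySerializer, long windowSizeMs) {
        this.keySerializer = keySerializer;
        this.windowSizeMs = windowSizeMs;
    }

    /** Forwards only the first record per (serialized key, tumbling window). */
    List<KeyedEvent> apply(List<KeyedEvent> input) {
        List<KeyedEvent> out = new ArrayList<>();
        for (KeyedEvent e : input) {
            // Assumes non-negative timestamps, as in Kafka.
            long windowStart = e.timestampMs() - e.timestampMs() % windowSizeMs;
            ByteBuffer keyBytes = ByteBuffer.wrap(keySerializer.apply(e.key()));
            // Set.add returns false when this key already has a record in this window.
            if (seen.add(new WindowedKeyBytes(keyBytes, windowStart))) {
                out.add(e);
            }
        }
        return out;
    }
}
```

For example, with a serializer that lower-cases the key before encoding it, the keys "a" and "A" in the same window count as duplicates, because only their bytes are compared.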
Usage Examples
Consider the following example (record times are in seconds):
//three bursts of variously ordered records
4, 5, 6
23, 22, 24
34, 33, 32
//'late arrivals'
7, 22, 35
'Epoch-aligned deduplication' using tumbling windows
.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
produces
(key@[00000/10000], 4)
(key@[20000/30000], 23)
(key@[30000/40000], 34)
-- that is, one record per epoch-aligned window.
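The tumbling-window result can be replayed in a few lines of plain Java. The sketch assumes a single key, timestamps given in seconds and converted to milliseconds, and no grace period or retention limit, so the late arrivals 7, 22 and 35 land in windows that already exist and are dropped. TumblingDistinctReplay is a hypothetical helper name, not part of the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Single-key replay of the tumbling-window example; not Kafka Streams code.
final class TumblingDistinctReplay {

    /** Maps each window start (ms) to the first timestamp (ms) that opened it, in emission order. */
    static Map<Long, Long> firstPerWindow(long[] arrivalTimesSec, long windowSizeMs) {
        Map<Long, Long> emitted = new LinkedHashMap<>();
        for (long sec : arrivalTimesSec) {
            long ts = sec * 1_000L;
            // Epoch-aligned: the window start is the timestamp rounded down to a multiple of the size.
            long windowStart = ts - ts % windowSizeMs;
            // Only the first record of a window is kept; later ones, including late arrivals, are dropped.
            emitted.putIfAbsent(windowStart, ts);
        }
        return emitted;
    }
}
```

Called with the arrival order {4, 5, 6, 23, 22, 24, 34, 33, 32, 7, 22, 35} and a 10 000 ms window, firstPerWindow returns {0=4000, 20000=23000, 30000=34000}, matching the three windows listed above.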
Note: hopping and sliding windows do not make much sense for distinct() because they produce multiple overlapping windows, so a single record can be emitted several times instead of being deduplicated.
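To see why overlapping windows multiply records, the sketch below lists every hopping window [start, start + size) that contains a given timestamp. The alignment formula is modeled on how Kafka Streams assigns time windows, but HoppingWindowAssignment is a hypothetical plain-Java helper, not a Kafka class.

```java
import java.util.ArrayList;
import java.util.List;

// Lists the start times of all hopping windows [start, start + size) that contain a timestamp.
final class HoppingWindowAssignment {

    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest window that still contains the timestamp, aligned to a multiple of the advance.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a 10-second window advancing every 5 seconds, windowStartsFor(7_000, 10_000, 5_000) returns [0, 5000]: a record at 7 seconds is the first record of both windows, so a windowed distinct() would forward it twice. With advance equal to size (tumbling), the same call returns only [0].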
SessionWindows work for 'data-aligned deduplication'.
.groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
produces only
([key@4000/4000], 4)
([key@23000/23000], 23)
because all records with timestamps greater than 7 are 'stuck together' in one session. Setting the inactivity gap to 9 seconds returns three records:
([key@4000/4000], 4)
([key@23000/23000], 23)
([key@34000/34000], 34)
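Both session results can be reproduced with a small single-key model. It assumes the inclusive merge rule implied by the example (with a 10-second gap, the record at 34 joins the session ending at 24), merges every session the new record touches, forwards a record only when it opens a brand-new session, and ignores grace periods and retention. SessionDistinctReplay is a hypothetical name; timestamps stay in seconds.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Single-key model of session-window distinct(); not Kafka Streams code.
final class SessionDistinctReplay {

    /** Returns the timestamps that opened a new session, in arrival order. */
    static List<Long> firstOfNewSessions(long[] arrivalTimes, long gap) {
        List<long[]> sessions = new ArrayList<>(); // each entry: {start, end}
        List<Long> emitted = new ArrayList<>();
        for (long ts : arrivalTimes) {
            long start = ts;
            long end = ts;
            boolean merged = false;
            for (Iterator<long[]> it = sessions.iterator(); it.hasNext(); ) {
                long[] s = it.next();
                // Inclusive test: the session ends within `gap` before ts or starts within `gap` after it.
                if (s[1] >= ts - gap && s[0] <= ts + gap) {
                    start = Math.min(start, s[0]);
                    end = Math.max(end, s[1]);
                    it.remove(); // absorbed into the merged session
                    merged = true;
                }
            }
            sessions.add(new long[] {start, end});
            if (!merged) {
                emitted.add(ts); // opened a new session, so it is forwarded
            }
        }
        return emitted;
    }
}
```

With the example's arrival order, a gap of 10 yields [4, 23] and a gap of 9 yields [4, 23, 34], matching the outputs above. With a gap of 9, the record at 33 later merges the sessions around 24 and 34, but 34 has already been forwarded by then.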
Compatibility, Deprecation, and Migration Plan
The proposed change is backwards compatible; no deprecation or migration is needed.
Rejected Alternatives
The following was rejected during the discussion in favour of a simpler approach:
Public Interfaces
In accordance with the KStreams DSL Grammar, we introduce the following new elements:
- distinct DSLOperation on a KStream<K, V> DSLObject, which returns another KStream<K, V> DSLObject,
- DistinctParameters<K, V, I> DSLParameter.
Description
The type parameters are:
- K: key type
- V: value type
- I: the type of the record's unique identifier
Using DistinctParameters<K, V, I>, the user provides the following:
- KeyValueMapper<K, V, I> idExtractor: extracts a unique identifier from a record, by which input records are de-duplicated. If it returns null, the record is not considered for de-duping and is forwarded as-is. If not provided, it defaults to (key, value) -> key, which means deduplication based on the key of the record. Important assumption: records from different partitions should have different IDs; otherwise records with the same ID might not be co-partitioned.
- TimeWindows timeWindows: tumbling or hopping time-based window specification. Required parameter. Only the first message with a given ID that falls into a window will be passed downstream.
- Serde<I> idSerde: serde for the unique identifier.
- boolean isPersistent: whether the WindowStore that stores the unique IDs should be persistent or not. In many cases, a non-persistent store will be preferable because of better performance. Downstream consumers must be ready to accept occasional duplicates.
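The semantics of this rejected design can be sketched in plain Java, restricted to tumbling windows for simplicity. Assumptions: Msg and IdDistinctSketch are hypothetical names, a BiFunction stands in for KeyValueMapper<K, V, I>, an in-memory HashSet stands in for the WindowStore of unique IDs, and the serde and persistence options are not modeled. It shows the two behaviours described above: a null ID bypasses deduplication, and records with equal IDs are duplicates within a window even when their keys differ.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;

// Hypothetical message type: key, value and event timestamp in milliseconds.
record Msg(String key, String value, long timestampMs) { }

// Plain-Java model of ID-based deduplication within tumbling windows.
final class IdDistinctSketch {

    // An extracted ID together with the start of the window it was seen in.
    record IdInWindow(Object id, long windowStartMs) { }

    static List<Msg> distinct(List<Msg> input,
                              BiFunction<String, String, ?> idExtractor,
                              long windowSizeMs) {
        Set<IdInWindow> seen = new HashSet<>(); // stand-in for the WindowStore of unique IDs
        List<Msg> out = new ArrayList<>();
        for (Msg m : input) {
            Object id = idExtractor.apply(m.key(), m.value());
            if (id == null) {
                out.add(m); // null ID: not considered for de-duping, forwarded as-is
                continue;
            }
            long windowStart = m.timestampMs() - m.timestampMs() % windowSizeMs;
            if (seen.add(new IdInWindow(id, windowStart))) {
                out.add(m); // first occurrence of this ID in this window
            }
        }
        return out;
    }
}
```

For example, with an extractor that uses the value as the ID but returns null for heartbeat values, two "order-1" messages under different keys in the same 10-second window collapse to one, every heartbeat is forwarded, and an "order-1" message in the next window is forwarded again.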
Proposed Changes
1. Add the following method to the KStream interface:
<I> KStream<K, V> distinct(DistinctParameters<K, V, I> params);
Given the parameters, this method returns a new KStream with only the first occurrence of each record in any of the time windows, deduplicated by unique ID. Any subsequent occurrences in the time window are filtered out.
2. Add and implement the following DistinctParameters class:
class DistinctParameters<K, V, I> extends Named {

    /** Windowing parameters only. {@code (k,v)->k} id extractor is assumed, and a persistent store with key serde is used. */
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows);

    /** Windowing parameters and a store persistency flag. {@code (k,v)->k} id extractor is assumed and a key serde is used. */
    public static <K, V> DistinctParameters<K, V, K> with(final TimeWindows timeWindows,
                                                        final boolean isPersistent);

    /** Windowing parameters, ID extractor, and a serde for unique IDs. A persistent store will be used. */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                           final KeyValueMapper<K, V, I> idExtractor,
                                                           final Serde<I> idSerde);

    /** Windowing parameters, ID extractor, a serde for unique IDs, and a flag showing whether the {@code WindowStore} should be
     * persistent or not. */
    public static <K, V, I> DistinctParameters<K, V, I> with(final TimeWindows timeWindows,
                                                           final KeyValueMapper<K, V, I> idExtractor,
                                                           final Serde<I> idSerde,
                                                           final boolean isPersistent);
}
Compatibility, Deprecation, and Migration Plan
The proposed change is backwards compatible; no deprecation or migration is needed.
Rejected Alternatives
...