...

We'd like to improve the user experience by allowing users to name some operators explicitly and, by extension, any repartition topics resulting from those operations. With explicitly named repartition topics, changes to a topology will not affect those names. This enables rolling re-deployments, which are essential for users who cannot afford a production outage when the topology changes.
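To make the naming argument concrete, here is a minimal sketch (not Kafka Streams code) of how an auto-generated repartition topic name can shift when the topology changes, while an explicit name stays stable. It assumes generated names follow the pattern <application.id>-<PREFIX><10-digit index>-repartition and explicit names follow <application.id>-<name>-repartition, modeled on Kafka Streams' conventions. The class RepartitionNaming, the prefix string, the index values, and the application id my-app are hypothetical.

```java
// Hypothetical sketch (not Kafka Streams code): contrasts an auto-generated
// repartition topic name, which embeds the operator's position in the
// topology, with a user-supplied name, which does not. The naming pattern
// is an assumption modeled on Kafka Streams' conventions.
final class RepartitionNaming {

    // Generated names embed a zero-padded, 10-digit operator index.
    static String generatedTopic(final String applicationId, final String prefix, final int index) {
        return applicationId + "-" + prefix + String.format("%010d", index) + "-repartition";
    }

    // Explicit names do not depend on where the operator sits in the topology.
    static String namedTopic(final String applicationId, final String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(final String[] args) {
        final String prefix = "KSTREAM-AGGREGATE-STATE-STORE-";
        // Version 1: the aggregation happens to receive index 3.
        final String v1 = generatedTopic("my-app", prefix, 3);
        // Version 2: an operator added earlier in the topology shifts the index
        // to 4, so the redeployed application reads a different repartition topic.
        final String v2 = generatedTopic("my-app", prefix, 4);
        System.out.println(v1 + " -> " + v2);
        // With an explicit name, both versions use the same topic.
        System.out.println(namedTopic("my-app", "count-by-user"));
    }
}
```

Because the index comes from a counter shared by all operators, any operator created earlier in the topology changes every later generated name; an explicit name removes that coupling.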


We also need to consider how repartition topics come into play with respect to naming, so some additional context is helpful.

...

  1. KStream involved in joins (including S-S and S-T): if the KStream was produced by a key-changing operator, we cannot guarantee that it is still partitioned by the key of its source topic, so it must be repartitioned.

  2. KStream involved in aggregations, following a grouping function: a groupBy always requires repartitioning; a groupByKey also requires it if a key-changing operation precedes it, as in case 1).

  3. KTable involved in aggregations, following a groupBy function: we always repartition, since there is no KTable#groupByKey.

    * Note that since groupBy is the only key-changing operation on KTables, KTable joins (T-T) never require repartitioning: the joined KTables can always be assumed to be partitioned by key, either from their source topics or from a previous repartition topic.
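The three rules above can be condensed into a small decision function. This is a hypothetical sketch for illustration only: the class RepartitionRules, its enums, and the keyChangedUpstream flag are not part of the Kafka Streams API.

```java
// Hypothetical sketch: encodes rules 1-3 above as a pure function.
// Not part of the Kafka Streams API.
final class RepartitionRules {

    enum Source { KSTREAM, KTABLE }

    enum Operation { JOIN, GROUP_BY_KEY, GROUP_BY }

    static boolean needsRepartition(final Source source,
                                    final Operation operation,
                                    final boolean keyChangedUpstream) {
        if (source == Source.KTABLE) {
            if (operation == Operation.GROUP_BY_KEY) {
                // Rule 3: KTable#groupByKey does not exist.
                throw new IllegalArgumentException("KTable has no groupByKey");
            }
            // Rule 3: KTable groupBy always repartitions. T-T joins never do,
            // because groupBy is the only key-changing KTable operation.
            return operation == Operation.GROUP_BY;
        }
        // Rule 2: KStream groupBy always repartitions.
        if (operation == Operation.GROUP_BY) {
            return true;
        }
        // Rules 1 and 2: KStream joins and groupByKey repartition only
        // after a key-changing operation.
        return keyChangedUpstream;
    }

    public static void main(final String[] args) {
        System.out.println(needsRepartition(Source.KSTREAM, Operation.GROUP_BY_KEY, true));
        System.out.println(needsRepartition(Source.KTABLE, Operation.JOIN, false));
    }
}
```

For KTables the keyChangedUpstream flag is ignored, mirroring the note that groupBy is the only key-changing KTable operation.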

Scope Overlaps

An existing proposal, KIP-230, aims to name repartition topics for windowed stream-stream joins. KIP-221 aims to provide repartition hints in the Streams DSL, such as the repartition topic name and the number of partitions.

This KIP subsumes KIP-230 and partially overlaps with KIP-221, without subsuming it.

Public Interfaces

This KIP introduces a new class, Grouped, and changes the Joined class and the KStream and KTable interfaces, as described below.

...

As mentioned above, this KIP adds a new class, Grouped.

Adding Grouped

The new Grouped class can hold a Serde for the key, a Serde for the value, and a name for the operation.

    1. A unique name must be provided for each Grouped instance in the topology; duplicate names cause a TopologyException when the topology is built.
```java
public class Grouped<K, V> {

  // Creates a Grouped carrying only an operation name.
  public static <K, V> Grouped<K, V> name(final String name);

  // Creates a Grouped carrying key and value Serdes.
  public static <K, V> Grouped<K, V> with(final Serde<K> keySerde,
                                          final Serde<V> valueSerde);

  public Grouped<K, V> withName(final String name);

  public Grouped<K, V> withKeySerde(final Serde<K> keySerde);

  public Grouped<K, V> withValueSerde(final Serde<V> valueSerde);
}
```
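The following minimal sketch illustrates, under stated assumptions, the two behaviors described for Grouped: fluent configuration, and rejection of duplicate names when the topology is built. GroupedSketch, SerdeStandIn, and GroupedNameRegistry are hypothetical stand-ins so the example compiles without Kafka. The copy-on-write style of the with* methods is an assumption of this sketch, and IllegalStateException stands in for TopologyException.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Kafka's Serde<T>, so this compiles without Kafka.
interface SerdeStandIn<T> {
}

// Hypothetical immutable model of the proposed Grouped API. Every with*
// method returns a new instance (an assumption of this sketch).
final class GroupedSketch<K, V> {
    final String name;                 // null means "no explicit name"
    final SerdeStandIn<K> keySerde;    // null means "not specified"
    final SerdeStandIn<V> valueSerde;  // null means "not specified"

    private GroupedSketch(final String name,
                          final SerdeStandIn<K> keySerde,
                          final SerdeStandIn<V> valueSerde) {
        this.name = name;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }

    static <K, V> GroupedSketch<K, V> name(final String name) {
        return new GroupedSketch<>(Objects.requireNonNull(name, "name"), null, null);
    }

    static <K, V> GroupedSketch<K, V> with(final SerdeStandIn<K> keySerde,
                                           final SerdeStandIn<V> valueSerde) {
        return new GroupedSketch<>(null, keySerde, valueSerde);
    }

    GroupedSketch<K, V> withName(final String name) {
        return new GroupedSketch<>(Objects.requireNonNull(name, "name"), keySerde, valueSerde);
    }

    GroupedSketch<K, V> withKeySerde(final SerdeStandIn<K> keySerde) {
        return new GroupedSketch<>(name, keySerde, valueSerde);
    }

    GroupedSketch<K, V> withValueSerde(final SerdeStandIn<V> valueSerde) {
        return new GroupedSketch<>(name, keySerde, valueSerde);
    }
}

// Hypothetical build-time check for note 1: explicit names must be unique.
// IllegalStateException stands in for Kafka's TopologyException.
final class GroupedNameRegistry {
    private final Set<String> seen = new HashSet<>();

    void register(final GroupedSketch<?, ?> grouped) {
        // Unnamed instances are skipped; generated names are assumed unique.
        if (grouped.name != null && !seen.add(grouped.name)) {
            throw new IllegalStateException("Duplicate Grouped name: " + grouped.name);
        }
    }
}
```

Making the builder immutable means a Grouped instance can be reused as a template (for example, shared Serdes) and given a distinct name per operation without accidentally renaming an earlier one.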

Joined Changes    

2. Joined will be updated to accept a "name" parameter.

...

```java
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
                                               final Serde<V> valueSerde,
                                               final Serde<VO> otherValueSerde,
                                               final String name);

public static <K, V, VO> Joined<K, V, VO> name(final String name);

public Joined<K, V, VO> withName(final String name);

public String name();
```
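Since both inputs of a stream-stream join may need repartitioning (case 1 above), a single name supplied through Joined has to yield a distinct topic name for each side. One way to do this, sketched below, is to append a per-side suffix. The class JoinRepartitionNames and the -left/-right suffixes are assumptions for illustration, not something this KIP specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: derives per-side repartition topic names for a
// stream-stream join from the single name given to Joined. The "-left" and
// "-right" suffixes are an assumption for illustration.
final class JoinRepartitionNames {

    static List<String> topicsFor(final String applicationId,
                                  final String joinName,
                                  final boolean leftNeedsRepartition,
                                  final boolean rightNeedsRepartition) {
        final List<String> topics = new ArrayList<>();
        if (leftNeedsRepartition) {
            topics.add(applicationId + "-" + joinName + "-left-repartition");
        }
        if (rightNeedsRepartition) {
            topics.add(applicationId + "-" + joinName + "-right-repartition");
        }
        return topics;
    }

    public static void main(final String[] args) {
        // Both sides were re-keyed upstream, so both need repartitioning.
        System.out.println(topicsFor("my-app", "orders-join", true, true));
    }
}
```

For a stream-table join only the stream side can need repartitioning, which corresponds to calling the sketch with rightNeedsRepartition set to false.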

     

KStream Changes

3. New methods accepting a Grouped instance as a parameter will be added to the KStream interface.

...

```java
// New methods
KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                   final Grouped<KR, V> grouped);

// Deprecated in favor of the Grouped overloads above
@Deprecated
KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);

@Deprecated
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);
```
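One way to keep the deprecated Serialized overloads working is to have them convert their argument and delegate to the new Grouped overloads, so both share a single code path. The sketch below shows this for groupByKey only. All *StandIn types are hypothetical stand-ins so the example compiles without Kafka, Serdes are modeled as string labels for brevity, and delegation is a design assumption rather than something this KIP specifies.

```java
// Hypothetical stand-in for Serialized: carries Serdes but no name.
final class SerializedStandIn<K, V> {
    final String keySerde;   // Serdes modeled as labels for brevity
    final String valueSerde;

    SerializedStandIn(final String keySerde, final String valueSerde) {
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }
}

// Hypothetical stand-in for Grouped: carries Serdes and an optional name.
final class GroupedStandIn<K, V> {
    final String name;       // null means "let the library generate a name"
    final String keySerde;
    final String valueSerde;

    GroupedStandIn(final String name, final String keySerde, final String valueSerde) {
        this.name = name;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
    }
}

// Hypothetical stand-in for KGroupedStream: just remembers its grouping config.
final class KGroupedStreamStandIn<K, V> {
    final GroupedStandIn<K, V> grouped;

    KGroupedStreamStandIn(final GroupedStandIn<K, V> grouped) {
        this.grouped = grouped;
    }
}

final class KStreamStandIn<K, V> {

    // New overload: the single real code path.
    KGroupedStreamStandIn<K, V> groupByKey(final GroupedStandIn<K, V> grouped) {
        return new KGroupedStreamStandIn<>(grouped);
    }

    // Deprecated overload: converts its argument and delegates.
    // Serialized has no name, so the resulting Grouped is unnamed.
    @Deprecated
    KGroupedStreamStandIn<K, V> groupByKey(final SerializedStandIn<K, V> serialized) {
        return groupByKey(new GroupedStandIn<K, V>(null, serialized.keySerde, serialized.valueSerde));
    }
}
```

Because the deprecated path produces an unnamed Grouped, existing applications keep their generated repartition topic names until they opt in to explicit naming.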

   

KTable Changes

4. A new method accepting a Grouped instance as a parameter will be added to the KTable interface.

...