

Status

Current state: "Under Discussion"

Discussion thread: here 

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

To ensure uniqueness when naming operators in the topology, Kafka Streams appends an incrementing number to each operator name. Repartition topics inherit this numbering scheme as well.

When a user updates her topology, the names of the operators can change because operators were added or removed, which in turn alters the names of the repartition topics. This is problematic because the user can no longer do a rolling re-deployment: all Streams instances must be stopped, the new version installed, and then restarted.
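To make the problem concrete, the following sketch models index-based naming. It is a simplification, not the Kafka Streams implementation: it assumes each generated name is an operator prefix followed by a zero-padded ten-digit counter, and that a repartition topic name is derived as `<application.id>-<generated name>-repartition`. The class `AutoNamer` and its methods are hypothetical.

```java
import java.util.List;

// Hypothetical, simplified model of index-based operator naming;
// not the actual Kafka Streams implementation.
class AutoNamer {
    // Shared counter: every operator added to the topology consumes one index.
    private int index = 0;

    // Generated name = operator prefix + "-" + zero-padded ten-digit index.
    String next(String prefix) {
        return prefix + "-" + String.format("%010d", index++);
    }

    // Names the operators in order and derives a repartition topic name
    // from the last one (assumed to be the aggregation that repartitions).
    static String repartitionTopicFor(String applicationId, List<String> operatorPrefixes) {
        AutoNamer namer = new AutoNamer();
        String last = null;
        for (String prefix : operatorPrefixes) {
            last = namer.next(prefix);
        }
        return applicationId + "-" + last + "-repartition";
    }
}
```

For example, the operators `KSTREAM-SOURCE`, `KSTREAM-MAP`, `KSTREAM-AGGREGATE` yield `app-KSTREAM-AGGREGATE-0000000002-repartition`; inserting a `KSTREAM-FILTER` before the aggregation shifts it to `app-KSTREAM-AGGREGATE-0000000003-repartition`, so upgraded instances would read from a different topic than the instances still running the old topology.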

We'd like to improve the user experience by allowing users to explicitly name some operators and, by extension, any repartition topics resulting from those operations. Once repartition topics are named, changes to a topology will not affect those names. This enables rolling re-deployments, which are essential for users who cannot afford a production outage when changing their topology.

We also need to consider how repartition topics come into play with respect to naming, so some additional context is helpful.

Today we have the following scenarios which will introduce a repartition topic:

  1. KStream involved in joins (including S-S and S-T): if the KStream was generated by a key-changing operation, we cannot guarantee that it is still partitioned by the key of its source topic, and hence we need to repartition.

  2. KStream involved in aggregations, following a groupBy function: unless it is groupByKey, we must repartition; and even with groupByKey, if the KStream is preceded by key-changing operations as in 1), we also need to repartition.

  3. KTable involved in aggregations, following a groupBy function: we will always repartition since we do not have KTable#groupByKey at all.

    * Note that since groupBy is the only key-changing operation for KTables, KTable-KTable (T-T) joins never require repartitioning: the joined KTables can always be assumed to be partitioned by key from their source topics (or from the previous repartition topics).
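The three scenarios and the note above can be summarized as a small decision function. This is a hypothetical model written for illustration only; the enum `DownstreamOp`, the class `RepartitionRules`, and the `keyChanged` flag are not part of Kafka Streams.

```java
// Hypothetical model of the repartitioning scenarios; not Kafka Streams code.
enum DownstreamOp { STREAM_JOIN, STREAM_GROUP_BY_KEY, STREAM_GROUP_BY, TABLE_GROUP_BY, TABLE_JOIN }

final class RepartitionRules {
    private RepartitionRules() {}

    // keyChanged: true if the input KStream was produced by a key-changing
    // operation (for example selectKey, map, or flatMap).
    static boolean needsRepartition(DownstreamOp op, boolean keyChanged) {
        switch (op) {
            case STREAM_JOIN:          // scenario 1: S-S and S-T joins
            case STREAM_GROUP_BY_KEY:  // scenario 2: groupByKey after a key change
                return keyChanged;
            case STREAM_GROUP_BY:      // scenario 2: groupBy always repartitions
            case TABLE_GROUP_BY:       // scenario 3: KTable has no groupByKey
                return true;
            case TABLE_JOIN:           // note: T-T joins never repartition
            default:
                return false;
        }
    }
}
```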

Scope Overlaps

The existing KIP-230 aims to name repartition topics for windowed stream-stream joins. KIP-221 aims to provide repartition hints in the Streams DSL, including the repartition topic name, the number of partitions, etc.

This KIP will subsume KIP-230; it overlaps with KIP-221 but does not completely subsume it.

Public Interfaces

This KIP will introduce a new class, Grouped. The Joined class and the KStream and KTable interfaces will also change, as described below.

Proposed Changes

As mentioned above this KIP will add a new class Grouped.

Adding Grouped

The new Grouped class can contain a Serde for the key, a Serde for the value, and a name for the operation.

A unique name must be provided for each Grouped instance in the topology; duplicate names cause a TopologyException to be thrown when the topology is built.
Grouped
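public class Grouped<K, V> {

  // Creates a Grouped with only the operation name set
  public static <K, V> Grouped<K, V> name(final String name)

  // Creates a Grouped with the given key and value serdes
  public static <K, V> Grouped<K, V> with(final Serde<K> keySerde,
                                          final Serde<V> valueSerde)

  // Returns a Grouped that uses the given operation name
  public Grouped<K, V> withName(final String name)

  // Returns a Grouped that uses the given key serde
  public Grouped<K, V> withKeySerde(final Serde<K> keySerde)

  // Returns a Grouped that uses the given value serde
  public Grouped<K, V> withValueSerde(final Serde<V> valueSerde)
}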
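The uniqueness rule for Grouped names can be illustrated with a small registry that rejects duplicates. This is a hypothetical sketch: `OperationNameRegistry` is not a Kafka Streams class, and `IllegalStateException` stands in for Kafka Streams' `TopologyException` so that the example has no external dependencies.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the uniqueness check for user-provided names.
// IllegalStateException stands in for TopologyException.
final class OperationNameRegistry {
    private final Set<String> names = new HashSet<>();

    // Records a user-provided name. A null name means no name was given,
    // so the normal generated naming applies and nothing is recorded.
    void register(String name) {
        if (name == null) {
            return;
        }
        // Set.add returns false when the name is already present.
        if (!names.add(name)) {
            throw new IllegalStateException(
                "Name '" + name + "' is already used in this topology");
        }
    }
}
```

Unnamed operations never collide here because they keep using the generated, index-based names.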

Joined Changes    

Joined will be updated to accept a "name" parameter. 

a. The provided name must be unique for each Joined instance in the topology, or a TopologyException will be thrown when the topology is built.

b. Since a join has two sides, when generating internal names for the join, "this-" is prepended to the name for the left-hand (calling) stream, and "other-" is prepended to the name for the stream passed in as a parameter.

c. The name is used for internal repartition topics, following the same convention as in b above. Named repartition topics will still have "application.id-" prepended to the front and "-repartition" appended to the end.

d. If no name is provided, the normal naming conventions are followed.
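Rules b and c above can be expressed as a small naming helper. This is a hypothetical sketch of the convention described in this proposal, not part of the Kafka Streams API; `JoinSide` and `JoinTopicNames` are illustrative names. A repartition topic is only created for a side that actually needs repartitioning.

```java
// Hypothetical helper illustrating the proposed join naming convention.
enum JoinSide {
    THIS("this"),   // the left-hand (calling) stream
    OTHER("other"); // the stream passed in as a parameter

    final String prefix;

    JoinSide(String prefix) {
        this.prefix = prefix;
    }
}

final class JoinTopicNames {
    private JoinTopicNames() {}

    // <application.id>-<side>-<join name>-repartition
    static String repartitionTopic(String applicationId, JoinSide side, String joinName) {
        return applicationId + "-" + side.prefix + "-" + joinName + "-repartition";
    }
}
```

For a join named `orders-join` in application `my-app`, the two possible repartition topics are `my-app-this-orders-join-repartition` and `my-app-other-orders-join-repartition`.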

Joined changes
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
                                               final Serde<V> valueSerde,
                                               final Serde<VO> otherValueSerde,
                                               final String name)


public static <K, V, VO> Joined<K, V, VO> name(final String name)


public Joined<K, V, VO> withName(final String name)


public String name()

     

KStream Changes

New methods accepting a Grouped instance as a parameter will be added to the KStream interface.

a. The current methods accepting a Serialized instance will be deprecated.

b. If provided, the name is used for the internal name.

c. The name is used for internal repartition topics if needed. Named repartition topics will still have "application.id-" prepended to the front and "-repartition" appended to the end.

d. If no name is provided, the normal naming conventions are used.

e. The provided name must be unique for each Grouped instance in the topology, or a TopologyException will be thrown when the topology is built.

KStream Added Methods and Deprecations
KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped);

<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                   final Grouped<KR, V> grouped);

@Deprecated
KGroupedStream<K, V> groupByKey(final Serialized<K, V> serialized);

@Deprecated
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                   final Serialized<KR, V> serialized);
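Rules b to d for grouping amount to "use the provided name if there is one, otherwise fall back to the generated name". The sketch below is hypothetical (`GroupingTopicNames` is not a Kafka Streams class) and assumes the generated name is supplied by the caller.

```java
// Hypothetical sketch of the naming fallback for grouping repartition topics.
final class GroupingTopicNames {
    private GroupingTopicNames() {}

    // providedName: the name from Grouped, or null if none was given.
    // generatedName: the index-based name Kafka Streams would otherwise use.
    static String repartitionTopic(String applicationId, String providedName, String generatedName) {
        String base = (providedName != null) ? providedName : generatedName;
        return applicationId + "-" + base + "-repartition";
    }
}
```

With a provided name such as `count-by-user`, the topic stays `my-app-count-by-user-repartition` no matter how many operators are added upstream, which is what makes rolling re-deployments possible; without a name, the topic still depends on the operator index.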

   

KTable Changes

A new method accepting a Grouped instance as a parameter will be added to the KTable interface.

a. The current method accepting a Serialized instance will be deprecated.

b. If provided, the name is used for the internal name.

c. The name is used for internal repartition topics if needed. Named repartition topics will still have "application.id-" prepended to the front and "-repartition" appended to the end.

d. If no name is provided, the normal naming conventions are used.

e. The provided name must be unique for each Grouped instance in the topology, or a TopologyException will be thrown when the topology is built.

KTable Added Method and Deprecation
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Grouped<KR, VR> grouped);

@Deprecated
<KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
                                       final Serialized<KR, VR> serialized);


Compatibility, Deprecation, and Migration Plan

  • Since these changes (including naming) are optional, there is no impact on existing code.
  • If users elect to add names to an existing topology, they may need to reset their application, because the names of its repartition topics will change.
  • The deprecated methods will be removed in a later release (TBD).

Rejected Alternatives

N/A
