Status

Current state: Under Discussion Accepted, targeting 3.7

Discussion thread: here

JIRA: KAFKA-4835, KAFKA-10844

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Each key changing operation in Kafka Streams (selectKey, map, transform, etc.) currently leads to an automatic repartition before a stateful aggregation.

However, a large class of key mappings does not require repartitioning: if a mapping always produces different keys for records from different source partitions, then all records sharing a new key already reside in one partition, so the data stays local without a repartition (see the picture below):

Image Removed

In other words, repartitioning is not always necessary: if the input stream is already correctly partitioned, the automatic repartition only adds overhead. For example, if an input stream arrives partitioned by key1, calling selectKey( ... => (key1, metric)).groupByKey() triggers a repartition today.
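The locality argument can be checked with a small self-contained sketch in plain Java (no Kafka dependency). It models a topic partitioned by key1 and verifies that, after re-keying every record to (key1, metric), all records sharing a new key still sit in one partition. The class name, the partitionFor helper, and the hash-modulo partitioner are illustrative assumptions, not Kafka's actual partitioner.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model, not Kafka code: checks that re-keying by (key1, metric)
// keeps all records with the same new key in a single partition.
class LocalitySketch {

    // Simplified stand-in for a partitioner: non-negative hash modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Each input record is {key1, metric}. Returns true if every composite key
    // (key1, metric) is observed in exactly one source partition.
    static boolean compositeKeysStayLocal(List<String[]> records, int numPartitions) {
        Map<List<String>, Set<Integer>> partitionsPerNewKey = new HashMap<>();
        for (String[] record : records) {
            String key1 = record[0];
            String metric = record[1];
            // The record stays in the partition chosen by its original key.
            int sourcePartition = partitionFor(key1, numPartitions);
            // Result of the selectKey mapping.
            List<String> newKey = List.of(key1, metric);
            partitionsPerNewKey
                .computeIfAbsent(newKey, k -> new HashSet<>())
                .add(sourcePartition);
        }
        for (Set<Integer> partitions : partitionsPerNewKey.values()) {
            if (partitions.size() != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[] {"host-a", "cpu"},
            new String[] {"host-a", "mem"},
            new String[] {"host-b", "cpu"},
            new String[] {"host-a", "cpu"});
        System.out.println(compositeKeysStayLocal(records, 4));
    }
}
```

The check succeeds for any input because the source partition is a function of key1, and key1 is recoverable from the new key; a mapping that discards key1 would not have this property.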

Tickets KAFKA-4835 and KAFKA-10844 request an option for canceling the unneeded repartition. Repartition canceling is also needed for efficient usage of the distinct() operator proposed in KIP-655: groupBy(...).windowedBy(...).distinct() will always repartition by default, although in practice this is not always needed.

This KIP proposes a new interface for users to optimize the key changing operations (selectKey, map, transform, etc.) in Kafka Streams. 

Public Interfaces

In accordance with KStreams DSL Grammar, we introduce a new parameterless DSLOperation markAsPartitioned() on KStream.

Code Block
languagejava
public interface KStream<K, V> {
    /**
     * Marking the {@code KStream} as partitioned signals that the stream is partitioned as intended
     * and does not require further repartitioning in downstream key changing operations.
     *
     * <p><em>
     *     Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with interactive query (IQ) or {@link KStream#join}.
     *     When a repartition happens, records are physically shuffled by the composite key defined in the stateful operation.
     *     If the repartition is cancelled, records stay in their original partition, determined by their original key,
     *     while IQ and joins assume and use the composite key instead of the original key.
     * </em></p>
     *
     * @return a new, mutated {@code KStream} that will not repartition in subsequent operations.
     */
    KStream<K, V> markAsPartitioned();

}

Proposed Change

Calling the new DSLOperation returns a new, mutated KStream. The new instance will not repartition as downstream operations are chained onto it, whereas the original stream keeps its own internal property and continues to operate in the default way.
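The intended semantics can be illustrated with a toy model. This is only a sketch under assumptions: StreamSketch and its methods are hypothetical stand-ins, and the single boolean flag is a simplification of whatever internal state the real KStream implementation tracks.

```java
// Hypothetical toy model of the proposed semantics; the real KStream is far richer.
final class StreamSketch {
    private final boolean repartitionRequired;

    private StreamSketch(boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    // A freshly read stream is assumed to be correctly partitioned.
    static StreamSketch source() {
        return new StreamSketch(false);
    }

    // Any key changing operation flags the result as needing a repartition.
    StreamSketch selectKey() {
        return new StreamSketch(true);
    }

    // Returns a NEW instance with the flag cleared; this instance is left untouched.
    StreamSketch markAsPartitioned() {
        return new StreamSketch(false);
    }

    // True if a downstream groupByKey().aggregate(...) would insert a repartition step.
    boolean wouldRepartitionOnGroupByKey() {
        return repartitionRequired;
    }
}
```

In this model, a stream obtained from selectKey() still reports that it would repartition, while the instance returned by markAsPartitioned() does not, so both can be used side by side.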

Usage

Example: canceling repartition in a streams aggregation would look like:

Code Block
languagejava
stream
	.selectKey( ... => (key1, metric))
	.markAsPartitioned()
	.groupByKey()
	.aggregate()

Example: fan out from the same stream:

Code Block
KStream myStream = builder.stream(...).map(...);

// the aggregation will not repartition as it works on a mutated KStream
myStream.markAsPartitioned().groupByKey().aggregate(...);

// the join operation will repartition as it joins on the original KStream
myStream.join(myOtherTable);


Introducing a new parameterless method on KStream, rather than adding parameters to all the key changing operations, keeps the changes to the public API to a minimum.

Concerns

  • This operation is unsafe: wrong usage can lead to undefined behaviour. Thus it should be used with care for performance tuning only. This warning should be given in the documentation.
  • Using this operation complicates IQ (Interactive Query) and joins. When a repartition happens, records are physically shuffled by a composite key defined in the stateful operation. When the repartition is canceled, however, records stay in their original partition, determined by their original key. IQ assumes and uses the composite key instead of the original key, which is where IQ can break downstream. The same applies to joins.
  • In the documentation, we specifically advise against using the interface with IQ or joins. 
  • However, a potential solution to support IQ is to provide a reverse mapping. Without repartitioning, the mapped key no longer identifies the correct partition, since the hash of the mapped key might point to the wrong instance. However, if markAsPartitioned() is used correctly, it is always possible to provide the 'reverse mapping' from the composite key back to the original key, which can then be used for obtaining the metadata. We can follow up with a change if there is demand.
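The reverse mapping idea from the last bullet could look roughly like the sketch below. Everything here is hypothetical: ReverseMappingSketch, locate, and the hash-modulo partitionFor helper are illustrative names, and the composite key is modeled as a plain String array of {key1, metric}. It is not a proposal for the actual IQ API.

```java
import java.util.function.Function;

// Hypothetical sketch: locating the partition that holds a composite key
// by first restoring the original key through a user-supplied reverse mapping.
class ReverseMappingSketch {

    // Simplified stand-in for a partitioner: non-negative hash modulo partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // The data was never reshuffled, so the partition is determined by the
    // original key, not by the hash of the composite key.
    static <C> int locate(C compositeKey, Function<C, String> reverseMapping, int numPartitions) {
        String originalKey = reverseMapping.apply(compositeKey);
        return partitionFor(originalKey, numPartitions);
    }

    public static void main(String[] args) {
        String[] compositeKey = {"host-a", "cpu"};
        // Reverse mapping for a (key1, metric) composite key: take the first component.
        int partition = locate(compositeKey, ck -> ck[0], 4);
        System.out.println(partition == partitionFor("host-a", 4));
    }
}
```

A lookup that hashed the composite key directly might select a different partition, which is the failure mode described above.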

Compatibility, Deprecation, and Migration Plan

    No impact on existing users, no migration is needed.

Possible Alternatives

Option 1: Composite Key

If we don't want to introduce an unsafe operation, we might discuss introducing composite keys as an alternative.

  • CompositeKey<H, P> consists of a head and a postfix, and the partition of a composite key is always defined by its 'head' only.
  • Also, k and CompositeKey(k, v) must have the same partition for each k.
  • We will need to introduce selectCompositeKey operations that will not lead to repartition.

The CompositeKey design will be safe from the point of view of both data locality and IQ, but adds complexity to the usage.
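A minimal sketch of what such a key type might look like is given below. None of this exists in Kafka Streams: the field names, the partition and partitionOf helpers, and the hash-modulo partitioning are assumptions made purely for illustration.

```java
import java.util.Objects;

// Hypothetical sketch of Option 1; this type does not exist in Kafka Streams.
final class CompositeKey<H, P> {
    final H head;
    final P postfix;

    CompositeKey(H head, P postfix) {
        this.head = Objects.requireNonNull(head, "head");
        this.postfix = postfix;
    }

    // Simplified stand-in for partitioning a plain (non-composite) key.
    static int partitionOf(Object plainKey, int numPartitions) {
        return Math.floorMod(plainKey.hashCode(), numPartitions);
    }

    // The partition depends on the head only, so k and CompositeKey(k, v)
    // always land in the same partition.
    int partition(int numPartitions) {
        return partitionOf(head, numPartitions);
    }

    // Equality still uses both components, so aggregations can tell
    // (k, v1) and (k, v2) apart.
    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CompositeKey)) {
            return false;
        }
        CompositeKey<?, ?> that = (CompositeKey<?, ?>) other;
        return head.equals(that.head) && Objects.equals(postfix, that.postfix);
    }

    @Override
    public int hashCode() {
        return Objects.hash(head, postfix);
    }
}
```

The design choice illustrated here is the split between partitioning (head only) and identity (head and postfix): the former preserves locality, the latter keeps distinct composite keys distinct in state stores.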

Option 2: Optional configuration in Named Operations (Joined, Grouped, etc.)

  • It would allow us to hit only the relevant parts of the DSL and be less prone to undesired behaviors when it comes to IQ or joins. 
  • More generic, can be applied to KTable as well. In comparison, the markAsPartitioned() approach targets the KStream interface only, where it focuses on a specific set of overhead/pain points introduced by repartitionRequired.
  • It touches on a larger surface area of the API.