You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 3 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: KAFKA-4835

KAFKA-10844

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Each key changing operation in Kafka Streams (selectKey, map, transform, etc.) now leads to automatic repartition before an aggregation.

But there is quite a large class of key mappings that do not require repartition: every mapping that gives different result for different source partition does not require repartitioning to keep the data locally (see the picture below):

In tickets KAFKA-4835 and KAFKA-10844 the option for canceling the unneeded repartition is being requested. Repartition canceling is also needed for efficient usage of distinct() operator proposed in KIP-655: groupBy(...).windowedBy(...).distinct() will always repartition by default, while in practice this is not always needed.

Public Interfaces

In accordance with KStreams DSL Grammar, we introduce a new parameterless DSLOperation cancelRepartition() on KStream, which resets keyChangingOperation flag for the upstream node.

Introducing a new parameterless method on KStream rather than adding parameters to all the key changing operations will lead to minimum changes in public APIs.

Concerns

  • This operation is unsafe: wrong usage can lead to undefined behaviour. Thus it should be used with care for performance tuning only. This warning should be given in the documentation.
  •  The usage of this operation complicates the usage of IQ. Without repartitioning, the mapped key does not contain the information of the correct partition anymore, since the hash of the mapped key might compute to a wrong instance. However, if we use cancelRepartition correctly, it is always possible to provide the 'reverse mapping' for the key that restores the original key, which then can be used for obtaining the metadata.

Compatibility, Deprecation, and Migration Plan

    No impact on existing users, no migration is needed.

Possible Alternatives

If we don't want to introduce an unsafe operation, we might discuss introducing composite keys as an alternative.

  • CompositeKey<H, P> consists of a head and a postfix, and the partition of a composite key is always defined by its 'head' only.
  • Also, k and CompositeKey(k, v) must have the same partition for each k.
  • We will need to introduce selectCompositeKey operation which will not lead to repartition.

CompositeKey usage will be safe both from pov of data locality and IQ, however it is questionable, whether it will be convenient to use it.

  • No labels