Status

Current state: Accepted

Voting thread: https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg99111.html

JIRA: KAFKA-6037, KAFKA-4835, KAFKA-8611, KAFKA-10003

PR: https://github.com/apache/kafka/pull/7170

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today the downstream sub-topology's parallelism (i.e., the number of tasks) depends purely on the upstream sub-topology's parallelism, which ultimately depends on the source topic's number of partitions. This does not work well in dynamic scaling scenarios. Letting the stream topology create repartition topics with a customized number of partitions gives users more flexibility.

Also, at this point, when using the DSL in Kafka Streams, data repartitioning happens only when a key-changing operation is followed by a stateful operation. However, stateful computation in the DSL can also be done with the transform() operation. The problem with this approach is that no auto-repartitioning is triggered, even if an upstream operation changed the key before transform() was called. If repartitioning is required, through(String) must be called before transform(). With the current implementation, the burden of creating and managing the topic falls on the user, which adds complexity to managing a Kafka Streams application.

Public interfaces

With the current API contract in the Kafka Streams DSL, producer-level configuration can be specified with the Produced<K, V> class, which can be passed to operators such as KStream#to and KStream#through. However, the current DSL offers no way to specify topic-level configuration for operators that may create internal topics. To let users control the parallelism of sub-topologies and to make internal topic configuration more flexible, we shall add the following configuration class.

Code Block
languagejava
titleRepartitioned.java
/**
 * This class is used to provide the optional parameters for internal repartitioned topics when using:
 * - {@link KStream#repartition(Repartitioned)}
 * - {@link KStream#repartition(KeyValueMapper, Repartitioned)}
 * - {@link KStream#groupByKey(Repartitioned)}
 * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numberOfPartitions;

	protected final StreamPartitioner<? super K, ? super V> partitioner;

	private Repartitioned(String name,
	                      Serde<K> keySerde,
	                      Serde<V> valueSerde,
	                      Integer numberOfPartitions,
	                      StreamPartitioner<? super K, ? super V> partitioner) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numberOfPartitions = numberOfPartitions;
		this.partitioner = partitioner;
	}

	/**
	 * Create a {@link Repartitioned} instance with the provided name used as part of the repartition topic if required.
	 *
	 * @param name the name used as a processor name and part of the repartition topic name if required.
	 * @param <K>  key type
	 * @param <V>  value type
	 * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> as(final String name) {
		return new Repartitioned<>(name, null, null, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided key serde and value serde.
	 *
	 * @param keySerde   Serde to use for serializing the key
	 * @param valueSerde Serde to use for serializing the value
	 * @param <K>        key type
	 * @param <V>        value type
	 * @return A new {@link Repartitioned} instance configured with key serde and value serde
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde,
	                                              final Serde<V> valueSerde) {
		return new Repartitioned<>(null, keySerde, valueSerde, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
	 * @param <K>         key type
	 * @param <V>         value type
	 * @return A new {@link Repartitioned} instance configured with partitioner
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(null, null, null, null, partitioner);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided number of partitions for repartition topic if required.
	 *
	 * @param numberOfPartitions number of partitions used when creating repartition topic if required
	 * @param <K>                key type
	 * @param <V>                value type
	 * @return A new {@link Repartitioned} instance configured with number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> numberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(null, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided name used as part of repartition topic and processor name.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param name the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the name
	 */
	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided number of partitions for repartition topic.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param numberOfPartitions number of partitions used when creating repartition topic if required
	 * @return a new {@link Repartitioned} instance configured with the number of partitions
	 */
	public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided key serde.
	 *
	 * @param keySerde Serde to use for serializing the key
	 * @return a new {@link Repartitioned} instance configured with the key serde
	 */
	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided value serde.
	 *
	 * @param valueSerde Serde to use for serializing the value
	 * @return a new {@link Repartitioned} instance configured with the value serde
	 */
	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
	 * @return a new {@link Repartitioned} instance configured with provided partitioner
	 */
	public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}
}
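The configuration style used by the class above (static factories that set one option, plus with*() methods that return a new instance) can be tried out without a Kafka dependency. The following is only a miniature model of that pattern: RepartitionSettings is a hypothetical name, and it carries just two of the five fields.

```java
// Dependency-free miniature of the pattern used by Repartitioned.
// RepartitionSettings is a hypothetical class, NOT part of Kafka Streams.
final class RepartitionSettings {

    final String name;                 // null means "let the library generate a name"
    final Integer numberOfPartitions;  // null means "inherit from the upstream topic"

    private RepartitionSettings(final String name, final Integer numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Static factory: only the name is set.
    static RepartitionSettings as(final String name) {
        return new RepartitionSettings(name, null);
    }

    // Static factory: only the partition count is set.
    static RepartitionSettings numberOfPartitions(final int numberOfPartitions) {
        return new RepartitionSettings(null, numberOfPartitions);
    }

    // Each with*() call copies the current settings and overrides one option.
    RepartitionSettings withName(final String name) {
        return new RepartitionSettings(name, numberOfPartitions);
    }

    RepartitionSettings withNumberOfPartitions(final int numberOfPartitions) {
        return new RepartitionSettings(name, numberOfPartitions);
    }

    public static void main(final String[] args) {
        final RepartitionSettings named = RepartitionSettings.as("by-customer");
        final RepartitionSettings sized = named.withNumberOfPartitions(5);
        System.out.println(named.numberOfPartitions); // the original instance is unchanged
        System.out.println(sized.name + " / " + sized.numberOfPartitions);
    }
}
```

Because every call returns a fresh instance, a partially configured object can be shared and refined in several places without one caller affecting another.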


New KStream#repartition operations shall be introduced in order to give the user control over parallelism for sub-topologies. Additionally, we deprecate KStream#through in favor of the new #repartition methods.

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

    @Deprecated
    KStream<K, V> through(final String topic);

    @Deprecated
    KStream<K, V> through(final String topic, final Produced<K, V> produced);

	/**
	 * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
	 * from the auto-generated topic using default serializers, deserializers, and producer's {@link DefaultPartitioner}.
	 * The number of partitions is inherited from the source topic.
	 *
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition(Repartitioned)
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition();
}

New KStream#groupBy shall be introduced in order to give the user control over parallelism for sub-topologies.

Code Block
languagejava
public interface KStream<K, V> {

	/**
	 * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
	 * and using the serializers as defined by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
	 * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
	 * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
	 * {@link #through(String)}, {@link #repartition()}) an internal repartitioning topic may need to be created in Kafka if a later operator
	 * depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * <name> is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
	 * generated name, and "-repartition" is a fixed suffix.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
	 * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
	 * correctly on its key.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
	 *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 * @see #groupBy(KeyValueMapper, Repartitioned)
	 */
	KGroupedStream<K, V> groupByKey(final Repartitioned<K, V> repartitioned);

	/**
	 * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
	 * operator depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
	 * internally generated name.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
	 * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
	 * <p>
	 * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}
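The naming and partition-count rules stated in the Javadoc above can be restated as a small, dependency-free sketch. The class RepartitionTopicRules and its methods are hypothetical and are not part of Kafka Streams. Only the "${applicationId}-<name>-repartition" pattern and the fallback to the source topic's partition count come from the text.

```java
// Hypothetical helper, NOT part of Kafka Streams. It only restates the rules
// from the Javadoc above in executable form.
final class RepartitionTopicRules {

    private static final String SUFFIX = "-repartition";

    private RepartitionTopicRules() { }

    /** Builds "${applicationId}-<name>-repartition". */
    static String topicName(final String applicationId, final String name) {
        return applicationId + "-" + name + SUFFIX;
    }

    /**
     * Returns the configured partition count if one was given (non-null),
     * otherwise the partition count inherited from the source topic.
     */
    static int partitionCount(final Integer configured, final int sourcePartitions) {
        return configured != null ? configured : sourcePartitions;
    }

    public static void main(final String[] args) {
        System.out.println(topicName("orders-app", "by-customer"));
        System.out.println(partitionCount(null, 8)); // nothing configured: inherit
        System.out.println(partitionCount(5, 8));    // explicit value wins
    }
}
```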

In addition, in order to give users control over when to create a repartition topic (KAFKA-8611), the following new operator shall be introduced to the KStream interface:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	/**
	 * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);
}

Correspondingly, the Scala API will be updated, including an implicit conversion from key/value Serdes to a Repartitioned instance.

Code Block
class KStream[K, V](val inner: KStreamJ[K, V]) {
  @deprecated
  def through(topic: String)(implicit produced: Produced[K, V])

  def repartition(implicit repartitioned: Repartitioned[K, V])
}

object Repartitioned {
  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](numberOfPartitions: Int)(implicit keySerde: Serde[K], valueSerde: Serde[V])
}

object ImplicitConversions {
  implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V])
}


Proposed Changes

  • For the KStream#repartition(Repartitioned) operation, the Kafka Streams application will first issue a topic lookup request and check whether the target topic already exists. If Repartitioned is configured with a number of partitions, the application will additionally make sure that the number of partitions of the topic matches the configured value. If it does not, the application will throw an error and fail during startup.
  • For the KStream#repartition() operation, the number of partitions of the upstream topic is used as the number of partitions of the new topic. The topic name will be generated based on the generated processor node name.
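The two rules above can be modeled in a few lines. This is only a sketch under stated assumptions: RepartitionTopicCheck and resolvePartitions are hypothetical names, a plain Map stands in for the broker-side topic lookup, and the behavior for a topic that does not exist yet (use the configured count, otherwise inherit from upstream) is an assumption made for illustration.

```java
import java.util.Map;

// Simplified, dependency-free model of the startup check described above.
// All names are hypothetical; the real logic lives inside Kafka Streams and
// queries the brokers instead of reading a Map.
final class RepartitionTopicCheck {

    private RepartitionTopicCheck() { }

    /**
     * @param existingTopics     stand-in for cluster metadata: topic name -> partition count
     * @param topic              name of the repartition topic
     * @param configured         partition count set via Repartitioned, or null if not set
     * @param upstreamPartitions partition count of the upstream topic
     * @return the partition count the repartition topic has, or should be created with
     * @throws IllegalStateException if the topic exists with a different count than configured
     */
    static int resolvePartitions(final Map<String, Integer> existingTopics,
                                 final String topic,
                                 final Integer configured,
                                 final int upstreamPartitions) {
        final Integer actual = existingTopics.get(topic);
        if (actual == null) {
            // Assumption: a missing topic is created with the configured count,
            // or with the upstream count when nothing was configured.
            return configured != null ? configured : upstreamPartitions;
        }
        if (configured != null && !configured.equals(actual)) {
            // Existing topic does not match the configured value: fail at startup.
            throw new IllegalStateException("Topic " + topic + " has " + actual
                + " partitions but " + configured + " were configured");
        }
        return actual;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> cluster = Map.of("app-by-customer-repartition", 4);
        System.out.println(resolvePartitions(cluster, "app-by-customer-repartition", 4, 8));
        System.out.println(resolvePartitions(cluster, "app-by-region-repartition", null, 8));
    }
}
```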

Backward Compatibility

This is a pure KStream library change that shouldn't affect previously set up applications. Since we introduce new KStream#groupBy operations, existing ones shouldn't be affected by this change. Users of KStream#through can either switch to the new #repartition method (which should be the common use case) or rewrite their code to use #to() and StreamsBuilder#stream() (note that #through() is just syntactic sugar for those two calls anyway).

Rejected Alternatives

Repartition "hint" in groupBy operations

In the mailing list discussion, a concern was raised that adding a number-of-partitions configuration option to group-by operations, such as `KStream#groupByKey(Repartitioned)`, may not be the best option. The main argument against it is that whenever users specify the number of partitions for internal repartition topics, they really care that this configuration is applied. With group-by, however, repartitioning will not happen at all if no key-changing operation was performed, so the number of partitions specified by the user would never take effect. Alternatively, a user who cares about manual repartitioning may do the following to scale sub-topologies up or down:

Code Block
languagejava
builder.stream("topic")
       .repartition((key, value) -> value.newKey(), Repartitioned.numberOfPartitions(5))
       .groupByKey()
       .count();
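To see why an explicit repartition step with five partitions changes downstream parallelism, it helps to look at how records are routed by key. The sketch below is not Kafka code: RepartitionDemo is a hypothetical class, and String.hashCode() stands in for the hash that Kafka's default partitioner computes over the serialized key. All records with the same key land in the same partition, and the partitions can then be processed independently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Dependency-free illustration, NOT Kafka code: String.hashCode() is a
// stand-in for the hash Kafka computes over the serialized key bytes.
final class RepartitionDemo {

    private RepartitionDemo() { }

    /** Maps a key to one of numPartitions partitions (always non-negative). */
    static int partitionFor(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Groups keys by the partition they would be written to. */
    static Map<Integer, List<String>> distribute(final List<String> keys, final int numPartitions) {
        final Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (final String key : keys) {
            byPartition
                .computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                .add(key);
        }
        return byPartition;
    }

    public static void main(final String[] args) {
        // Records keyed by the newly selected key, spread over 5 partitions.
        System.out.println(distribute(List.of("a", "b", "c", "a", "f"), 5));
    }
}
```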


Follow-up ticket for this feature can be found here: KAFKA-9197


Rejected KStream interface changes:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {
	/**
	 * Re-groups the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Re-grouping a {@code KStream} is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)}
	 * or generated internally, and "-repartition" is a fixed suffix.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * All data of this {@code KStream} will be redistributed through the repartitioning topic by writing all
	 * records to it and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
	 * on the new key.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);

	/**
	 * Re-groups the records by their current key into a {@link KGroupedStream} while preserving the original values
	 * and using the serializers as defined by {@link Repartitioned}.
	 * If the record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * An internal repartitioning topic may be created in Kafka, if number of partitions provided via
	 * {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)} is different compared to source topic of this {@code KStream}.
	 * If repartitioned topic is created, it will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>"
	 * is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or generated internally, and "-repartition" is a fixed suffix.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	KGroupedStream<K, V> groupBy(final Repartitioned<K, V> repartitioned);
}

Why not use Produced operation for specifying number of partitions?

There are two main reasons not to use the Produced class for specifying the number of partitions.

...