...

JIRA: KAFKA-6037, KAFKA-4835, KAFKA-8611, KAFKA-10003

PR: https://github.com/apache/kafka/pull/7170

...

New KStream#repartition operations shall be introduced to give the user control over the parallelism of sub-topologies. Additionally, we deprecate KStream#through in favor of the new #repartition methods.

Code Block (Java): KStream.java
public interface KStream<K, V> {

    @Deprecated
    KStream<K, V> through(final String topic);

    @Deprecated
    KStream<K, V> through(final String topic, final Produced<K, V> produced);

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using the default serializers and deserializers and the producer's {@link DefaultPartitioner}.
	 * The number of partitions is inherited from the source topic.
	 *
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition(Repartitioned)
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition();

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
	 * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
	 *
	 * @param mapper        a {@link KeyValueMapper} that computes a new key for each record
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
	 * @see #repartition()
	 * @see #repartition(Repartitioned)
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
	                                final Repartitioned<KR, V> repartitioned);
}

Correspondingly, the Scala API will be updated, including an implicit conversion from key/value Serdes to a Repartitioned instance.

Code Block
class KStream[K, V](val inner: KStreamJ[K, V]) {
  @deprecated
  def through(topic: String)(implicit produced: Produced[K, V])

  def repartition(implicit repartitioned: Repartitioned[K, V])
}

object Repartitioned {
  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](name: String)(implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](partitioner: StreamPartitioner[K, V])(implicit keySerde: Serde[K], valueSerde: Serde[V])

  def `with`[K, V](numberOfPartitions: Int)(implicit keySerde: Serde[K], valueSerde: Serde[V])
}

object ImplicitConversions {
  implicit def repartitionedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V])
}


Proposed Changes

  • For the KStream#repartition(Repartitioned) operation, the Kafka Streams application will first issue a topic lookup request to check whether the target topic already exists. If Repartitioned is configured with a number of partitions, the application will additionally verify that the topic's partition count matches the configured value. If it does not, the application will throw an error and fail during startup.
  • For the KStream#repartition() operation, the number of partitions of the upstream topic is used as the number of partitions of the new topic. The topic name will be derived from the generated processor node name.
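The two rules above can be illustrated with a minimal, self-contained sketch. It is not the Kafka Streams implementation: the class RepartitionTopicCheck and its methods are hypothetical names introduced only for this illustration, and the topic lookup is modeled as a plain OptionalInt (empty meaning the topic does not exist yet) rather than a real admin request.

```java
import java.util.OptionalInt;

/** Hypothetical helper for illustration only; not part of Kafka Streams. */
final class RepartitionTopicCheck {

    private RepartitionTopicCheck() {}

    /**
     * Picks the partition count for a repartition topic: the value configured
     * via Repartitioned if present, otherwise the upstream topic's count.
     */
    static int resolvePartitions(final OptionalInt configured, final int upstreamPartitions) {
        if (upstreamPartitions <= 0) {
            throw new IllegalArgumentException("upstream partition count must be positive");
        }
        final int resolved = configured.orElse(upstreamPartitions);
        if (resolved <= 0) {
            throw new IllegalArgumentException("configured partition count must be positive");
        }
        return resolved;
    }

    /**
     * Models the startup check: if the topic already exists and a partition
     * count was configured, the two must match, otherwise startup fails.
     */
    static void validateExisting(final String topic,
                                 final OptionalInt existingPartitions,
                                 final OptionalInt configured) {
        if (existingPartitions.isPresent() && configured.isPresent()
                && existingPartitions.getAsInt() != configured.getAsInt()) {
            throw new IllegalStateException(
                "Topic " + topic + " has " + existingPartitions.getAsInt()
                    + " partitions but " + configured.getAsInt() + " were configured");
        }
    }

    public static void main(final String[] args) {
        // repartition(): nothing configured, so the upstream count is inherited.
        System.out.println(resolvePartitions(OptionalInt.empty(), 6));
        // repartition(Repartitioned): an explicit count takes precedence.
        System.out.println(resolvePartitions(OptionalInt.of(12), 6));
        // An existing topic with a matching count passes the startup check.
        validateExisting("example-repartition", OptionalInt.of(12), OptionalInt.of(12));
    }
}
```

Calling validateExisting with an existing count of 6 and a configured count of 12 throws an IllegalStateException, which corresponds to the fail-fast behavior during startup described above.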

...

This is a pure KStream library change that shouldn't affect previously set up applications. Since we introduce new KStream#repartition operations, existing operations shouldn't be affected by this change. Users of KStream#through can either switch to the new #repartition method (which should be the common use case) or rewrite their code to use #to() and StreamsBuilder#stream() (note that #through() is just syntactic sugar for those two calls anyway).

Rejected Alternatives

Repartition "hint" in groupBy operations

...