...

KStream.java
public interface KStream<K, V> {

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using default serializers, deserializers, and the producer's {@link DefaultPartitioner}.
	 * The number of partitions is inherited from the source topic.
	 *
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition(Repartitioned)
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition();

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using the {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions, and topic name part as defined by {@link Repartitioned}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * Materializes this stream to an auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using the {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions, and topic name part as defined by {@link Repartitioned}.
	 * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
	 * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
	 *
	 * @param mapper        a {@link KeyValueMapper} that computes a new key for each record
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains records with a new key (possibly of a different type) and an unmodified value
	 * @see #repartition()
	 * @see #repartition(Repartitioned)
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
	                                final Repartitioned<KR, V> repartitioned);
}
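
To illustrate the effect of the repartition(KeyValueMapper, Repartitioned) overload described above, here is a minimal sketch in plain Java that does not use Kafka at all. The class RekeySketch and its methods rekey and partitionFor are hypothetical names invented for this illustration. It assumes records can be modeled as Map.Entry pairs, and it uses hashCode() as a simplified stand-in for the producer's partitioner, which in Kafka hashes the serialized key bytes instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical illustration only; not part of the Kafka Streams API. */
final class RekeySketch {

    /** Applies the mapper to every record to compute a new key; values are left unchanged. */
    static <K, V, KR> List<Map.Entry<KR, V>> rekey(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends KR> mapper) {
        List<Map.Entry<KR, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            KR newKey = mapper.apply(r.getKey(), r.getValue());
            out.add(Map.entry(newKey, r.getValue()));
        }
        return out;
    }

    /** Simplified partition choice (assumption): hashCode() modulo the partition count. */
    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Input records are keyed by user id; the value starts with a country code.
        List<Map.Entry<String, String>> input = List.of(
                Map.entry("user-1", "de:click"),
                Map.entry("user-2", "fr:view"),
                Map.entry("user-3", "de:view"));

        // The new key is the country code taken from the value.
        BiFunction<String, String, String> byCountry = (key, value) -> value.substring(0, 2);

        for (Map.Entry<String, String> r : rekey(input, byCountry)) {
            System.out.println(r.getKey() + " -> " + r.getValue()
                    + " (partition " + partitionFor(r.getKey(), 4) + ")");
        }
    }
}
```

In this sketch, records that receive the same new key always map to the same partition, which is the property a downstream key-based operation relies on after repartitioning.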


Proposed Changes

  • For the KStream#repartition(Repartitioned) operation, the Kafka Streams application will first issue a topic lookup request and check whether the target topic is already up and running. If Repartitioned is configured with a number of partitions, the application will additionally verify that the topic's partition count matches the configured value. If it does not, the application will throw an error and fail during startup.
  • For the KStream#repartition() operation, the new topic uses the same number of partitions as the upstream topic. The topic name is generated from the generated processor node name.
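
The startup check described in the two points above can be sketched as follows. This is a plain-Java illustration under stated assumptions, not the actual Kafka Streams implementation: the class RepartitionTopicCheck and the method resolvePartitions are hypothetical, a Map stands in for the result of the topic lookup request, and IllegalStateException stands in for whatever error the application would raise. What happens when the topic does not exist yet is not spelled out in the text; the sketch assumes the topic would then be created with the configured count or, failing that, the upstream count.

```java
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical illustration only; not part of the Kafka Streams API. */
final class RepartitionTopicCheck {

    /**
     * Decides how many partitions the repartition topic should have.
     *
     * @param existingTopics     stand-in for the topic lookup: topic name to partition count
     * @param topic              name of the repartition topic
     * @param configured         partition count set via Repartitioned, if any
     * @param upstreamPartitions partition count of the upstream topic
     */
    static int resolvePartitions(Map<String, Integer> existingTopics,
                                 String topic,
                                 OptionalInt configured,
                                 int upstreamPartitions) {
        Integer actual = existingTopics.get(topic);
        if (actual == null) {
            // Assumption: a missing topic is created with the configured count,
            // or with the upstream count when nothing is configured.
            return configured.orElse(upstreamPartitions);
        }
        if (configured.isPresent() && configured.getAsInt() != actual) {
            // Mismatch between the existing topic and the configuration: fail at startup.
            throw new IllegalStateException("Topic " + topic + " has " + actual
                    + " partitions but " + configured.getAsInt() + " were configured");
        }
        return actual;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = Map.of("app-by-country-repartition", 4);

        // Existing topic with a matching configured count passes the check.
        System.out.println(resolvePartitions(
                topics, "app-by-country-repartition", OptionalInt.of(4), 2));

        // No existing topic and no configured count: inherit from the upstream topic.
        System.out.println(resolvePartitions(
                topics, "app-other-repartition", OptionalInt.empty(), 2));
    }
}
```

Failing fast on a mismatch, rather than silently using the existing partition count, keeps the running topology consistent with what the Repartitioned configuration declares.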

...