...

Code Block
languagejava
titleRepartitioned.java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
 * This class is used to provide the optional parameters for internal repartition topics when using:
 * - {@link KStream#repartition(Repartitioned)}
 * - {@link KStream#repartition(KeyValueMapper, Repartitioned)}
 * - {@link KStream#groupByKey(Repartitioned)}
 * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numberOfPartitions;

	protected final StreamPartitioner<? super K, ? super V> partitioner;

	private Repartitioned(String name,
	                      Serde<K> keySerde,
	                      Serde<V> valueSerde,
	                      Integer numberOfPartitions,
	                      StreamPartitioner<? super K, ? super V> partitioner) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numberOfPartitions = numberOfPartitions;
		this.partitioner = partitioner;
	}

	/**
	 * Create a {@link Repartitioned} instance with the provided name used as part of the repartition topic if required.
	 *
	 * @param name the name used as the processor name and as part of the repartition topic name if required
	 * @param <K>  key type
	 * @param <V>  value type
	 * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> as(final String name) {
		return new Repartitioned<>(name, null, null, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided key serde and value serde.
	 *
	 * @param keySerde   Serde to use for serializing the key
	 * @param valueSerde Serde to use for serializing the value
	 * @param <K>        key type
	 * @param <V>        value type
	 * @return A new {@link Repartitioned} instance configured with key serde and value serde
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde,
	                                              final Serde<V> valueSerde) {
		return new Repartitioned<>(null, keySerde, valueSerde, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided name and number of partitions used
	 * as part of the repartition topic configuration if required.
	 *
	 * @param <K>                key type
	 * @param <V>                value type
	 * @param name               the name used as part of the repartition topic name if required
	 * @param numberOfPartitions number of partitions used for creating repartition topic if required
	 * @return A new instance of {@link Repartitioned} with configured name and number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final String name,
	                                              final int numberOfPartitions) {
		return new Repartitioned<>(name, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic;
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key,
	 *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
	 * @param <K>         key type
	 * @param <V>         value type
	 * @return A new {@link Repartitioned} instance configured with partitioner
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> streamPartitioner(StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(null, null, null, null, partitioner);
	}

	/**
	 * Create a {@link Repartitioned} instance with the provided number of partitions for the repartition topic if required.
	 *
	 * @param numberOfPartitions number of partitions used when creating repartition topic if required
	 * @param <K>                key type
	 * @param <V>                value type
	 * @return A new {@link Repartitioned} instance configured with the number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> numberOfPartitions(int numberOfPartitions) {
		return new Repartitioned<>(null, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided name used as the processor name and as part of the repartition topic name.
	 * Note that Kafka Streams creates the repartition topic only if required.
	 *
	 * @param name the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the name
	 */
	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided number of partitions for the repartition topic.
	 * Note that Kafka Streams creates the repartition topic only if required.
	 *
	 * @param numberOfPartitions the number of partitions used when creating the repartition topic, if required
	 * @return a new {@link Repartitioned} instance configured with the number of partitions
	 */
	public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided key serde.
	 *
	 * @param keySerde Serde to use for serializing the key
	 * @return a new {@link Repartitioned} instance configured with the key serde
	 */
	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided value serde.
	 *
	 * @param valueSerde Serde to use for serializing the value
	 * @return a new {@link Repartitioned} instance configured with the value serde
	 */
	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic;
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key,
	 *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
	 * @return a new {@link Repartitioned} instance configured with provided partitioner
	 */
	public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}
}
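Every factory and `with*` method of `Repartitioned` returns a fresh instance rather than mutating the receiver. A partially configured object can therefore be kept and specialized without side effects.

The self-contained sketch below mirrors that copy-on-write pattern without depending on Kafka. `RepartitionedSketch` is a hypothetical stand-in, not part of the proposal. Serdes and the partitioner are reduced to descriptive Strings purely so the example compiles on its own.

```java
/**
 * Hypothetical, Kafka-free stand-in for Repartitioned, used only to illustrate the
 * copy-on-write style of its factory and "with" methods. Serdes are represented by
 * descriptive Strings so the sketch compiles on its own.
 */
final class RepartitionedSketch {

    final String name;
    final String keySerde;
    final String valueSerde;
    final Integer numberOfPartitions; // null: inherit from the source topic
    final String partitioner;

    private RepartitionedSketch(String name, String keySerde, String valueSerde,
                                Integer numberOfPartitions, String partitioner) {
        this.name = name;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    // Static factory: only the name is set, every other setting stays unset (null).
    static RepartitionedSketch as(String name) {
        return new RepartitionedSketch(name, null, null, null, null);
    }

    // Each "with" method copies all fields into a new instance and overrides exactly one.
    RepartitionedSketch withNumberOfPartitions(int numberOfPartitions) {
        return new RepartitionedSketch(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    RepartitionedSketch withKeySerde(String keySerde) {
        return new RepartitionedSketch(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    RepartitionedSketch withValueSerde(String valueSerde) {
        return new RepartitionedSketch(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }
}
```

In this sketch, a base configuration such as `RepartitionedSketch.as("orders").withKeySerde("string")` is left untouched by a later `withNumberOfPartitions(12)`. That is what makes it safe to derive several variants from one template.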


New KStream#groupByKey and KStream#groupBy overloads shall be introduced in order to give users control over the parallelism of sub-topologies.

Code Block
languagejava
public interface KStream<K, V> {

	/**
	 * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
	 * and using the serializers as defined by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
	 * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
	 * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
	 * {@link #through(String)} or {@link #repartition()}), an internal repartitioning topic may need to be created in Kafka if a later operator
	 * depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * <name> is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
	 * generated name, and "-repartition" is a fixed suffix.
	 * If the number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)},
	 * the repartition topic will be created with the specified number of partitions.
	 * Otherwise, the number of partitions is inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
	 * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
	 * correctly on its key.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
	 *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 * @see #groupBy(KeyValueMapper, Repartitioned)
	 */
	KGroupedStream<K, V> groupByKey(final Repartitioned<K, V> repartitioned);

	/**
	 * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
	 * operator depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
	 * internally generated name, and "-repartition" is a fixed suffix.
	 * If the number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)},
	 * the repartition topic will be created with the specified number of partitions.
	 * Otherwise, the number of partitions is inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
	 * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
	 * <p>
	 * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey(Repartitioned)}.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
	 *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}
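The javadoc above states two rules for the repartition topic:

- Its name is `${applicationId}-<name>-repartition`.
- Its partition count is the one supplied via `Repartitioned#withNumberOfPartitions(int)`, or is inherited from the source topic when none is supplied.

The sketch below encodes only those two rules. `RepartitionTopicSketch`, `topicName`, and `resolvePartitions` are hypothetical helpers. The internally generated name used when no `<name>` is given is modeled as a caller-supplied string, not as Kafka Streams' real naming scheme. Rejecting non-positive counts is an assumption added by the sketch.

```java
/**
 * Hypothetical helpers encoding two rules stated in the groupByKey/groupBy javadoc:
 * how the repartition topic is named and how its partition count is chosen.
 */
final class RepartitionTopicSketch {

    private RepartitionTopicSketch() {
        // static helpers only
    }

    /**
     * Returns "${applicationId}-<name>-repartition". If no name was supplied via
     * Repartitioned#as(String), the caller passes an internally generated name instead
     * (how Kafka Streams generates that name is outside the scope of this sketch).
     */
    static String topicName(String applicationId, String userSuppliedName, String generatedName) {
        String name = (userSuppliedName != null) ? userSuppliedName : generatedName;
        return applicationId + "-" + name + "-repartition";
    }

    /**
     * Returns the configured partition count if one was set via
     * Repartitioned#withNumberOfPartitions(int); otherwise inherits the source topic's count.
     * Rejecting non-positive values is an assumption of this sketch, not part of the proposal.
     */
    static int resolvePartitions(Integer configuredPartitions, int sourceTopicPartitions) {
        if (configuredPartitions == null) {
            return sourceTopicPartitions;
        }
        if (configuredPartitions <= 0) {
            throw new IllegalArgumentException("numberOfPartitions must be positive: " + configuredPartitions);
        }
        return configuredPartitions;
    }
}
```

For example, with `applicationId` set to `my-app` and `Repartitioned.as("orders").withNumberOfPartitions(12)`, these rules yield a topic named `my-app-orders-repartition` with 12 partitions, independent of the source topic's partition count.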


In addition, to give users control over when a repartition topic is created (KAFKA-8611), the following new operators shall be introduced to the KStream interface:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	/**
	 * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
	 * from the auto-generated topic using the default serializers, deserializers, and the producer's {@link DefaultPartitioner}.
	 * The number of partitions is inherited from the source topic.
	 *
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition(Repartitioned)
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition();

	/**
	 * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
	 * from the auto-generated topic using the {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions, and topic name part as defined by {@link Repartitioned}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      the {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name, and the number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
	 * from the auto-generated topic using the {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions, and topic name part as defined by {@link Repartitioned}.
	 * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
	 * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K',V>}.
	 *
	 * @param mapper        a {@link KeyValueMapper} that computes a new key for each record
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      the {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name, and the number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the new key type of the result stream
	 * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
	 * @see #repartition()
	 * @see #repartition(Repartitioned)
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
	                                final Repartitioned<KR, V> repartitioned);
}
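Conceptually, `repartition(mapper, repartitioned)` applies the `KeyValueMapper` to every record and leaves the value unchanged. It then writes each record to the partition of a topic chosen by the `StreamPartitioner`.

The in-memory sketch below illustrates that data flow only. It has these limitations, all assumptions of the sketch:

- No Kafka topic, serde, or producer is involved.
- `RepartitionSketch` and its `Partitioner` interface are hypothetical stand-ins, shaped after `StreamPartitioner#partition(topic, key, value, numPartitions)`.
- The fallback uses `Math.floorMod(Objects.hashCode(key), n)`. This is a deliberate simplification and not the murmur2 hash over serialized key bytes that the producer's `DefaultPartitioner` applies.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;

/** In-memory illustration of the data flow of repartition(mapper, repartitioned); not Kafka code. */
final class RepartitionSketch {

    private RepartitionSketch() {
    }

    /** Hypothetical stand-in shaped like StreamPartitioner#partition(topic, key, value, numPartitions). */
    interface Partitioner<K, V> {
        int partition(String topic, K key, V value, int numPartitions);
    }

    /**
     * Simplified fallback: Object#hashCode of the key (a null key hashes to 0 here).
     * This is NOT the producer's DefaultPartitioner, which applies murmur2 to serialized key bytes.
     */
    static <K, V> Partitioner<K, V> hashPartitioner() {
        return (topic, key, value, numPartitions) -> Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    /**
     * Re-keys every record with the mapper, then appends it to one of numPartitions in-memory
     * "partitions" chosen by the given partitioner, or by hashPartitioner() if it is null.
     */
    static <K, V, KR> List<List<Map.Entry<KR, V>>> repartition(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends KR> mapper,
            String topic,
            int numPartitions,
            Partitioner<? super KR, ? super V> partitioner) {
        Partitioner<? super KR, ? super V> effective = partitioner;
        if (effective == null) {
            effective = RepartitionSketch.<KR, V>hashPartitioner();
        }
        List<List<Map.Entry<KR, V>>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            KR newKey = mapper.apply(record.getKey(), record.getValue()); // only the key changes
            V value = record.getValue();
            int p = effective.partition(topic, newKey, value, numPartitions);
            if (p < 0 || p >= numPartitions) {
                throw new IllegalStateException("partitioner returned out-of-range partition " + p);
            }
            partitions.get(p).add(new AbstractMap.SimpleEntry<>(newKey, value));
        }
        return partitions;
    }
}
```

Passing a custom partitioner, as `Repartitioned#streamPartitioner` allows, replaces the fallback routing entirely. The `numPartitions` argument plays the role of the configured `numberOfPartitions`.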


Proposed Changes

For KStream#repartition

...

Why not use the Produced class for specifying the number of partitions?

There are two main reasons not to use the Produced class for specifying the number of partitions.

...