Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleRepartitioned.java
/**
 * This class is used to provide the optional parameters for internal repartitioned topics when using:
 * - {@link KStream#repartition(Repartitioned)}
 * - {@link KStream#repartition(KeyValueMapper, Repartitioned)}
 * - {@link KStream#groupByKey(Repartitioned)}
 * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numberOfPartitions;

	protected final StreamPartitioner<? super K, ? super V> partitioner;

	private Repartitioned(String name,
	                      Serde<K> keySerde,
	                      Serde<V> valueSerde,
	                      Integer numberOfPartitions,
	                      StreamPartitioner<? super K, ? super V> partitioner) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numberOfPartitions = numberOfPartitions;
		this.partitioner = partitioner;
	}

	/**
	 * Create a {@link Repartitioned} instance with the provided name used as part of the repartition topic if required.
	 *
	 * @param name the name used as a processor named and part of the repartition topic name if required.
	 * @param <K>  key type
	 * @param <V>  value type
	 * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> as(final String name) {
		return new Repartitioned<>(name, null, null, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided key serde and value serde.
	 *
	 * @param keySerde   Serde to use for serializing the key
	 * @param valueSerde Serde to use for serializing the value
	 * @param <K>        key type
	 * @param <V>        value type
	 * @return A new {@link Repartitioned} instance configured with key serde and value serde
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde,
	                                              final Serde<V> valueSerde) {
		return new Repartitioned<>(null, keySerde, valueSerde, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
	 * @param <K>         key type
	 * @param <V>         value type
	 * @return A new {@link Repartitioned} instance configured with partitioner
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(null, null, null, null, partitioner);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided number of partitions for repartition topic if required.
	 *
	 * @param numberOfPartitions number of partitions used when creating repartition topic if required
	 * @param <K>                key type
	 * @param <V>                value type
	 * @return A new {@link Repartitioned} instance configured number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public /
	public static <K, V> Repartitioned<K, V> numberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(null, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided name used as part of repartition topic and processor name.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param name the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the name
	 */
	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided number of partitions for repartition topic.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param numberOfPartitions the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the number of partitions
	 */
	public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided key serde.
	 *
	 * @param keySerde Serde to use for serializing the key
	 * @return a new {@link Repartitioned} instance configured with the key serde
	 */
	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided value serde.
	 *
	 * @param valueSerde Serde to use for serializing the value
	 * @return a new {@link Repartitioned} instance configured with the value serde
	 */
	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} wil be used
	 * @return a new {@link Repartitioned} instance configured with provided partitioner
	 */
	public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}
}


New 

...

KStream#repartition operations shall be introduced in order to give the user control over parallelism for sub-topologies

...

:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	/**
	 * Group the records by their current key into a {@link KGroupedStream} while preserving the original valuesMaterialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
	 * andfrom using the auto-generated topic using default serializers, as defined by deserializers, producer's {@link RepartitionedDefaultPartitioner}.
	 * GroupingNumber aof stream on the record key partitions is requiredinherited beforefrom anthe aggregation operator can be applied to the datasource topic.
	 * (cf. {@link KGroupedStream}).
	 * If@return a record key is {@code nullKStream} that contains the recordexact willsame not(and bepotentially includedrepartitioned) inrecords theas resultingthis {@link@code KGroupedStreamKStream}.
	 * <p>@see #repartition(Repartitioned)
	 * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},@see #repartition(KeyValueMapper, Repartitioned)
	 * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
	 * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
	 * {@link #through(String)/
	KStream<K, V> repartition();

	/**
	 * Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link #repartition)}) an internal repartitioning topic may need to be created in Kafka if a later operatorSerde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 *
	 * depends@param onrepartitioned the newly selected key.{@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 *                  {@link StreamsConfigStreamPartitioner} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG} which determines how records are distributed among partitions of the topic,
	 *  <name>      is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
	 * generated name, and "-repartition" ispart aof fixed suffix.
	 * Ifthe topic name and number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}for a repartition topic, if repartitioning is required.
	 * repartition@return topica will{@code beKStream} generatedthat withcontains the specified number of partitions.exact same (and potentially repartitioned) records as this {@code KStream}
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>@see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * YouMaterialize canthis retrievestream allto a auto-generated internalrepartition topic names viaand creates a new {@link@code Topology#describe()KStream}.
	 * <p>
	from *the Forauto-generated thistopic case,using all{@link dataSerde ofkey this stream will be redistributed through the repartitioning topic by writing allserde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * recordsnumber toof it,partitions and rereadingtopic all records from it, such that the resultingname part as defined by {@link Repartitioned}.
	 * The provided {@link KGroupedStreamKeyValueMapper} is partitioned
	 * correctly on its keyapplied to each input record and computes a new key for it.
	 *
	 *Thus, @paraman repartitionedinput therecord {@link@code Repartitioned<K,V>} instance used to specify {@link org. can be transformed into an output record {@code <K':V>}.
	 *
	 * @param mapper        a {@link KeyValueMapper} that computes a new key for each record
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} usedwhich todetermines determine how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@link@code KGroupedStreamKStream} that contains records with thenew groupedkey records(possibly of the original {@code KStream}different type) and unmodified value
	 * @see #groupBy(KeyValueMapper, #repartition()
	 * @see #repartition(Repartitioned)
	 */
	<KR> KGroupedStream<KRKStream<KR, V> groupByKeyrepartition(final Repartitioned<KR, V> repartitioned);

	/**
	 * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
	 * operator depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
	 * internally generated name.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
	 * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
	 * <p>
	 * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}

In addition, in order to give users control over when to create repartition topic (KAFKA-8611) following new operators shall be introduced to KStream interface:

...

languagejava
titleKStream.java
KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
	                                final Repartitioned<KR, V> repartitioned);
}


Proposed Changes

  • For KStream#repartition(Repartitioned) operation, Kafka Streams application will first issue the topic lookup request and check whether the target topic is already up and running. If Repartitioned is configured with number of partitions, in addition, Kafka Streams application will make sure that number of partitions in the topic match with the configured value. If not, application will thrown an error and fail during startup.
  • For KStream#repartition() operation, use upstream topic partition size as the new topic number of partitions. Topic name will be generated based on the generated processor node name.

Backward Compatibility

This is a pure KStream library change that shouldn't affect previously setup applications. Since we introduce new KStream#groupBy operations, existing ones shouldn't be affected by this change.

Rejected Alternatives

Repartition "hint" in groupBy operations

In the mailing thread discussion, concern was raised that adding number of partitions configuration option to group by operations, such as `KStream#groupByKey(Repartitioned)` may not be the best option. Main argument against it is that whenever user specified number of partitions for internal, repartition topics, he/she really cares that those configuration will be applied. Case with group by is that, repartitioning will not happen at all if key changing operation isn't performed, therefore number of partitions configuration specified by the user will never kick-in. Alternatively, if user cares about manual repartitioning, one may do following in order to scale up/down sub topologies:

Code Block
languagejava
builder.stream("topic")
       .repartition((key, value) -> value.newKey(), Repartitioned.withNumberOfPartitions(5))
	   .groupByKey()
	   .count();


Rejected KSream interface changes:

...

Code Block
languagejava
titleKTableKStream.java
public interface KTable<KKStream<K, V> {
	/**
	 * Re-groups the records of this {@code KTable} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedTable}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)}
     * or generated internally, and "-repartition" is a fixed suffix.
     * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
     * repartition topic will be generated with the specified number of partitions.
     * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 *
	 * <p>
	 * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
	 * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
	 * on the new key.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);

	/**
	 * Re-groups the records by their current key into a {@link KGroupedStream} while preserving the original values
 original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	         * and using the serializers as defined by {@link Repartitioned}.
	 * If the new record key is {@code null} the record will not be included in thefinal resultingRepartitioned<KR, {@link KGroupedStream}.V> repartitioned);

	 /** <p>
	 * An internal repartitioning topic may be created in Kafka, if number of partitions provided via
     * {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)} is different compared to source topic of this {@link KTable},Re-groups the records by their current key into a {@link KGroupedStream} while preserving the original values
     * and using the serializers as defined by {@link Repartitioned}.
	 * If the repartitionednew record topickey is created, it{@code null} the record will not be included named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>"
     * is either provided viain the resulting {@link KGroupedStream}.
	 * <p>
	 * An internal repartitioning topic may be created in Kafka, if number of partitions provided via
     * {@link org.apache.kafka.streams.kstream.Repartitioned#asRepartitioned#withNumberOfPartitions(String)} or generated internally, and "-repartition" is a fixed suffix.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                     int)} is different compared to source topic of this {@link KTable},
	 * If repartitioned topic is created, it will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>"
     * is either provided via {@link org.apache.kafka.streams.processor.StreamPartitionerkstream.Repartitioned#as(String)} whichor determinesgenerated howinternally, records are distributed among partitions of the topic,and "-repartition" is a fixed suffix.
	 * <p>
	 * You can retrieve all generated internal topic names via             part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KTable}
	 */
	KGroupedStream<K, V> groupBy(final Repartitioned<K, V> repartitioned);
}

...

  • For KStream#repartition(Repartitioned) operation, Kafka Streams application will first issue the topic lookup request and check whether the target topic is already up and running. If Repartitioned is configured with number of partitions, in addition, Kafka Streams application will make sure that number of partitions in the topic match with the configured value. If not, application will thrown an error and fail during startup.
  • For KStream#repartition() operation, use upstream topic partition size as the new topic number of partitions. Topic name will be generated based on the generated processor node name.
  • For KStream#groupBy(Repartitioned) repartition topic gets a hint on how many partitions it should be created with. If repartition topic is not created yet, create one with specified Repartitioned#numberOfPartitiones; otherwise use the upstream topic partition size as the new topic number of partitions.

Backward Compatibility

This is a pure KStream library change that shouldn't affect previously setup applications. Since we introduce new KStream#groupBy operations, existing ones shouldn't be affected by this change.

...

{@link Topology#describe()}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KTable}
	 */
	KGroupedStream<K, V> groupBy(final Repartitioned<K, V> repartitioned);
}

Why not use Produced operation for specifying number of partitions?

...