Status

Current state: Voting

Voting thread: https://www.mail-archive.com/dev@kafka.apache.org/msg99680.html

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today the parallelism (that is, the number of tasks) of a downstream sub-topology depends purely on the parallelism of the upstream sub-topology, which ultimately depends on the source topic's num.partitions. This does not work well in dynamic scaling scenarios. Letting the stream topology create repartition topics with a customized number of partitions gives users more flexibility. In addition, when using the DSL in Kafka Streams today, data is repartitioned only when a key-changing operation is followed by a stateful operation. However, stateful computation in the DSL can also happen through the transform() operation. The problem is that no auto-repartition is triggered before transform(), even if an upstream operation changed the key. If repartitioning is required, the user must call through(String) before transform(). With the current implementation, the burden of creating and managing the topic falls on the user, which adds complexity to managing a Kafka Streams application.

Public interfaces

With the current API contract of the Kafka Streams DSL, producer-level configuration can be specified using the Produced<K, V> class, which can be passed to operators such as KStream#to and KStream#through. However, the current DSL offers no way to specify topic-level configuration for operators that may create internal topics. To let users control the parallelism of sub-topologies, and to make internal topic configuration more flexible, we shall add the following configuration class.

...

KTable.java (Java)
public interface KTable<K, V> {
	/**
	 * Re-groups the records of this {@code KTable} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Re-grouping a {@code KTable} is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedTable}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)}
	 * or generated internally, and "-repartition" is a fixed suffix.
	 * If the number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)},
	 * the repartition topic will be created with the specified number of partitions.
	 * If not, the number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 *
	 * <p>
	 * All data of this {@code KTable} will be redistributed through the repartitioning topic by writing all update
	 * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
	 * on the new key.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KTable}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);

	/**
	 * Re-groups the records by their current key into a {@link KGroupedStream} while preserving the original values
	 * and using the serializers as defined by {@link Repartitioned}.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * An internal repartitioning topic may be created in Kafka if the number of partitions provided via
	 * {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)} differs from that of the source topic of this {@link KTable}.
	 * If a repartition topic is created, it will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "<name>"
	 * is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or generated internally, and "-repartition" is a fixed suffix.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link org.apache.kafka.streams.processor.StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KTable}
	 */
	KGroupedStream<K, V> groupBy(final Repartitioned<K, V> repartitioned);
}
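
The naming rule described in the Javadoc above can be illustrated with a small, dependency-free sketch. The class and method names here (RepartitionTopicNameSketch, topicName) are hypothetical and are not part of Kafka Streams; the sketch only mirrors the documented "${applicationId}-<name>-repartition" pattern and assumes the name has already been either supplied by the user or generated internally.

```java
import java.util.Objects;

/**
 * Hypothetical helper (not a Kafka Streams class): illustrates how a
 * repartition topic name is composed according to the Javadoc above.
 */
final class RepartitionTopicNameSketch {

    // Fixed suffix stated in the Javadoc.
    private static final String SUFFIX = "-repartition";

    private RepartitionTopicNameSketch() { }

    /**
     * @param applicationId the value configured as the application id
     * @param name          the user-provided or internally generated name part
     * @return the composed topic name, "<applicationId>-<name>-repartition"
     */
    static String topicName(final String applicationId, final String name) {
        Objects.requireNonNull(applicationId, "applicationId");
        Objects.requireNonNull(name, "name");
        return applicationId + "-" + name + SUFFIX;
    }

    public static void main(final String[] args) {
        // Example values are made up for illustration.
        System.out.println(topicName("orders-app", "by-customer"));
    }
}
```

With the made-up values in main, the composed name is orders-app-by-customer-repartition.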


Proposed Changes

  • For the KStream#repartition(Repartitioned) operation, the Kafka Streams application will first issue a topic lookup request and check whether the target topic is already up and running. If Repartitioned is configured with a number of partitions, the application will additionally make sure that the number of partitions in the topic matches the configured value. If it does not, the application will throw an error and fail during startup.
  • For the KStream#repartition() operation, the number of partitions of the upstream topic is used as the number of partitions of the new topic. The topic name will be based on the generated processor node name.
  • For KStream#groupBy(Repartitioned), the repartition topic gets a hint on how many partitions it should be created with. If the repartition topic has not been created yet, it is created with the specified Repartitioned#numberOfPartitions; otherwise the number of partitions of the upstream topic is used as the number of partitions of the new topic.
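
The partition-count rules in the list above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the Kafka Streams implementation: the class RepartitionPartitionCheckSketch and both of its methods are hypothetical, the configured value is modelled as an OptionalInt, and IllegalStateException stands in for whatever error the application would actually raise at startup.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch (not Kafka Streams code) of the partition-count
 * handling described in the Proposed Changes list.
 */
final class RepartitionPartitionCheckSketch {

    private RepartitionPartitionCheckSketch() { }

    /**
     * Partition count for a repartition topic that still has to be created:
     * the configured value if one was given, otherwise the upstream count.
     */
    static int resolvePartitionCount(final OptionalInt configured,
                                     final int upstreamPartitions) {
        return configured.orElse(upstreamPartitions);
    }

    /**
     * Startup check for a topic that already exists: if a partition count
     * was configured, it must match the topic's actual partition count.
     * The exception type is an assumption made for this sketch.
     */
    static void validateExistingTopic(final String topic,
                                      final int actualPartitions,
                                      final OptionalInt configured) {
        if (configured.isPresent() && configured.getAsInt() != actualPartitions) {
            throw new IllegalStateException(
                "Topic " + topic + " has " + actualPartitions
                    + " partitions, but " + configured.getAsInt() + " were configured");
        }
    }

    public static void main(final String[] args) {
        // No configured value: fall back to the upstream partition count.
        System.out.println(resolvePartitionCount(OptionalInt.empty(), 4));
        // Configured value takes precedence over the upstream count.
        System.out.println(resolvePartitionCount(OptionalInt.of(12), 4));
        // Matching counts pass silently; a mismatch would throw.
        validateExistingTopic("example-topic", 12, OptionalInt.of(12));
    }
}
```

Keeping the resolution step separate from the validation step mirrors the distinction in the list between creating a topic and checking one that already exists.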

Backward Compatibility

This is purely a KStream library change that shouldn't affect previously set up applications. Since we introduce new KStream#groupBy operations, existing ones shouldn't be affected by this change.

Rejected Alternatives

Why not use the Produced class for specifying the number of partitions?

There are two main reasons not to use the Produced class for specifying the number of partitions.

...