Status
Current state: Under Discussion
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg99111.html
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Today, the parallelism of a downstream sub-topology (i.e., its number of tasks) depends purely on the parallelism of the upstream sub-topology, which ultimately depends on the number of partitions (num.partitions) of the source topic. This does not work well in dynamic scaling scenarios. Letting the stream topology create repartition topics with a customized number of partitions gives users more flexibility. In addition, when using the Kafka Streams DSL today, data is repartitioned only when a key-changing operation is followed by a stateful operation. However, stateful computation in the DSL can also be done with the transform() operation. The problem is that no automatic repartitioning is triggered for transform(), even if an upstream operation changed the key. If repartitioning is required, through(String) must be called before transform(). With the current implementation, the burden of creating and managing that topic falls on the user, which adds complexity to managing a Kafka Streams application.
Public interfaces
With the current API contract of the Kafka Streams DSL, producer-level configuration can be specified using the Produced<K, V> class, which can be passed to operators such as KStream#to and KStream#through. However, the DSL currently offers no way to specify topic-level configuration for operators that may create internal topics. To let users control the parallelism of sub-topologies, and potentially make internal topic configuration more flexible, we propose adding the following configuration class.
/**
 * This class is used to provide the optional parameters for internal repartitioned topics when using:
 * - {@link KStream#repartition(Repartitioned)}
 * - {@link KStream#repartition(KeyValueMapper, Repartitioned)}
 * - {@link KStream#groupByKey(Repartitioned)}
 * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

    protected final String name;
    protected final Serde<K> keySerde;
    protected final Serde<V> valueSerde;
    protected final Integer numberOfPartitions;
    protected final StreamPartitioner<? super K, ? super V> partitioner;

    private Repartitioned(String name,
                          Serde<K> keySerde,
                          Serde<V> valueSerde,
                          Integer numberOfPartitions,
                          StreamPartitioner<? super K, ? super V> partitioner) {
        this.name = name;
        this.keySerde = keySerde;
        this.valueSerde = valueSerde;
        this.numberOfPartitions = numberOfPartitions;
        this.partitioner = partitioner;
    }

    /**
     * Create a {@link Repartitioned} instance with the provided name used as part of the repartition topic if required.
     *
     * @param name the name used as a processor name and part of the repartition topic name if required.
     * @param <K> key type
     * @param <V> value type
     * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name
     * @see KStream#repartition(Repartitioned)
     * @see KStream#repartition(KeyValueMapper, Repartitioned)
     * @see KStream#groupByKey(Repartitioned)
     * @see KStream#groupBy(KeyValueMapper, Repartitioned)
     */
    public static <K, V> Repartitioned<K, V> as(final String name) {
        return new Repartitioned<>(name, null, null, null, null);
    }

    /**
     * Create a {@link Repartitioned} instance with provided key serde and value serde.
     *
     * @param keySerde Serde to use for serializing the key
     * @param valueSerde Serde to use for serializing the value
     * @param <K> key type
     * @param <V> value type
     * @return A new {@link Repartitioned} instance configured with key serde and value serde
     * @see KStream#repartition(Repartitioned)
     * @see KStream#repartition(KeyValueMapper, Repartitioned)
     * @see KStream#groupByKey(Repartitioned)
     * @see KStream#groupBy(KeyValueMapper, Repartitioned)
     */
    public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
        return new Repartitioned<>(null, keySerde, valueSerde, null, null);
    }

    /**
     * Create a {@link Repartitioned} instance with provided name and number of partitions used
     * as part of the repartition topic configuration if required.
     *
     * @param <K> key type
     * @param <V> value type
     * @param name the name used as part of the repartition topic name if required
     * @param numberOfPartitions number of partitions used for creating repartition topic if required
     * @return A new instance of {@link Repartitioned} with configured name and number of partitions
     * @see KStream#repartition(Repartitioned)
     * @see KStream#repartition(KeyValueMapper, Repartitioned)
     * @see KStream#groupByKey(Repartitioned)
     * @see KStream#groupBy(KeyValueMapper, Repartitioned)
     */
    public static <K, V> Repartitioned<K, V> with(final String name, final int numberOfPartitions) {
        return new Repartitioned<>(name, null, null, numberOfPartitions, null);
    }

    /**
     * Create a {@link Repartitioned} instance with provided partitioner.
     *
     * @param partitioner the function used to determine how records are distributed among partitions of the topic;
     *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key,
     *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
     * @param <K> key type
     * @param <V> value type
     * @return A new {@link Repartitioned} instance configured with partitioner
     * @see KStream#repartition(Repartitioned)
     * @see KStream#repartition(KeyValueMapper, Repartitioned)
     * @see KStream#groupByKey(Repartitioned)
     * @see KStream#groupBy(KeyValueMapper, Repartitioned)
     */
    public static <K, V> Repartitioned<K, V> streamPartitioner(StreamPartitioner<? super K, ? super V> partitioner) {
        return new Repartitioned<>(null, null, null, null, partitioner);
    }

    /**
     * Create a {@link Repartitioned} instance with provided number of partitions for repartition topic if required.
     *
     * @param numberOfPartitions number of partitions used when creating repartition topic if required
     * @param <K> key type
     * @param <V> value type
     * @return A new {@link Repartitioned} instance configured with number of partitions
     * @see KStream#repartition(Repartitioned)
     * @see KStream#repartition(KeyValueMapper, Repartitioned)
     * @see KStream#groupByKey(Repartitioned)
     * @see KStream#groupBy(KeyValueMapper, Repartitioned)
     */
    public static <K, V> Repartitioned<K, V> numberOfPartitions(int numberOfPartitions) {
        return new Repartitioned<>(null, null, null, numberOfPartitions, null);
    }

    /**
     * Create a new instance of {@link Repartitioned} with the provided name used as part of repartition topic and processor name.
     * Note that Kafka Streams creates repartition topic only if required.
     *
     * @param name the name used for the processor name and as part of the repartition topic name if required
     * @return a new {@link Repartitioned} instance configured with the name
     */
    @Override
    public Repartitioned<K, V> withName(final String name) {
        return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    /**
     * Create a new instance of {@link Repartitioned} with the provided number of partitions for repartition topic.
     * Note that Kafka Streams creates repartition topic only if required.
     *
     * @param numberOfPartitions number of partitions used when creating repartition topic if required
     * @return a new {@link Repartitioned} instance configured with the number of partitions
     */
    public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
        return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    /**
     * Create a new instance of {@link Repartitioned} with the provided key serde.
     *
     * @param keySerde Serde to use for serializing the key
     * @return a new {@link Repartitioned} instance configured with the key serde
     */
    public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
        return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    /**
     * Create a new instance of {@link Repartitioned} with the provided value serde.
     *
     * @param valueSerde Serde to use for serializing the value
     * @return a new {@link Repartitioned} instance configured with the value serde
     */
    public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
        return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }

    /**
     * Create a new instance of {@link Repartitioned} with the provided partitioner.
     *
     * @param partitioner the function used to determine how records are distributed among partitions of the topic;
     *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key,
     *                    {@link WindowedStreamPartitioner} will be used, otherwise {@link DefaultPartitioner} will be used
     * @return a new {@link Repartitioned} instance configured with provided partitioner
     */
    public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
        return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
    }
}
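The static factories and with* methods above are meant to be chained, with each call returning a new immutable instance. The sketch below illustrates that composition with a hypothetical, stripped-down stand-in (RepartitionedSketch, which is not part of the proposal and omits serdes and the partitioner) so that it compiles without Kafka on the classpath.

```java
// Hypothetical stand-in for the proposed Repartitioned class.
// Assumption: only name and numberOfPartitions are modelled here.
final class RepartitionedSketch {

    final String name;                // null: no user-provided name
    final Integer numberOfPartitions; // null: no user-provided partition count

    private RepartitionedSketch(String name, Integer numberOfPartitions) {
        this.name = name;
        this.numberOfPartitions = numberOfPartitions;
    }

    // Static entry point, mirroring Repartitioned#as(String).
    static RepartitionedSketch as(String name) {
        return new RepartitionedSketch(name, null);
    }

    // Each with* method copies the current settings into a new instance.
    RepartitionedSketch withName(String name) {
        return new RepartitionedSketch(name, numberOfPartitions);
    }

    RepartitionedSketch withNumberOfPartitions(int numberOfPartitions) {
        return new RepartitionedSketch(name, numberOfPartitions);
    }
}

final class RepartitionedSketchDemo {
    // Chained configuration: a name plus an explicit partition count.
    static RepartitionedSketch build() {
        return RepartitionedSketch.as("clicks-by-user").withNumberOfPartitions(12);
    }
}
```

With the proposed class itself, the equivalent call would presumably read Repartitioned.<String, Long>as("clicks-by-user").withNumberOfPartitions(12); the name and partition count used here are illustrative only.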
New KStream#groupByKey and KStream#groupBy overloads shall be introduced to give users control over the parallelism of sub-topologies.
public interface KStream<K, V> {

    /**
     * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
     * and using the serializers as defined by {@link Repartitioned}.
     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
     * (cf. {@link KGroupedStream}).
     * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
     * <p>
     * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
     * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
     * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
     * {@link #through(String)} or {@link #repartition()}), an internal repartitioning topic may need to be created
     * in Kafka if a later operator depends on the newly selected key.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * <name> is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
     * generated name, and "-repartition" is a fixed suffix.
     * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)},
     * the repartition topic will be generated with the specified number of partitions.
     * If not, number of partitions will be inherited from the source topic.
     * <p>
     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
     * <p>
     * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
     * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
     * correctly on its key.
     *
     * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
     *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
     *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
     * @see #groupBy(KeyValueMapper, Repartitioned)
     */
    <KR> KGroupedStream<KR, V> groupByKey(final Repartitioned<KR, V> repartitioned);

    /**
     * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
     * and {@link Serde}s as specified by {@link Repartitioned}.
     * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
     * (cf. {@link KGroupedStream}).
     * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
     * original values.
     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
     * <p>
     * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
     * operator depends on the newly selected key.
     * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
     * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
     * internally generated name.
     * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)},
     * the repartition topic will be generated with the specified number of partitions.
     * If not, number of partitions will be inherited from the source topic.
     * <p>
     * You can retrieve all generated internal topic names via {@link Topology#describe()}.
     * <p>
     * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
     * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
     * <p>
     * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
     *
     * @param selector a {@link KeyValueMapper} that computes a new key for grouping
     * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
     *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
     *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
     * @param <KR> the key type of the result {@link KGroupedStream}
     * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
     */
    <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
                                       final Repartitioned<KR, V> repartitioned);
}
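The naming rule stated in the Javadoc above ("${applicationId}-<name>-repartition") can be sketched as a small helper. This is only an illustration of the rule as written: the class and method names are hypothetical, and the format of the internally generated fallback name is not specified here, so it is passed in as a parameter rather than guessed.

```java
// Hypothetical helper; not part of the proposed public API.
final class RepartitionTopicNaming {

    /**
     * Builds "${applicationId}-<name>-repartition".
     *
     * @param applicationId the value of StreamsConfig's application.id
     * @param userName      name given via Repartitioned#as(String), or null if none was given
     * @param generatedName assumption: whatever name Kafka Streams generates internally
     */
    static String topicName(String applicationId, String userName, String generatedName) {
        // A user-provided name takes precedence over the generated one.
        String name = userName != null ? userName : generatedName;
        return applicationId + "-" + name + "-repartition";
    }
}
```

For example, an application with id "orders-app" and Repartitioned.as("by-customer") would end up with a topic named "orders-app-by-customer-repartition".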
In addition, to give users control over when a repartition topic is created (KAFKA-8611), the following new operators shall be added to the KStream interface:
public interface KStream<K, V> {

    /**
     * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
     * from the auto-generated topic using default serializers, deserializers, and the producer's {@link DefaultPartitioner}.
     * Number of partitions is inherited from the source topic.
     *
     * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
     * @see #repartition(Repartitioned)
     * @see #repartition(KeyValueMapper, Repartitioned)
     */
    KStream<K, V> repartition();

    /**
     * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
     * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
     * number of partitions and topic name part as defined by {@link Repartitioned}.
     *
     * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
     *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
     *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
     * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
     * @see #repartition()
     * @see #repartition(KeyValueMapper, Repartitioned)
     */
    KStream<K, V> repartition(Repartitioned<K, V> repartitioned);

    /**
     * Materialize this stream to an auto-generated repartition topic and create a new {@code KStream}
     * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
     * number of partitions and topic name part as defined by {@link Repartitioned}.
     * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
     *
     * @param mapper a {@link KeyValueMapper} that computes a new key for each record
     * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
     *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
     *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
     * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
     * @see #repartition()
     * @see #repartition(Repartitioned)
     */
    <KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
                                    Repartitioned<KR, V> repartitioned);
}
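To make the effect of repartition(KeyValueMapper, Repartitioned) concrete, the following is a minimal in-memory model, not the Kafka Streams implementation. It re-keys each record, leaves the value untouched, and routes the record by its new key so that equal keys end up in the same partition. The class name is hypothetical, and hashCode-based routing is a simplifying assumption standing in for the real partitioner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory model of re-keying followed by repartitioning.
final class RepartitionSketch {

    /**
     * Applies the mapper to every record to obtain a new key, then places the
     * record <K', V> into one of numberOfPartitions buckets based on that key.
     * Returns one list of records per partition, indexed by partition number.
     */
    static <K, V, KR> List<List<Map.Entry<KR, V>>> repartition(
            List<Map.Entry<K, V>> records,
            BiFunction<? super K, ? super V, ? extends KR> mapper,
            int numberOfPartitions) {
        List<List<Map.Entry<KR, V>>> partitions = new ArrayList<>();
        for (int i = 0; i < numberOfPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Map.Entry<K, V> record : records) {
            KR newKey = mapper.apply(record.getKey(), record.getValue());
            // Assumption: hashCode-based routing instead of the real partitioner.
            int partition = Math.floorMod(newKey.hashCode(), numberOfPartitions);
            partitions.get(partition).add(Map.entry(newKey, record.getValue()));
        }
        return partitions;
    }
}
```

After this step, a stateful operation that depends on the new key sees all records for a given key in a single partition, which is the property that a key-changing operation followed by transform() otherwise lacks.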
Proposed Changes
- For the KStream#repartition(Repartitioned) operation, the Kafka Streams application first issues a topic lookup request and checks whether the target topic already exists. If Repartitioned is configured with a number of partitions, the application additionally verifies that the topic's number of partitions matches the configured value. If it does not, the application throws an error and fails during startup.
- For the KStream#repartition() operation, the number of partitions of the upstream topic is used for the new topic. The topic name is derived from the generated processor node name.
- For KStream#groupByKey(Repartitioned) and KStream#groupBy(KeyValueMapper, Repartitioned), the repartition topic gets a hint on how many partitions it should be created with. If the repartition topic has not been created yet, it is created with the specified Repartitioned#numberOfPartitions; otherwise, the upstream topic's partition count is used as the number of partitions for the new topic.
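One possible reading of the startup behavior described above, written as a sketch rather than the actual implementation. The class and method names are hypothetical, IllegalStateException stands in for whatever error Kafka Streams would raise, and accepting an existing topic as-is when no partition count was configured is an assumption that the text does not spell out.

```java
import java.util.OptionalInt;

// Hypothetical startup check for a repartition topic.
final class RepartitionTopicCheck {

    /**
     * Decides how many partitions the repartition topic has or should get.
     *
     * @param existingPartitions   partition count from the topic lookup, empty if the topic does not exist
     * @param configuredPartitions value from Repartitioned, or null if not configured
     * @param upstreamPartitions   partition count of the upstream topic
     * @throws IllegalStateException if an existing topic does not match the configured count
     */
    static int resolvePartitions(OptionalInt existingPartitions,
                                 Integer configuredPartitions,
                                 int upstreamPartitions) {
        if (!existingPartitions.isPresent()) {
            // Topic not created yet: use the configured hint, else inherit from upstream.
            return configuredPartitions != null ? configuredPartitions : upstreamPartitions;
        }
        int actual = existingPartitions.getAsInt();
        if (configuredPartitions != null && actual != configuredPartitions) {
            // Mismatch between topic and configuration: fail during startup.
            throw new IllegalStateException("Repartition topic has " + actual
                    + " partitions but " + configuredPartitions + " were configured");
        }
        // Assumption: an existing topic is accepted as-is when nothing was configured.
        return actual;
    }
}
```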
Backward Compatibility
This is a pure KStream library change that should not affect previously set up applications. Since we only introduce new KStream#groupByKey and KStream#groupBy overloads, the existing ones are not affected by this change.
Rejected Alternatives
Why not use the Produced class for specifying the number of partitions?
There are two main reasons not to use the Produced class for specifying the number of partitions.
1) If we enhance the Produced class with this configuration, it will also affect KStream#to. Since KStream#to is the final sink of the topology, it seems reasonable to assume that the user needs to create the sink topic manually in advance. In that case, a number-of-partitions configuration does not make much sense.
2) Looking at the KStream interface, the Produced class appears to be intended for operations that work with non-internal topics (i.e., topics that are not created and managed internally by Kafka Streams), and this contract would break if we enhanced it with a number-of-partitions configuration. The current API contract is very clear in that respect: every operation that uses the Produced class (KStream#to, KStream#through) works with topics that are explicitly managed by the end user.