...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The main motivation of this KIP is stated in the related JIRA: "Today the downstream sub-topology's parallelism (aka the number of tasks) are purely dependent on the upstream sub-topology's parallelism, which ultimately depends on the source topic's num.partitions. However this does not work perfectly with dynamic scaling scenarios". Giving the streams topology the ability to create repartition topics with a customized number of partitions gives the user more flexibility. Also, for APIs like #to or #through, KStream has no access to sink topic creation, which means the user has to create their own connecting Kafka topic for every single new application, which is cumbersome. Thus we are proposing to extend the Kafka Streams DSL so that the application can create and manage such connecting (repartition) topics automatically when the target topic has not been created.

In addition, at this point, when using the DSL in Kafka Streams, data re-partitioning happens only when a key-changing operation is followed by a stateful operation. On the other hand, in the DSL, stateful computation can also happen using the transform() operation. The problem with this approach is that, even if an upstream operation was key-changing before calling transform(), no auto-repartitioning is triggered. If repartitioning is required, a call to through(String) should be performed before transform(). With the current implementation, the burden of managing and creating the topic falls on the user and introduces extra complexity in managing the Kafka Streams application.

Public interfaces

With the current API contract in the Kafka Streams DSL, we have the possibility to control and specify producer-level configuration using the Produced<K, V> class, which we can pass to various operators like KStream#to, KStream#through, etc. However, in the current Kafka Streams DSL, there is no possibility to specify topic-level configuration for the different operators that potentially may create internal topics. In order to give the user the possibility to control the parallelism of sub-topologies and potentially make internal topic configuration more flexible, we shall add the following configuration class:

Code Block
languagejava
titleRepartitioned.java
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numOfPartitions;

	private Repartitioned(final String name,
	                      final Serde<K> keySerde,
	                      final Serde<V> valueSerde,
	                      final Integer numOfPartitions) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numOfPartitions = numOfPartitions;
	}

	public static <K, V> Repartitioned<K, V> as(final String name) {
		return new Repartitioned<>(name, null, null, null);
	}

	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions);
	}

	public Repartitioned<K, V> withNumOfPartitions(final int numOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions);
	}

	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions);
	}

	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions);
	}
}
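
For illustration only, a Repartitioned instance would be built via the fluent setters above. The topic name, serdes, and partition count in the following sketch are made-up example values (the Serdes helpers come from org.apache.kafka.common.serialization), not part of the proposal:

Code Block
languagejava
// Illustrative usage sketch of the proposed Repartitioned configuration class.
// "clicks-by-user" and the partition count of 16 are made-up example values.
final Repartitioned<String, Long> repartitioned =
        Repartitioned.<String, Long>as("clicks-by-user")   // explicit repartition topic name
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long())
                .withNumOfPartitions(16);                   // desired number of partitions / tasks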


New KStream#groupByKey and KStream#groupBy overloads shall be introduced in order to give the user control over parallelism for sub-topologies:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	KGroupedStream<K, V> groupByKey(final Repartitioned<K, V> repartitioned);

	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}
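
As a hypothetical usage sketch of these overloads (the topic names, serdes, and the partition count of 32 below are assumptions for illustration, not part of the proposal):

Code Block
languagejava
// Hypothetical example: a key-changing groupBy() with an explicit number of partitions
// for the repartition topic, so the downstream aggregation runs with 32 tasks.
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> clicks =
        builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()));

final KTable<String, Long> clicksPerPage = clicks
        .groupBy((user, page) -> page,                      // key-changing operation
                 Repartitioned.<String, String>as("clicks-by-page")
                         .withKeySerde(Serdes.String())
                         .withValueSerde(Serdes.String())
                         .withNumOfPartitions(32))
        .count();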


In addition, in order to give users control over when to create a repartition topic (KAFKA-8611), the following new operators shall be introduced to the KStream interface:

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	// .. //
	KStream<K, V> repartition();

	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                final Repartitioned<KR, V> repartitioned);
	// .. //
}
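
For illustration, repartition() could be called after a key-changing operation to insert a Streams-managed repartition topic before a stateful operation such as transform() or aggregate(). The topic names and the partition count in this sketch are made-up values, not part of the proposal:

Code Block
languagejava
// Hypothetical example: explicitly repartition after selectKey() so that a subsequent
// stateful operation sees correctly partitioned data.
final StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("events")
        .selectKey((key, value) -> value)                   // key-changing operation
        .repartition(Repartitioned.<String, String>as("events-rekeyed")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String())
                .withNumOfPartitions(8))                    // explicit downstream parallelism
        .to("events-output");                               // or continue with transform()/aggregate()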


Proposed Changes

...

For KStream#repartition

  • When Repartitioned is specified, the Kafka Streams application will first issue a topic lookup request and check whether the target topic is already up and running. If

...

  • If Repartitioned is configured with a number of partitions, the Kafka Streams application will, in addition, make sure that the number of partitions of the topic matches the configured value. If not, the application will throw an error and fail during startup (a rough sketch of this lookup-and-verify step follows this list).
  • If Repartitioned is not specified, the upstream topic's partition count is used as the number of partitions for the new topic. The topic name will be generated based on the generated processor node name.
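
A rough sketch of the lookup-and-verify step is shown below. This is not the actual Kafka Streams implementation; the helper name validateRepartitionTopic, the use of AdminClient, and the error message are assumptions for illustration only:

Code Block
languagejava
// Rough illustrative sketch (not the actual implementation): look up an existing topic
// and fail fast if its partition count does not match the Repartitioned configuration.
void validateRepartitionTopic(final AdminClient adminClient,
                              final String topicName,
                              final Integer configuredNumOfPartitions) throws Exception {
    final TopicDescription description = adminClient
            .describeTopics(Collections.singleton(topicName))
            .values()
            .get(topicName)
            .get();                                         // topic lookup request

    final int actualNumOfPartitions = description.partitions().size();
    if (configuredNumOfPartitions != null && actualNumOfPartitions != configuredNumOfPartitions) {
        // mismatch between existing topic and Repartitioned configuration: fail during startup
        throw new StreamsException("Topic " + topicName + " has " + actualNumOfPartitions
                + " partitions but Repartitioned requires " + configuredNumOfPartitions);
    }
}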

For KStream#groupBy

  • the repartition topic gets a hint on how many partitions it should be created with. If the repartition topic has not been created yet, one is created with the specified number of partitions; otherwise, the upstream topic's partition count is used as the number of partitions for the new topic.

...

Backward Compatibility

This is a pure Kafka Streams library change that shouldn't affect previously set up applications. Since we only introduce new KStream operations, existing ones shouldn't be affected by this change.

Rejected Alternatives

Why not use the Produced class for specifying the number of partitions?

There are multiple reasons not to use the Produced class for specifying the number of partitions.

1) If we enhance the Produced class with this configuration, it will also affect KStream#to. Since KStream#to is the final sink of the topology, it seems reasonable to assume that the user needs to manually create the sink topic in advance, and in that case a number-of-partitions configuration doesn't make much sense.

2) Based on its API contract, the Produced class seems to be designed explicitly for producer-level configuration (key serializer, value serializer, and partitioner are all producer-specific configurations), whereas the number of partitions is a topic-level configuration. Mixing topic-level and producer-level configurations in one class doesn't look semantically correct.

3) Looking at the KStream interface, the Produced class seems to be intended for operations that work with non-internal topics (i.e., topics that are not created and managed internally by Kafka Streams), and this contract would break if we enhanced it with a number-of-partitions configuration.