You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 39 Next »

 

Status

Current stateUnder Discussion

Discussion threadhttps://www.mail-archive.com/dev@kafka.apache.org/msg99111.html

JIRA:   Unable to render Jira issues macro, execution error. Unable to render Jira issues macro, execution error.   Unable to render Jira issues macro, execution error.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Today the downstream sub-topology's parallelism (aka the number of tasks) are purely dependent on the upstream sub-topology's parallelism, which ultimately depends on the source topic's num.partitions. However this does not work perfectly with dynamic scaling scenarios. By delegating the stream topology power to create repartition topic with customized number of partitions gives user more flexibility. Also, at this point, when using DSL in Kafka Streams, data re-partition happens only when key-changing operation is followed by stateful operation. On the other hand, in DSL, stateful computation can happen using transform() operation as well. Problem with this approach is that, even if any upstream operation was key-changing before calling transform(), no auto-repartition is triggered. If repartitioning is required, a call to through(String) should be performed before transform(). With the current implementation, burden of managing and creating the topic falls on user and introduces extra complexity of managing Kafka Streams application. 

Public interfaces

With current API contract in Kafka Streams DSL we have possibility to control and specify producer level configuration using Produced<K, V> class, which we can pass to various operators like KStream#to, KStream#through etc. However, in current Kafka Streams DSL, there's no possibility to specify topic level configurations for different operators that potentially may create internal topics. In order to give the user possibility to control parallelism of sub-topologies and potentially make internal topic configuration more flexible, we shall add following configuration class.

Repartitioned.java
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numOfPartitions;

	protected final StreamPartitioner<? super K, ? super V> partitioner;

	private Repartitioned(String name,
	                      Serde<K> keySerde,
	                      Serde<V> valueSerde,
	                      Integer numOfPartitions,
	                      StreamPartitioner<? super K, ? super V> partitioner) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numOfPartitions = numOfPartitions;
		this.partitioner = partitioner;
	}

	public static Repartitioned as(final String name) {
		return new Repartitioned<>(name, null, null, null, null);
	}

	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner);
	}

	public Repartitioned<K, V> withNumOfPartitions(final int numOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner);
	}

	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner);
	}

	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner);
	}

	public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner);
	}
}

New KStream#groupBy shall be introduced in order to give the user control over parallelism for sub-topologies.

public interface KStream<K, V> {

	KGroupedStream<K, V> groupByKey(final Repartitioned<K, V> repartitioned);

	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}


In addition, in order to give users control over when to create repartition topic (KAFKA-8611) following new operators shall be introduced to KStream interface:

KStream.java
public interface KStream<K, V> {

	// .. //
	
	/**
	 * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration will be inherited from source topic.
	 */
	KStream<K, V> repartition();
	
	/**
	 * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration will be inherited from source topic.
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, KR> selector);

	/**
	 * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration is applied according to {@param repartitioned} specification.
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration is applied according to {@param repartitioned} specification.
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                final Repartitioned<KR, V> repartitioned);
	// .. //
}


Proposed Changes

For KStream#repartition

  • when Repartitioned operation is specified, Kafka Streams application will first issue the topic lookup request and check whether the target topic is already up and running. If Repartitioned is configured with number of partitions, in addition, Kafka Streams application will make sure that number of partitions in the topic match with the configured value. If not, application will thrown an error and fail during startup.
  • If Repartitioned operation is not specified, use upstream topic partition size as the new topic number of partitions. Topic name will be generated based on the generated processor node name.

For KStream#groupBy

  • repartition topic gets a hint on how many partitions it should be created with. If repartition topic is not created yet, create one with specified numPartitions; otherwise use the upstream topic partition size as the new topic number of partitions.

Backward Compatibility

This is a pure KStream library change that shouldn't affect previously setup applications. Since we introduce new KStream#groupBy operations, existing ones shouldn't be affected by this change.

Rejected Alternatives

Why not use Produced operation for specifying number of partitions?

There're multiple reasons why not to use Produced class for specifying number of partitions.

1) If we enhance Produced class with this configuration, this will also affect KStream#to. Since KStream#to is the final sink of the topology, it seems to be reasonable assumption that user needs to manually create sink topic in advance. And in that case, having num of partitions configuration doesn’t make much sense.

2) Looking at KStream interface, seems like Produced class is for operations that work with non-internal (e.g topics that are created and managed internally by Kafka Streams) topics and this contract will break if we gonna enhance it with number of partitions configuration. Current API contract is very clear in that respect - every operation (KStream#to, KStream#through) that use Produced class work with topics that are explicitly managed by the end user.


  • No labels