...

The main motivation of this KIP is stated in the related JIRA: "Today the downstream sub-topology's parallelism (aka the number of tasks) are purely dependent on the upstream sub-topology's parallelism, which ultimately depends on the source topic's num.partitions. However this does not work perfectly with dynamic scaling scenarios". Letting the stream topology create repartition topics with a customized number of partitions gives users more flexibility. Also, for APIs like #to or #through, KStream has no control over sink topic creation, which means users have to create a connecting Kafka topic themselves for every new application, which is cumbersome. We therefore propose extending the #Produced API to automatically create the target topic when it has not yet been created.

Public interfaces

We shall expand the Produced API to contain topicName and numPartitions: 

Code Block
languagejava
titleProduced.java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class Produced<K, V> {
	protected Serde<K> keySerde;
	protected Serde<V> valueSerde;
	protected StreamPartitioner<? super K, ? super V> partitioner;
	protected String topicName; // new
	protected Integer numPartitions; // new
	protected Boolean alreadyCreated; // new
	protected Boolean repartitionNeeded; // new
	protected Boolean useAsInternalTopicOnly; // new

	// Target topic that may or may not already exist.
	protected Produced(String topicName, Boolean alreadyCreated) {
		this.topicName = topicName;
		this.alreadyCreated = alreadyCreated;
	}

	// Target topic with an explicit partition count.
	protected Produced(String topicName, Integer numPartitions) {
		this.topicName = topicName;
		this.numPartitions = numPartitions;
	}

	// Only the signature is given in the proposal; the body simply stores the arguments.
	public static <K, V> Produced<K, V> with(final Serde<K> keySerde,
											 final Serde<V> valueSerde,
											 final StreamPartitioner<? super K, ? super V> partitioner,
											 final String topicName,
											 final Integer numPartitions) {
		final Produced<K, V> produced = new Produced<>(topicName, numPartitions);
		produced.keySerde = keySerde;
		produced.valueSerde = valueSerde;
		produced.partitioner = partitioner;
		return produced;
	}

	public static <K, V> Produced<K, V> withTopic(String topicName, Boolean alreadyCreated) {
		return new Produced<>(topicName, alreadyCreated);
	}

	public static <K, V> Produced<K, V> withTopic(String topicName, Integer numPartitions) {
		return new Produced<>(topicName, numPartitions);
	}

	public Produced<K, V> repartitionNeeded(Boolean repartitionNeeded) {
		this.repartitionNeeded = repartitionNeeded;
		return this;
	}

	public Produced<K, V> useAsInternalTopicOnly(Boolean useAsInternalTopicOnly) {
		this.useAsInternalTopicOnly = useAsInternalTopicOnly;
		return this;
	}
}

KStream

Code Block
languagejava
 KStream<K, V> through(final Produced<K, V> topicInfo);
 void to(final Produced<K, V> topicInfo);

The change to Produced is not compatible with the definition of “The specified topic should be manually created before it is used”. However, since we reject

We also plan to 

Code Block
languagejava

 

Proposed Changes

When configured with a topic name and a number of partitions, the KStream application will first issue a topic lookup request to check whether the target topic already exists.
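The lookup-before-create behaviour described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `TopicLookupSketch`, `TopicAdmin`, `InMemoryTopicAdmin`, and `ensureTopic` are hypothetical names introduced here and are not part of Kafka Streams or of this proposal, and the in-memory map merely stands in for a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: none of these types exist in Kafka Streams.
final class TopicLookupSketch {

    /** Hypothetical minimal view of the topic operations the application needs. */
    interface TopicAdmin {
        boolean topicExists(String topicName);
        void createTopic(String topicName, int numPartitions);
    }

    /** In-memory stand-in for a broker, used only to make the sketch runnable. */
    static final class InMemoryTopicAdmin implements TopicAdmin {
        private final Map<String, Integer> topics = new HashMap<>();

        @Override
        public boolean topicExists(String topicName) {
            return topics.containsKey(topicName);
        }

        @Override
        public void createTopic(String topicName, int numPartitions) {
            topics.put(topicName, numPartitions);
        }

        /** Returns the recorded partition count, or null if the topic is unknown. */
        Integer partitionsOf(String topicName) {
            return topics.get(topicName);
        }
    }

    /**
     * Looks the topic up first and creates it only when the lookup finds nothing.
     * Returns true if this call created the topic, false if it already existed.
     */
    static boolean ensureTopic(TopicAdmin admin, String topicName, int numPartitions) {
        if (admin.topicExists(topicName)) {
            return false; // assumption: an existing topic is left untouched
        }
        admin.createTopic(topicName, numPartitions);
        return true;
    }

    private TopicLookupSketch() {}
}
```

How an existing topic with a different partition count should be handled is not specified above; the sketch assumes it is simply left as it is. A real implementation would presumably issue the lookup and creation through Kafka's admin client rather than through an in-memory map.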

Backward Compatibility 

Rejected Alternatives

N/A