You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current stateUnder Discussion

Discussion thread: tba

JIRA:  KAFKA-6037

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

  1. The main motivation of this KIP is stated in the related jira: "Today the downstream sub-topology's parallelism (aka the number of tasks) are purely dependent on the upstream sub-topology's parallelism, which ultimately depends on the source topic's num. partitions. However this does not work perfectly with dynamic scaling scenarios". 
  2. Depending on use-case, we might not need to use repartitioning topic, as keys might not change. 

 

Proposed Changes

 

public class RepartitionHint {

    public final String topicName;
    public final Integer topicPartitions;
    public final Boolean repartitionRequired;

    public RepartitionHint (String topicName) {
        this.topicName = topicName;
        this.topicPartitions = null;
        this.repartitionRequired = true;
    }

    public RepartitionHint (String topicName, Integer topicPartitions) {
        this.topicName = topicName;
        this.topicPartitions = topicPartitions;
        this.repartitionRequired = true;
    }

    public RepartitionHint (Boolean repartitionRequired) {
        this.topicName = null;
        this.topicPartitions = null;
        this.repartitionRequired = repartitionRequired;
    }
    
    public RepartitionHint (String topicName, Integer topicPartitions, Boolean repartitionRequired) {
        this.topicName = topicName;
        this.topicPartitions = topicPartitions;
        this.repartitionRequired = repartitionRequired;
    }

    public String getTopicName() {
        return topicName;
    }

    public Integer getTopicPartitions() {
        return topicPartitions;
    }
}

 

Proposed Changes

 

We try to avoid many overloads by not implementing all the overloaded combinations in DSL.

 

KStream

 

public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other,
                                    final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                    final Joined<K, V, VT> joined,
									final RepartitionHint hintThisStream)
public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
                                        final ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
                                        final Joined<K, V, VT> joined, 
										final RepartitionHint hintThisStream)




public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                    final JoinWindows windows,
                                    final Joined<K, V, VO> joined, 
									final RepartitionHint hintThisStream,
									final RepartitionHint hintOtherStream)
 
public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
                                  final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                  final JoinWindows windows,
                                  final Serde<K> keySerde,
                                  final Serde<V> thisValueSerde,
                                  final Serde<V1> otherValueSerde, 
								  final RepartitionHint hintThisStream,
								  final RepartitionHint hintOtherStream)

public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other,
                                         final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                         final JoinWindows windows,
                                         final Joined<K, V, VO> joined,
										 final RepartitionHint hintThisStream,
									     final RepartitionHint hintOtherStream)
 
public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
                                      final ValueJoiner<? super V, ? super V1, ? extends R> joiner,
                                      final JoinWindows windows,
                                      final Serde<K> keySerde,
                                      final Serde<V> thisValSerde,
                                      final Serde<V1> otherValueSerde, 
									  final RepartitionHint hintThisStream,
									  final RepartitionHint hintOtherStream)


public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other,
                                        final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                        final JoinWindows windows,
                                        final Joined<K, V, VO> joined, 
										final RepartitionHint hintThisStream,
										final RepartitionHint hintOtherStream)

 

 

 

KGroupedStream

 

public <T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                            final Aggregator<? super K, ? super V, T> aggregator,
                                            final Merger<? super K, T> sessionMerger,
                                            final SessionWindows sessionWindows,
                                            final Serde<T> aggValueSerde,
                                            final org.apache.kafka.streams.processor.StateStoreSupplier<SessionStore> storeSupplier,
											final RepartitionHint repartitionHint)


 
public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                  final Aggregator<? super K, ? super V, T> aggregator,
                                  final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
								  final RepartitionHint repartitionHint)




 
public <W extends Window, T> KTable<Windowed<K>, T> aggregate(final Initializer<T> initializer,
                                                              final Aggregator<? super K, ? super V, T> aggregator,
                                                              final Windows<W> windows,
                                                              final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier,
															  final RepartitionHint repartitionHint)


 
public KTable<K, V> reduce(final Reducer<V> reducer,
                           final org.apache.kafka.streams.processor.StateStoreSupplier<KeyValueStore> storeSupplier,
						   final RepartitionHint repartitionHint)




 
public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
                                                        final Windows<W> windows,
                                                        final org.apache.kafka.streams.processor.StateStoreSupplier<WindowStore> storeSupplier,
														final RepartitionHint repartitionHint)




 

 

 

Describe the new thing you want to do in appropriate detail. This may be fairly extensive and have large subsections of its own. Or it may be a few sentences. Use judgement based on the scope of the change.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.

  • No labels