...
Code Block | ||||
---|---|---|---|---|
| ||||
public/** * This class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> { protected final String name; protected final Serde<K> keySerde; protected final Serde<V> valueSerde; protected final Integer numOfPartitions; protected final StreamPartitioner<? super K, ? super V> partitioner; private Repartitioned(String name, Serde<K> keySerde, is used to provide the optional parameters for internal repartitioned topics when using * using: * - {@link KStream#repartition(Repartitioned)} * - {@link KStream#repartition(KeyValueMapper, Repartitioned)} * - {@link KStream#groupByKey(Repartitioned)} * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)} * * @param <K> key type * @param <V> value type */ public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> { protected final String name; protected final Serde<K> keySerde; protected final Serde<V> valueSerde; protected final Integer numberOfPartitions; protected final StreamPartitioner<? super K, ? super V> partitioner; private Repartitioned(String name, Serde<K> keySerde, Serde<V> valueSerde, Integer numOfPartitionsnumberOfPartitions, StreamPartitioner<? super K, ? 
super V> partitioner) { this.name = name; this.keySerde = keySerde; this.valueSerde = valueSerde; this.numOfPartitionsnumberOfPartitions = numOfPartitionsnumberOfPartitions; this.partitioner = partitioner; } public static/** * Create a {@link Repartitioned as(final String name) { return new Repartitioned<>(name, null, null, null, null); } @Override public Repartitioned<K, V> withName(final String name) { return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner); } public Repartitioned<K, V> withNumOfPartitions(final int numOfPartitions) { return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner); } public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) { return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner); } public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) { return new Repartitioned<>(name, keySerde, valueSerde, numOfPartitions, partitioner); } public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner} instance with the provided name used as part of the repartition topic if required. * * @param name the name used as a processor named and part of the repartition topic name if required. * @param <K> key type * @param <V> value type * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name * @see KStream#repartition(Repartitioned) * @see KStream#repartition(KeyValueMapper, Repartitioned) * @see KStream#groupByKey(Repartitioned) * @see KStream#groupBy(KeyValueMapper, Repartitioned) */ public static <K, V> Repartitioned<K, V> as(final String name) { return new Repartitioned<>(name, keySerdenull, valueSerdenull, numOfPartitionsnull, partitionernull); } } |
New KStream#groupBy shall be introduced in order to give the user control over parallelism for sub-topologies.
Code Block | ||
---|---|---|
| ||
public interface KStream<K, V> { KGroupedStream<K, V> groupByKey(final Repartitioned<K, V> repartitioned); <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, /** * Create a {@link Repartitioned} instance with provided key serde and value serde. * * @param keySerde Serde to use for serializing the key * @param valueSerde Serde to use for serializing the value * @param <K> key type * @param <V> value final Repartitioned<KR, V> repartitioned); } |
In addition, in order to give users control over when to create repartition topic (KAFKA-8611) following new operators shall be introduced to KStream interface:
type
* @return A new {@link Repartitioned} instance configured with key serde and value serde
* @see KStream#repartition(Repartitioned)
* @see KStream#repartition(KeyValueMapper, Repartitioned)
* @see KStream#groupByKey(Repartitioned)
* @see KStream#groupBy(KeyValueMapper, Repartitioned)
*/
public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new Repartitioned<>(null, keySerde, valueSerde, null, null);
}
/**
* Create a {@link Repartitioned} instance with provided name and number of partitions used
* as part of the repartition topic configuration if required.
*
* @param <K> key type
* @param <V> value type
* @param name the name used as part of the repartition topic name if required
* @param numberOfPartitions number of partitions used for creating repartition topic if required
* @return A new instance of {@link Repartitioned} with configured name and number of partitions
* @see KStream#repartition(Repartitioned)
* @see KStream#repartition(KeyValueMapper, Repartitioned)
* @see KStream#groupByKey(Repartitioned)
* @see KStream#groupBy(KeyValueMapper, Repartitioned)
*/
public static <K, V> Repartitioned<K, V> with(final String name,
final int numberOfPartitions) {
return new Repartitioned<>(name, null, null, numberOfPartitions, null);
}
/**
* Create a {@link Repartitioned} instance with provided partitioner.
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified and the key serde provides a {@link WindowedSerializer} for the key
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
* @param <K> key type
* @param <V> value type
* @return A new {@link Repartitioned} instance configured with partitioner
* @see KStream#repartition(Repartitioned)
* @see KStream#repartition(KeyValueMapper, Repartitioned)
* @see KStream#groupByKey(Repartitioned)
* @see KStream#groupBy(KeyValueMapper, Repartitioned)
*/
public static <K, V> Repartitioned<K, V> streamPartitioner(StreamPartitioner<? super K, ? super V> partitioner) {
return new Repartitioned<>(null, null, null, null, partitioner);
}
/**
* Create a {@link Repartitioned} instance with provided number of partitions for repartition topic if required.
*
* @param numberOfPartitions number of partitions used when creating repartition topic if required
* @param <K> key type
* @param <V> value type
* @return A new {@link Repartitioned} instance configured number of partitions
* @see KStream#repartition(Repartitioned)
* @see KStream#repartition(KeyValueMapper, Repartitioned)
* @see KStream#groupByKey(Repartitioned)
* @see KStream#groupBy(KeyValueMapper, Repartitioned)
*/
public static <K, V> Repartitioned<K, V> numberOfPartitions(int numberOfPartitions) {
return new Repartitioned<>(null, null, null, numberOfPartitions, null);
}
/**
* Create a new instance of {@link Repartitioned} with the provided name used as part of repartition topic and processor name.
* Note that Kafka Streams creates repartition topic only if required.
*
* @param name the name used for the processor name and as part of the repartition topic name if required
* @return a new {@link Repartitioned} instance configured with the name
*/
@Override
public Repartitioned<K, V> withName(final String name) {
return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
}
/**
* Create a new instance of {@link Repartitioned} with the provided number of partitions for repartition topic.
* Note that Kafka Streams creates repartition topic only if required.
*
* @param numberOfPartitions the name used for the processor name and as part of the repartition topic name if required
* @return a new {@link Repartitioned} instance configured with the number of partitions
*/
public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
}
/**
* Create a new instance of {@link Repartitioned} with the provided key serde.
*
* @param keySerde Serde to use for serializing the key
* @return a new {@link Repartitioned} instance configured with the key serde
*/
public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
}
/**
* Create a new instance of {@link Repartitioned} with the provided value serde.
*
* @param valueSerde Serde to use for serializing the value
* @return a new {@link Repartitioned} instance configured with the value serde
*/
public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
}
/**
* Create a new instance of {@link Repartitioned} with the provided partitioner.
*
* @param partitioner the function used to determine how records are distributed among partitions of the topic,
* if not specified and the key serde provides a {@link WindowedSerializer} for the key
* {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} wil be used
* @return a new {@link Repartitioned} instance configured with provided partitioner
*/
public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
}
} |
New KStream#groupByKey and KStream#groupBy overloads shall be introduced to give users control over the parallelism of sub-topologies.
Code Block | ||
---|---|---|
| ||
public interface KStream<K, V> {
/**
* Group the records by their current key into a {@link KGroupedStream} while preserving the original values
* and using the serializers as defined by {@link Grouped}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
* {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
* {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
* {@link #through(String)}, {@link #repartition)}) an internal repartitioning topic may need to be created in Kafka if a later operator
* depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* <name> is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
* generated name, and "-repartition" is a fixed suffix.
* If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
* repartition topic will be generated with the specified number of partitions.
* If not, number of partitions will be inherited from the source topic.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
* records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
* correctly on its key.
*
* @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
* {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
* part of the name and number of partitions for a repartition topic if repartitioning is required.
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
* @see #groupBy(KeyValueMapper, Repartitioned)
*/
<KR> KGroupedStream<KR, V> groupByKey(final Repartitioned<KR, V> repartitioned);
/**
* Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
* and {@link Serde}s as specified by {@link Repartitioned}.
* Grouping a stream on the record key is required before an aggregation operator can be applied to the data
* (cf. {@link KGroupedStream}).
* The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
* original values.
* If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
* <p>
* Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
* operator depends on the newly selected key.
* This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
* {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
* "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
* internally generated name.
* If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
* repartition topic will be generated with the specified number of partitions.
* If not, number of partitions will be inherited from the source topic.
* <p>
* You can retrieve all generated internal topic names via {@link Topology#describe()}.
* <p>
* All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
* and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
* <p>
* This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
*
* @param selector a {@link KeyValueMapper} that computes a new key for grouping
* @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
* {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
* part of the name and number of partitions for a repartition topic if repartitioning is required.
* @param <KR> the key type of the result {@link KGroupedStream}
* @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
*/
<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
final Repartitioned<KR, V> repartitioned);
} |
In addition, to give users control over when a repartition topic is created (KAFKA-8611), the following new operators shall be introduced to the KStream interface:
Code Block | ||||
---|---|---|---|---|
| ||||
public interface KStream<K, V> {
/**
* Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
* from the auto-generated topic using default serializers, deserializers, producer's {@link DefaultPartitioner}.
* Number of partitions is inherited from the source topic.
*
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition(Repartitioned)
* @see #repartition(KeyValueMapper, Repartitioned)
*/
KStream<K, V> repartition();
/**
* Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
* from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
* number of partitions and topic name part as defined by {@link Repartitioned}.
*
* @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
* {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
* part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
* @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
* @see #repartition()
* @see #repartition(Repartitioned)
*/
<KR> KStream<KR, V> repartition(Repartitioned<KR, V> repartitioned);
/**
* Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
* from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
* number of partitions and topic name part as defined by {@link Repartitioned}.
* The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
* Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
*
* @param mapper a {@link KeyValueMapper} that computes a new key for each record
* @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
* {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
* part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
* @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
* @see #repartition()
* @see #repartition(Repartitioned) | ||||
Code Block | ||||
| ||||
public interface KStream<K, V> { // .. // /** * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration will be inherited from source topic. */ KStream<K, V> repartition(); /** * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration will be inherited from source topic. */ <KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, KR> selector); /** * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration is applied according to {@param repartitioned} specification. */ KStream<K, V> repartition(final Repartitioned<K, V> repartitioned); /** * Kafka Streams creates and manages repartition topic. Partitioner and num of partitions configuration is applied according to {@param repartitioned} specification. */ <KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> selectormapper, final Repartitioned<KR, V> repartitioned); // .. // } |
Proposed Changes
For KStream#repartition
...
Why not use the Produced class for specifying the number of partitions?
There are two main reasons not to use the Produced class for specifying the number of partitions.
...