Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
titleRepartitioned.java
/**
 * This class is used to provide the optional parameters for internal repartitioned topics when using:
 * - {@link KStream#repartition(Repartitioned)}
 * - {@link KStream#repartition(KeyValueMapper, Repartitioned)}
 * - {@link KStream#groupByKey(Repartitioned)}
 * - {@link KStream#groupBy(KeyValueMapper, Repartitioned)}
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Repartitioned<K, V> implements NamedOperation<Repartitioned<K, V>> {

	protected final String name;

	protected final Serde<K> keySerde;

	protected final Serde<V> valueSerde;

	protected final Integer numberOfPartitions;

	protected final StreamPartitioner<? super K, ? super V> partitioner;

	private Repartitioned(String name,
	                      Serde<K> keySerde,
	                      Serde<V> valueSerde,
	                      Integer numberOfPartitions,
	                      StreamPartitioner<? super K, ? super V> partitioner) {
		this.name = name;
		this.keySerde = keySerde;
		this.valueSerde = valueSerde;
		this.numberOfPartitions = numberOfPartitions;
		this.partitioner = partitioner;
	}

	/**
	 * Create a {@link Repartitioned} instance with the provided name used as part of the repartition topic if required.
	 *
	 * @param name the name used as a processor named and part of the repartition topic name if required.
	 * @param <K>  key type
	 * @param <V>  value type
	 * @return A new {@link Repartitioned} instance configured with processor name and repartition topic name
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> as(final String name) {
		return new Repartitioned<>(name, null, null, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided key serde and value serde.
	 *
	 * @param keySerde   Serde to use for serializing the key
	 * @param valueSerde Serde to use for serializing the value
	 * @param <K>        key type
	 * @param <V>        value type
	 * @return A new {@link Repartitioned} instance configured with key serde and value serde
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final Serde<K> keySerde,
	                                              final Serde<V> valueSerde) {
		return new Repartitioned<>(null, keySerde, valueSerde, null, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided name and number of partitions used
	 * as part of the repartition topic configuration if required.
	 *
	 * @param <K>                key type
	 * @param <V>                value type
	 * @param name               the name used as part of the repartition topic name if required
	 * @param numberOfPartitions number of partitions used for creating repartition topic if required
	 * @return A new instance of {@link Repartitioned} with configured name and number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> with(final String name,
	                                              final int numberOfPartitions) {
		return new Repartitioned<>(name, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} will be used
	 * @param <K>         key type
	 * @param <V>         value type
	 * @return A new {@link Repartitioned} instance configured with partitioner
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(null, null, null, null, partitioner);
	}

	/**
	 * Create a {@link Repartitioned} instance with provided number of partitions for repartition topic if required.
	 *
	 * @param numberOfPartitions number of partitions used when creating repartition topic if required
	 * @param <K>                key type
	 * @param <V>                value type
	 * @return A new {@link Repartitioned} instance configured number of partitions
	 * @see KStream#repartition(Repartitioned)
	 * @see KStream#repartition(KeyValueMapper, Repartitioned)
	 * @see KStream#groupByKey(Repartitioned)
	 * @see KStream#groupBy(KeyValueMapper, Repartitioned)
	 */
	public static <K, V> Repartitioned<K, V> numberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(null, null, null, numberOfPartitions, null);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided name used as part of repartition topic and processor name.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param name the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the name
	 */
	@Override
	public Repartitioned<K, V> withName(final String name) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided number of partitions for repartition topic.
	 * Note that Kafka Streams creates repartition topic only if required.
	 *
	 * @param numberOfPartitions the name used for the processor name and as part of the repartition topic name if required
	 * @return a new {@link Repartitioned} instance configured with the number of partitions
	 */
	public Repartitioned<K, V> withNumberOfPartitions(final int numberOfPartitions) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided key serde.
	 *
	 * @param keySerde Serde to use for serializing the key
	 * @return a new {@link Repartitioned} instance configured with the key serde
	 */
	public Repartitioned<K, V> withKeySerde(final Serde<K> keySerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided value serde.
	 *
	 * @param valueSerde Serde to use for serializing the value
	 * @return a new {@link Repartitioned} instance configured with the value serde
	 */
	public Repartitioned<K, V> withValueSerde(final Serde<V> valueSerde) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}

	/**
	 * Create a new instance of {@link Repartitioned} with the provided partitioner.
	 *
	 * @param partitioner the function used to determine how records are distributed among partitions of the topic,
	 *                    if not specified and the key serde provides a {@link WindowedSerializer} for the key
	 *                    {@link WindowedStreamPartitioner} will be used—otherwise {@link DefaultPartitioner} wil be used
	 * @return a new {@link Repartitioned} instance configured with provided partitioner
	 */
	public Repartitioned<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
		return new Repartitioned<>(name, keySerde, valueSerde, numberOfPartitions, partitioner);
	}
}

...

Code Block
languagejava
titleKStream.java
public interface KStream<K, V> {

	/**
	 * Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using default serializers, deserializers, producer's {@link DefaultPartitioner}.
	 * Number of partitions is inherited from the source topic.
	 *
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition(Repartitioned)
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition();

	/**
	 * Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains the exact same (and potentially repartitioned) records as this {@code KStream}
	 * @see #repartition()
	 * @see #repartition(KeyValueMapper, Repartitioned)
	 */
	KStream<K, V> repartition(final Repartitioned<K, V> repartitioned);

	/**
	 * Materialize this stream to a auto-generated repartition topic and creates a new {@code KStream}
	 * from the auto-generated topic using {@link Serde key serde}, {@link Serde value serde}, {@link StreamPartitioner},
	 * number of partitions and topic name part as defined by {@link Repartitioned}.
	 * The provided {@link KeyValueMapper} is applied to each input record and computes a new key for it.
	 * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K':V>}.
	 *
	 * @param mapper        a {@link KeyValueMapper} that computes a new key for each record
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} which determines how records are distributed among partitions of the topic,
	 *                      part of the topic name and number of partitions for a repartition topic, if repartitioning is required.
	 * @return a {@code KStream} that contains records with new key (possibly of different type) and unmodified value
	 * @see #repartition()
	 * @see #repartition(Repartitioned)
	 */
	<KR> KStream<KR, V> repartition(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
	                                final Repartitioned<KR, V> repartitioned);
}

...