Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
public interface KStream<K, V> {

	/**
	 * Group the records by their current key into a {@link KGroupedStream} while preserving the original values
	 * and using the serializers as defined by {@link GroupedRepartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * If a record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * If a key changing operator was used before this operation (e.g., {@link #selectKey(KeyValueMapper)},
	 * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
	 * {@link #transform(TransformerSupplier, String...)}), and no data redistribution happened afterwards (e.g., via
	 * {@link #through(String)}, {@link #repartition)}) an internal repartitioning topic may need to be created in Kafka if a later operator
	 * depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * <name> is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an internally
	 * generated name, and "-repartition" is a fixed suffix.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * For this case, all data of this stream will be redistributed through the repartitioning topic by writing all
	 * records to it, and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned
	 * correctly on its key.
	 *
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
	 *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 * @see #groupBy(KeyValueMapper, Repartitioned)
	 */
	<KR> KGroupedStream<KR, V> groupByKey(final Repartitioned<KR, V> repartitioned);

	/**
	 * Group the records of this {@code KStream} on a new key that is selected using the provided {@link KeyValueMapper}
	 * and {@link Serde}s as specified by {@link Repartitioned}.
	 * Grouping a stream on the record key is required before an aggregation operator can be applied to the data
	 * (cf. {@link KGroupedStream}).
	 * The {@link KeyValueMapper} selects a new key (which may or may not be of the same type) while preserving the
	 * original values.
	 * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedStream}.
	 * <p>
	 * Because a new key is selected, an internal repartitioning topic may need to be created in Kafka if a later
	 * operator depends on the newly selected key.
	 * This topic will be named "${applicationId}-<name>-repartition", where "applicationId" is user-specified in
	 * {@link  StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
	 * "<name>" is either provided via {@link org.apache.kafka.streams.kstream.Repartitioned#as(String)} or an
	 * internally generated name.
	 * If number of partitions is provided via {@link org.apache.kafka.streams.kstream.Repartitioned#withNumberOfPartitions(int)}
	 * repartition topic will be generated with the specified number of partitions.
	 * If not, number of partitions will be inherited from the source topic.
	 * <p>
	 * You can retrieve all generated internal topic names via {@link Topology#describe()}.
	 * <p>
	 * All data of this stream will be redistributed through the repartitioning topic by writing all records to it,
	 * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
	 * <p>
	 * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
	 *
	 * @param selector      a {@link KeyValueMapper} that computes a new key for grouping
	 * @param repartitioned the {@link Repartitioned} instance used to specify {@link org.apache.kafka.common.serialization.Serdes},
	 *                      {@link StreamPartitioner} used to determine how records are distributed among partitions of the topic,
	 *                      part of the name and number of partitions for a repartition topic if repartitioning is required.
	 * @param <KR>          the key type of the result {@link KGroupedStream}
	 * @return a {@link KGroupedStream} that contains the grouped records of the original {@code KStream}
	 */
	<KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> selector,
	                                   final Repartitioned<KR, V> repartitioned);
}

...