...

import java.util.Collections;
import java.util.List;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);


    /**
     * Determine the list of partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multicast. Note that returning a single-valued list with value -1 is a shorthand
     * for broadcasting the record to all the partitions of the topic.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return a List of integers between 0 and {@code numPartitions-1}; {@code null} means a broadcast to all
     * partitions, while an empty list means the record is not sent to any partition
     */
    default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
        return Collections.singletonList(partition(topic, key, value, numPartitions));
    }
}      
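
As an illustration of how the proposed partitions() method might be overridden, here is a minimal sketch. It redeclares the interface locally so that it compiles without Kafka on the classpath; the class names MulticastSketch and PrimaryAndNextPartitioner, and the "hash partition plus its neighbour" policy, are hypothetical and not part of this proposal.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MulticastSketch {

    // Local copy of the interface proposed above (assumption: same shape as in this draft).
    public interface StreamPartitioner<K, V> {
        @Deprecated
        Integer partition(String topic, K key, V value, int numPartitions);

        default List<Integer> partitions(String topic, K key, V value, int numPartitions) {
            return Collections.singletonList(partition(topic, key, value, numPartitions));
        }
    }

    // Hypothetical partitioner: sends each record to its hash partition and to the next one.
    public static class PrimaryAndNextPartitioner implements StreamPartitioner<String, String> {

        @Override
        public Integer partition(String topic, String key, String value, int numPartitions) {
            // floorMod keeps the result non-negative even for negative hash codes.
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        @Override
        public List<Integer> partitions(String topic, String key, String value, int numPartitions) {
            if (key == null) {
                // An empty list means the record is not sent to any partition.
                return Collections.emptyList();
            }
            int primary = partition(topic, key, value, numPartitions);
            int next = (primary + 1) % numPartitions;
            // With a single partition, primary and next coincide, so return it only once.
            return primary == next
                ? Collections.singletonList(primary)
                : Arrays.asList(primary, next);
        }
    }

    public static void main(String[] args) {
        PrimaryAndNextPartitioner partitioner = new PrimaryAndNextPartitioner();
        System.out.println(partitioner.partitions("topic", "a", "v", 4));
    }
}
```

A partitioner that overrides only partition() keeps its existing behaviour, because the default partitions() wraps that single result in a one-element list.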

...

  • StreamPartitioner is also used in FK-join and Interactive queries. There, the invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned list is a singleton.
  • Sufficient logging would be added.
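
The runtime check described in the first bullet could be sketched as follows. The helper name requireSinglePartition, the class name SingletonPartitionCheck, and the choice of IllegalArgumentException are all assumptions made for illustration; the proposal only states that a check would be added.

```java
import java.util.Collections;
import java.util.List;

class SingletonPartitionCheck {

    // Hypothetical helper: returns the only partition in the list, or fails fast
    // when the partitioner returned null, zero, several, or a null partition.
    static int requireSinglePartition(List<Integer> partitions) {
        if (partitions == null || partitions.size() != 1 || partitions.get(0) == null) {
            throw new IllegalArgumentException(
                "Expected exactly one partition for FK-join / interactive queries, but got: " + partitions);
        }
        return partitions.get(0);
    }

    public static void main(String[] args) {
        System.out.println(requireSinglePartition(Collections.singletonList(3)));
    }
}
```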

Rejected Alternatives

The first 3 alternatives were rejected because they focused only on broadcasting to all partitions, which seemed restrictive.

  • Extend to() / addSink() with a "broadcast" option/config in KafkaStreams.
  • Add toAllPartitions() / addBroadcastSink() methods in KafkaStreams.
  • Allow StreamPartitioner to return `-1` for "all partitions".
  • Add a separate class to handle multicasting. This was rejected in favour of enhancing the current StreamPartitioner interface.