...

The StreamPartitioner interface would get a new method called partitions(), and the current partition() method would be marked as deprecated. partitions() would be a default method that invokes partition() and constructs a singleton set from its result (or an empty set if partition() returns null). Here's how the interface would look:

Code Block
languagejava
import java.util.Set;
import java.util.Collections;

public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);


    /**
     * Determine the set of partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multi-casted.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return a Set of integers between 0 and {@code numPartitions-1}.
     * An empty set means the record would not be sent to any partitions of the topic.
     * If the set contains all partition numbers from 0 to {@code numPartitions-1}, then
     * it's a case of broadcasting the record to all partitions.
     */
    default Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
        // Wrap the single partition from partition() in a set; null maps to an empty set.
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Collections.emptySet() : Collections.singleton(partition);
    }
}
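To make the default method's behaviour concrete, here is a minimal, self-contained sketch. It copies the proposed interface under the hypothetical name ProposedStreamPartitioner (so it compiles without Kafka on the classpath) and adds a hypothetical legacy-style KeyHashPartitioner that only implements partition(). As the default method is written above, a non-null partition is wrapped in a singleton set and null becomes an empty set.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-alone copy of the proposed interface (no Kafka dependency).
interface ProposedStreamPartitioner<K, V> {

    Integer partition(String topic, K key, V value, int numPartitions);

    // Mirrors the proposed default: singleton set for a partition, empty set for null.
    default Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
        final Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? Collections.emptySet() : Collections.singleton(partition);
    }
}

// Hypothetical legacy partitioner: only partition() is implemented.
// Routes by the key's hash and returns null for null keys.
class KeyHashPartitioner implements ProposedStreamPartitioner<String, String> {
    @Override
    public Integer partition(String topic, String key, String value, int numPartitions) {
        if (key == null) {
            return null;
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Existing partitioners therefore keep working unchanged: callers that switch to partitions() see at most one partition from them.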


The return type is a Set so that, even if a faulty implementation of partitions() returns the same partition number multiple times, the record would still be sent to that partition only once.
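As a small illustration of that reasoning (plain JDK, no Kafka types; the class FaultyPartitionerDemo and its methods are hypothetical), collecting a list that repeats a partition number into a Set keeps each partition exactly once, so a loop over the set would send at most one copy per partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class FaultyPartitionerDemo {

    // Simulates a faulty implementation that emits partition 0 twice.
    static List<Integer> faultyPartitionList(int numPartitions) {
        final List<Integer> result = new ArrayList<>();
        result.add(0);
        result.add(numPartitions - 1);
        result.add(0); // duplicate
        return result;
    }

    // Converting to a Set collapses duplicates; LinkedHashSet keeps first-seen order.
    static Set<Integer> asPartitionSet(List<Integer> partitions) {
        return new LinkedHashSet<>(partitions);
    }
}
```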

Proposed Changes

  • RecordCollector and its implementation RecordCollectorImpl have the send() method, which accepts a StreamPartitioner object and sends the record to the partitions returned by the partitioner. This is the core piece of logic that would allow multicasting and broadcasting records. Here are the high-level changes. Note that when the partitions() method returns an empty set, meaning the record won't be sent to any partition, we would also update the droppedRecordsSensor. Partition numbers outside the range 0 to numPartitions-1 are skipped and logged at debug level.


    Code Block
    languagejava
    @Override
    public <K, V> void send(final String topic,
                            final K key,
                            final V value,
                            final Headers headers,
                            final Long timestamp,
                            final Serializer<K> keySerializer,
                            final Serializer<V> valueSerializer,
                            final StreamPartitioner<? super K, ? super V> partitioner) {
        Set<Integer> multicastPartitions;

        if (partitioner != null) {

            // Existing logic to fetch partitions

            // Changes to be made due to the KIP
            if (partitions.size() > 0) {
                multicastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                if (multicastPartitions.isEmpty()) {
                    // New change. If a record is not to be sent to any partitions,
                    // mark it as a dropped record.
                    log.info("Not sending the record to any partition");
                    droppedRecordsSensor.record();
                } else {
                    // Iterate over the returned partitions and send to the valid ones.
                    for (int p : multicastPartitions) {
                        // Valid partition numbers are 0 to numPartitions-1 inclusive.
                        if (p >= 0 && p < partitions.size()) {
                            send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
                        } else {
                            log.debug("Not sending record to invalid partition number: {} for topic {}", p, topic);
                        }
                    }
                }
            }
        }

        // Existing logic.
    }


  • StreamPartitioner is also used in FK-join and Interactive Queries. The invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned set is a singleton.
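The dispatch rules of the RecordCollectorImpl change in the first bullet can be exercised in isolation. The following is a minimal sketch under stated assumptions: it has no Kafka dependency, and MulticastDispatch, DispatchResult and dispatch() are hypothetical names. A list of sent partitions stands in for the per-partition send(), a flag stands in for droppedRecordsSensor, and a list of rejected numbers stands in for the debug log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical result holder for one record.
final class DispatchResult {
    final List<Integer> sent = new ArrayList<>();
    final List<Integer> invalid = new ArrayList<>();
    boolean dropped = false;
}

final class MulticastDispatch {

    // Same rules as the proposed send(): empty set = dropped record,
    // out-of-range numbers skipped, each valid partition sent to once.
    static DispatchResult dispatch(Set<Integer> multicastPartitions, int numPartitions) {
        final DispatchResult result = new DispatchResult();
        if (multicastPartitions.isEmpty()) {
            result.dropped = true; // stands in for droppedRecordsSensor.record()
            return result;
        }
        for (int p : multicastPartitions) {
            if (p >= 0 && p < numPartitions) {
                result.sent.add(p); // stands in for the per-partition send(...)
            } else {
                result.invalid.add(p); // stands in for the debug log line
            }
        }
        return result;
    }
}
```

The upper bound is exclusive (p < numPartitions), so the last partition, numPartitions-1, is still a valid target.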


Example Usage

...

Code Block
languagejava
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;

// The partitioner classes are nested inside the application class (hence "public static").

// Define a partitioner class which sends to even numbered partitions
public static class EvenPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        // Unused: this class overrides partitions() directly.
        return null;
    }

    @Override
    public Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
        final Set<Integer> partitions = new HashSet<>();
        for (int i = 0; i < numPartitions; i += 2) {
            partitions.add(i);
        }
        return partitions;
    }
}

// Broadcast to all partitions
public static class BroadcastingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
        // Every partition number from 0 to numPartitions-1.
        return IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet());
    }
}

// Don't send to any partitions
public static class DroppingPartitioner<K, V> implements StreamPartitioner<K, V> {

    @Override
    public Integer partition(String topic, K key, V value, int numPartitions) {
        return null;
    }

    @Override
    public Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
        return Collections.emptySet();
    }
}


// Build Stream.
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> inputStream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

// Send to even numbered partitions
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new EvenPartitioner<>()));
// Broadcast
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new BroadcastingPartitioner<>()));
// Don't send to any partitions (every record is counted as dropped)
inputStream.to("output-topic", Produced.with(Serdes.String(), Serdes.String(), new DroppingPartitioner<>()));
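To see what the example partitioners actually return, the following plain-Java sketch (no Kafka dependency; the class PartitionSetExamples and its methods are hypothetical) reproduces their set-building logic and adds an isBroadcast() helper based on the Javadoc rule that a set containing every partition number from 0 to numPartitions-1 is a broadcast.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionSetExamples {

    // Same loop as EvenPartitioner.partitions(): 0, 2, 4, ...
    static Set<Integer> even(int numPartitions) {
        final Set<Integer> partitions = new HashSet<>();
        for (int i = 0; i < numPartitions; i += 2) {
            partitions.add(i);
        }
        return partitions;
    }

    // Same expression as BroadcastingPartitioner.partitions(): 0 .. numPartitions-1.
    static Set<Integer> all(int numPartitions) {
        return IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet());
    }

    // True when the set is exactly {0, ..., numPartitions-1}.
    static boolean isBroadcast(Set<Integer> partitions, int numPartitions) {
        return partitions.equals(all(numPartitions));
    }
}
```

One edge case worth noting: with a single partition, EvenPartitioner returns {0}, which is indistinguishable from a broadcast.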

...

  • StreamPartitioner is also used in FK-join and Interactive Queries. The invocation of partition() would be replaced by partitions(), and a runtime check would be added to ensure that the returned set is a singleton.
  • Adding sufficient logging
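For the FK-join and Interactive Queries paths, the runtime singleton check could be as small as the sketch below. The helper name requireSinglePartition() and the choice of IllegalStateException are assumptions for illustration; the proposal only states that a check would ensure the returned set is a singleton.

```java
import java.util.Set;

final class SinglePartitionCheck {

    // Hypothetical helper: FK-join and IQ need exactly one partition per key,
    // so anything other than a one-element set is rejected at runtime.
    static int requireSinglePartition(Set<Integer> partitions) {
        if (partitions == null || partitions.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one partition but got: " + partitions);
        }
        return partitions.iterator().next();
    }
}
```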

...