Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
import java.util.Set;
import java.util.Collections;

  public interface StreamPartitioner<K, V> {

    /**
     * Determine the partition number for a record with the given key and value and the current number of partitions.
     *
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
     */
    @Deprecated
    Integer partition(String topic, K key, V value, int numPartitions);     


 	/**
     * Determine the partition numbers to which a record, with the given key and value and the current number
     * of partitions, should be multi-casted to.
     * @param topic the topic name this record is sent to
     * @param key the key of the record
     * @param value the value of the record
     * @param numPartitions the total number of partitions
     * @return an aOptional of Set of integers between 0 and {@code numPartitions-1},
	     * Empty Setoptional means theuse recorddefault wouldpartitioner
 not be sent to any* partitionsOptional of the topic.
	 * Ifan empty set means the setrecord containswon't allbe partitionsent numbersto from 0-numPartitions-1, then 
	 * it's a case of broadcastingany partitions i.e dropped.
     * Optional of Set of integers means the partitions to which the record should be sent to.
  all partitions
    *  */
       
	default Set<Integer>Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        Integer partition = partition(topic, key, value, numPartitions);
        return partition == null ? CollectionsOptional.emptySetempty() : Optional.of(Collections.singleton(partition(topic, key, value, numPartitions)));
    }  
 
}      


The return type is a Set so that for cases of a faulty implementation of partitions  method which might return same partition number multiple times, we would still record it only once.

...

  • RecordCollector and it's implementation RecordCollectorImpl has the method which accepts a StreamPartitioner object and pushes it to the partitions returned by the partition method. This is the core piece of logic which would allow multi/broadcasting records. Here are the high level changes. Note that when the partitions()  method return an empty set meaning we won't send the record to any partition, we would also be updating the droppedRecordsSensor. 


    Code Block
    languagejava
    @Override
        public <K, V> void send(final String topic,
                                final K key,
                                final V value,
                                final Headers headers,
                                final Long timestamp,
                                final Serializer<K> keySerializer,
                                final Serializer<V> valueSerializer,
                                final StreamPartitioner<? super K, ? super V> partitioner) {         
    		Set<Integer> multicastPartitions; 
    
            if (partitioner != null) {
    
                // Existing logic to fetch partitions
    
    			// Changes to be made due to the KIP
                       
    			if (partitions.size() > 0) {
                    final multicastPartitionsOptional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
                    if (multicastPartitions!maybeMulticastPartitions.isEmptyisPresent()) {
    					// New change. If a record is not to be sent to any partitions, mark it being
    					// as a dropped record.                  // New change. Use default partitioner
                        log.info("Not sending the record to any partition"send(topic, key, value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                    }    droppedRecordsSensor.record();else {
                    } else {
    					// iterate overSet<Integer> themulticastPartitions all the partitions and send to the valid ones.
    = maybeMulticastPartitions.get();
                               for (int p: multicastPartitionsif (multicastPartitions.isEmpty()) {
    						// Check if valid partition number
    						if (p >= 0 && p < partitions.size()) {
               // If a record is not to be sent to any partitions, mark it as a dropped record.
       	send(topic, key, value, headers, p, timestamp, keySerializer, valueSerializer);
    						} else {
    							log.debug             log.info("Not sending the record to invalidany partition number: {} for topic {}, p, topic);
    						}
            ");
                            droppedRecordsSensor.record();
                        } else {
                            for (final int multicastPartition: multicastPartitions) {
                                send(topic, key, value, headers, multicastPartition, timestamp, keySerializer, valueSerializer, processorNodeId, context);
                            }
                        }
                    }
                } else {
                    throw new StreamsException("Could not get partition information for topic " + topic + " for task " + taskId +
                            ". This can happen if the topic does not exist.");
                }
      		} else {
                send(topic, key, }
    value, headers, null, timestamp, keySerializer, valueSerializer, processorNodeId, context);
         } 
    
    			// Existing logic.
    }       
    	}


  • StreamPartitioner is also used in FK-join and Interactive queries. The invocation of partition()  would be replaced by partitions()  and a runtime check would be added to ensure that the returned set is singleton. 

...