...

The proposal is to add a new onPartitionsChange() method to the existing Partitioner interface:

 

Code Block
package org.apache.kafka.clients.producer;
import java.util.Map;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
/**
 * Partitioner Interface
 */
public interface Partitioner extends Configurable {
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    
   
    /**
     * Called when partition metadata changes, for example when the number of partitions changes or a
     * partition goes online or offline. Implementations should act on the change quickly, or may choose to do nothing.
     *
     * @param cluster  The new cluster metadata
     * @param partitionsChanged  Only the partitions that have changed, with their new partition information
     * 
     */
    public void onPartitionsChange(Cluster cluster, Map<TopicPartition, PartitionInfo> partitionsChanged);
   
    /**
     * This is called when partitioner is closed.
     */
    public void close();
}
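
As an illustration of how an implementation might react to this callback, the following is a minimal sketch of a cache of partitions that currently have a leader. The class AvailablePartitionCache and its methods are hypothetical and are not part of Kafka or of this proposal; it uses plain Java types instead of Cluster and PartitionInfo so that it stands alone.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper (an assumption for illustration, not a Kafka class).
// Tracks, per topic, the ids of partitions that currently have a leader.
final class AvailablePartitionCache {

    private final Map<String, SortedSet<Integer>> available = new HashMap<>();

    // Intended to be called from onPartitionsChange() once per changed partition.
    synchronized void update(String topic, int partition, boolean hasLeader) {
        SortedSet<Integer> ids = available.computeIfAbsent(topic, t -> new TreeSet<>());
        if (hasLeader) {
            ids.add(partition);
        } else {
            ids.remove(partition);
        }
    }

    // Intended to be called from partition(). Hashes the serialized key onto the
    // available partitions; returns -1 if no partition of the topic is available.
    synchronized int choose(String topic, byte[] keyBytes) {
        SortedSet<Integer> ids = available.get(topic);
        if (ids == null || ids.isEmpty()) {
            return -1;
        }
        Integer[] sorted = ids.toArray(new Integer[0]);
        int index = Math.floorMod(Arrays.hashCode(keyBytes), sorted.length);
        return sorted[index];
    }
}
```

With such a cache, partition() avoids rescanning the cluster metadata on every record, since the set of usable partitions is only recomputed when a change notification arrives.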

 

 

...

Proposed Changes

Kafka Internal Change:

...

Leverage the existing metadata refresh notification mechanism: the MetadataChangeListener class is notified on each refresh. It in turn diffs the new Cluster against the previous instance and reports the changes via the onPartitionsChange() method, as shown below.

Code Block
package org.apache.kafka.clients;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.Metadata.Listener;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MetadataChangeListener implements Listener {

    private final Partitioner partitioner;
    private Cluster prevClusterInfo;

    public MetadataChangeListener(Partitioner partitioner, Cluster currentClusterInfo) {
        this.partitioner = partitioner;
        this.prevClusterInfo = currentClusterInfo;
    }

    @Override
    public void onMetadataUpdate(Cluster cluster) {
        Map<TopicPartition, PartitionInfo> partitionsChanged = new HashMap<TopicPartition, PartitionInfo>();
        /*
         * TODO: Implement this.
         * Diff the new cluster against prevClusterInfo and add every
         * partition whose information changed to partitionsChanged.
         */

        partitioner.onPartitionsChange(cluster, partitionsChanged);
        // Remember the latest metadata so the next update is diffed against it.
        this.prevClusterInfo = cluster;
    }
}

Please note that the implementation above is pseudocode.
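
The diff step left as a TODO above could be sketched roughly as follows. This is only an illustration under stated assumptions: MetadataDiff and changedEntries are hypothetical names, the maps stand in for per-partition snapshots taken from the previous and the new Cluster, and the value type is assumed to have a meaningful equals() (whether PartitionInfo does depends on the Kafka version, so an explicit comparison of leader and replicas may be needed instead). Partitions that disappear from the new snapshot are not reported in this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (an assumption for illustration, not a Kafka class).
final class MetadataDiff {

    private MetadataDiff() {
    }

    // Returns the entries of 'current' that are new or whose value differs
    // from the one in 'previous'. In the listener, K would be TopicPartition
    // and V would be PartitionInfo.
    static <K, V> Map<K, V> changedEntries(Map<K, V> previous, Map<K, V> current) {
        Map<K, V> changed = new HashMap<>();
        for (Map.Entry<K, V> entry : current.entrySet()) {
            K key = entry.getKey();
            V now = entry.getValue();
            boolean isNew = !previous.containsKey(key);
            if (isNew || !Objects.equals(previous.get(key), now)) {
                changed.put(key, now);
            }
        }
        return changed;
    }
}
```

The result of such a helper would be passed as the partitionsChanged argument of onPartitionsChange(); an empty map means the refresh changed nothing, in which case the listener could skip the notification altogether.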
 

Compatibility, Deprecation, and Migration Plan

...

Rejected Alternatives

None