...

Current state: Under Discussion

Discussion thread: http://mail-archives.apache.org/mod_mbox/kafka-dev/201509.mbox/%3CCAG9AH6cp57nsCfahndtbBCPP_Nk3AW8TmHV6drSwSd8Eswv0Ag@mail.gmail.com%3E

JIRA: TBD (if the Kafka dev team decides to incorporate this feature)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-22 gave the new (Java) Kafka producer the ability to plug in custom logic that assigns a partition to each message of a topic. However, custom partitioning strategies depend on critical information such as changes in the number of partitions and whether partitions are online or offline. The producer receives this information internally on each metadata refresh (every metadata.max.age.ms), or when certain types of errors occur in the network client code (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java), which then attempts a metadata refresh.
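To make the dependency on partition count concrete, the sketch below uses a simple hash-modulo partitioner and lists which keys land on a different partition when a topic grows from 4 to 6 partitions. The hashing scheme is an assumption chosen for illustration: it is not Kafka's DefaultPartitioner, which hashes the serialized key with murmur2. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative hash-modulo partitioning; NOT Kafka's DefaultPartitioner.
class PartitionCountDemo {

    // Hypothetical helper: maps a key to a partition using String.hashCode().
    // Math.floorMod keeps the result in [0, numPartitions) even for negative hashes.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns the keys whose partition differs between the old and new partition counts.
    static List<String> movedKeys(List<String> keys, int oldCount, int newCount) {
        List<String> moved = new ArrayList<>();
        for (String key : keys) {
            if (partitionFor(key, oldCount) != partitionFor(key, newCount)) {
                moved.add(key);
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d", "e");
        // Growing the topic from 4 to 6 partitions remaps "d" and "e".
        System.out.println("Keys moved by 4 -> 6: " + movedKeys(keys, 4, 6));
    }
}
```

Running `main` prints `Keys moved by 4 -> 6: [d, e]`. A partitioner that caches anything derived from the partition count only stays correct if it learns about such changes.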

...

  • Ability to monitor events from the producer.
  • Ability to implement a feedback loop with the change controller (e.g. the Kafka NOC/DevOps team): the producer acknowledges a change (e.g. a partition increase or decrease, or a partition automatically going online or offline) so that the change can be validated. This provides end-to-end visibility to the Kafka DevOps team.
  • The consumer side can also benefit from this event notification, to report or acknowledge that a change has been propagated to and accepted by the consumer group consuming the topic.

Public Interfaces

The proposal is to add a new change-notification method, onPartitionsChange, to the existing Partitioner interface:

```java
package org.apache.kafka.clients.producer;

import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
 * Partitioner interface.
 */
public interface Partitioner extends Configurable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on (or null)
     * @param valueBytes The serialized value to partition on (or null)
     * @param cluster The current cluster metadata
     * @return The partition to which the record is assigned
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * Called when the number of partitions changes, when a partition goes online or offline,
     * or when any other partition metadata changes. Implementations must act on the change
     * quickly, or may choose to do nothing.
     *
     * @param cluster The new cluster metadata
     * @param partitionsChanged Only the partitions that have changed, with their new partition information
     */
    public void onPartitionsChange(Cluster cluster, Map<TopicPartition, PartitionInfo> partitionsChanged);

    /**
     * Called when the partitioner is closed.
     */
    public void close();
}
```
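As a hedged illustration of how a partitioner might use the proposed callback, the sketch below keeps its own view of which partitions are online and round-robins across them. To stay self-contained, `TopicPartition` and `PartitionInfo` are simplified stand-in records (the real Kafka `PartitionInfo` exposes the leader node, replicas and ISR, reduced here to an `online` flag), the `Cluster` parameter is dropped, and the class name `AvailabilityAwarePartitioner` is hypothetical; it does not implement the real `Partitioner` interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; the nested records stand in for Kafka's real classes.
class AvailabilityAwarePartitioner {

    // Stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) {}

    // Stand-in for org.apache.kafka.common.PartitionInfo, reduced to an online flag.
    record PartitionInfo(String topic, int partition, boolean online) {}

    // topic -> (partition id -> online?), updated only from change notifications.
    private final Map<String, Map<Integer, Boolean>> state = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Mirrors the proposed onPartitionsChange(Cluster, Map<TopicPartition, PartitionInfo>);
    // only the changed entries arrive, so they are merged into the existing state.
    public void onPartitionsChange(Map<TopicPartition, PartitionInfo> partitionsChanged) {
        for (PartitionInfo info : partitionsChanged.values()) {
            state.computeIfAbsent(info.topic(), t -> new ConcurrentHashMap<>())
                 .put(info.partition(), info.online());
        }
    }

    // Round-robins over the partitions this partitioner currently believes are online.
    public int partition(String topic) {
        List<Integer> online = new ArrayList<>();
        for (Map.Entry<Integer, Boolean> e : state.getOrDefault(topic, Map.of()).entrySet()) {
            if (e.getValue()) {
                online.add(e.getKey());
            }
        }
        if (online.isEmpty()) {
            throw new IllegalStateException("no online partitions known for topic " + topic);
        }
        Collections.sort(online);
        return online.get(Math.floorMod(counter.getAndIncrement(), online.size()));
    }
}
```

Because the callback delivers only changed partitions, the partitioner merges updates instead of rebuilding its view from full metadata on every refresh.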

Proposed Changes

Kafka Internal Change:

Kafka already has an internal way of being notified when a metadata update occurs, via the following internal listener:

...

Please note that the implementation above is pseudocode.
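The listener code itself is elided above. As a hedged sketch of the central idea, the following computes the diff between the previous and the new partition metadata once per metadata update and hands only the changed entries to every registered partitioner, which is why the rejected alternative of diffing inside each partitioner is avoided. All names (`PartitionChangeDispatcher`, `PartitionChangeListener`, `handleMetadataUpdate`, `diff`) and the stand-in records are hypothetical, not Kafka APIs. The sketch reports added and modified partitions only, since the proposal does not say how removed partitions would be represented.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: names and stand-in types are illustrative, not Kafka APIs.
class PartitionChangeDispatcher {

    // Stand-ins for Kafka's TopicPartition and PartitionInfo; a null leader means offline.
    record TopicPartition(String topic, int partition) {}
    record PartitionInfo(String topic, int partition, String leader) {}

    // Hypothetical callback shape, mirroring the proposed onPartitionsChange minus Cluster.
    interface PartitionChangeListener {
        void onPartitionsChange(Map<TopicPartition, PartitionInfo> partitionsChanged);
    }

    private final List<PartitionChangeListener> listeners = new CopyOnWriteArrayList<>();
    private Map<TopicPartition, PartitionInfo> previous = Map.of();

    void register(PartitionChangeListener listener) {
        listeners.add(listener);
    }

    // Partitions that are new or whose info differs from the previous snapshot.
    static Map<TopicPartition, PartitionInfo> diff(Map<TopicPartition, PartitionInfo> prev,
                                                   Map<TopicPartition, PartitionInfo> curr) {
        Map<TopicPartition, PartitionInfo> changed = new HashMap<>();
        for (Map.Entry<TopicPartition, PartitionInfo> e : curr.entrySet()) {
            if (!e.getValue().equals(prev.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        return changed;
    }

    // Called once per metadata update: the diff is computed here once rather than
    // once per partitioner, and listeners are invoked only when something changed.
    synchronized void handleMetadataUpdate(Map<TopicPartition, PartitionInfo> current) {
        Map<TopicPartition, PartitionInfo> changed = diff(previous, current);
        previous = Map.copyOf(current);
        if (changed.isEmpty()) {
            return;
        }
        Map<TopicPartition, PartitionInfo> view = Collections.unmodifiableMap(changed);
        for (PartitionChangeListener listener : listeners) {
            listener.onPartitionsChange(view);
        }
    }
}
```

With N partitioners registered, each metadata refresh costs one diff plus N callbacks with a (usually small) changed map, instead of N full diffs.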

Compatibility, Deprecation, and Migration Plan

None

Rejected Alternatives

1) Each Partitioner class also implements the Listener, and each metadata refresh calls the listener. This does not scale, because every Partitioner would need to compute the diff between the previous and the new metadata itself.