Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2091

Released: 0.8.3

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
package org.apache.kafka.clients.producer;

import java.util.Map;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

public interface Partitioner extends Configurable {

    /**
     * Configure the partitioner class with the given key-value pairs from the partitioner.metadata config.
     */
    @Override
    public void configure(Map<String, ?> configs);

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on (or null if no key)
     * @param value The value to partition on
     * @param valueBytes The serialized value to partition on
     * @param partition The partition to use (or null if none)
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Integer partition, Cluster cluster);

    /**
     * This is called when the partitioner is closed.
     */
    public void close();
}
Code Block
languagejava
package org.apache.kafka.clients.producer;
public class DefaultPartitioner implements Partitioner {
}
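
To illustrate how a user-supplied implementation of the interface above might look, here is a minimal, self-contained sketch. It does not compile against Kafka: PartitionerLike is a hypothetical local stand-in that mirrors the proposed methods but replaces the Cluster argument with a plain partition count and omits the explicit partition argument. The class names, the "sketch.null.key.partition" setting, and the hashing scheme are all assumptions made for illustration, not the behavior of DefaultPartitioner.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;

public class CustomPartitionerSketch {

    /**
     * Hypothetical stand-in for the proposed Partitioner interface.
     * numPartitions replaces the Cluster metadata argument of the proposal.
     */
    public interface PartitionerLike {
        void configure(Map<String, ?> configs);

        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes, int numPartitions);

        void close();
    }

    /** Routes keyed records by a hash of the serialized key. */
    public static class HashingPartitioner implements PartitionerLike {
        // Partition used for records without a key (a simplification for this sketch).
        private int nullKeyPartition = 0;

        @Override
        public void configure(Map<String, ?> configs) {
            // "sketch.null.key.partition" is a made-up setting, not a Kafka config.
            Object v = configs.get("sketch.null.key.partition");
            if (v != null) {
                nullKeyPartition = Integer.parseInt(v.toString());
            }
        }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, int numPartitions) {
            if (keyBytes == null) {
                return Math.floorMod(nullKeyPartition, numPartitions);
            }
            // Mask the sign bit so the result is always in [0, numPartitions).
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }
    }

    public static void main(String[] args) {
        HashingPartitioner partitioner = new HashingPartitioner();
        partitioner.configure(Collections.singletonMap("sketch.null.key.partition", "1"));

        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitioner.partition("events", "user-42", keyBytes, null, null, 4));
        System.out.println(partitioner.partition("events", null, null, null, null, 4));
        partitioner.close();
    }
}
```

The same key always maps to the same partition for a fixed partition count, which is the property most custom partitioners need to preserve.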

...

  1. Add a new Partitioner interface.
  2. Move the existing partitioning code to DefaultPartitioner.
  3. If the user provides a partition id, it takes precedence over partitioner.class. Only when the partition id is null is the partition() method of the configured partitioner.class called to compute the partition for the given record.
  4. Introduce a new producer config called "partitioner.class". By default, "partitioner.class" points to org.apache.kafka.clients.producer.DefaultPartitioner.
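
The precedence rule in step 3 and the default in step 4 can be sketched as follows. This is a self-contained illustration using only the Java standard library, not the producer's actual code: the class and method names are hypothetical, the partitioner is modelled as a plain function over the serialized key, and com.example.MyPartitioner is a made-up class name.

```java
import java.util.Properties;
import java.util.function.ToIntFunction;

public class PartitionerConfigSketch {

    // Config key and default class name as stated in the proposal.
    public static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";
    public static final String DEFAULT_PARTITIONER =
            "org.apache.kafka.clients.producer.DefaultPartitioner";

    /** Step 4: use the configured class name, or the default when none is set. */
    public static String resolvePartitionerClass(Properties producerProps) {
        return producerProps.getProperty(PARTITIONER_CLASS_CONFIG, DEFAULT_PARTITIONER);
    }

    /**
     * Step 3: an explicit partition id wins; the partitioner is consulted
     * only when no partition id was given (null).
     */
    public static int choosePartition(Integer explicitPartition,
                                      ToIntFunction<byte[]> partitioner,
                                      byte[] keyBytes) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        return partitioner.applyAsInt(keyBytes);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolvePartitionerClass(props));

        // Hypothetical user-supplied partitioner class name.
        props.setProperty(PARTITIONER_CLASS_CONFIG, "com.example.MyPartitioner");
        System.out.println(resolvePartitionerClass(props));

        // Stand-in partitioner that always picks partition 7.
        ToIntFunction<byte[]> alwaysSeven = keyBytes -> 7;
        System.out.println(choosePartition(2, alwaysSeven, new byte[0]));
        System.out.println(choosePartition(null, alwaysSeven, new byte[0]));
    }
}
```

Keeping the explicit-partition check outside the partitioner means a custom implementation never has to handle that case itself.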

...