Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This KIP addresses all these concerns, by adding a friendlier more explicit ProducerRecordBuilder ProducerRecord interface, which also enables Kafka to easily evolve the message format without adding constructors to the ProducerRecord interface. 

...

public ProducerRecord(String topic, K key,  V value)
public ProducerRecord(String topic, V value)

...

)

...

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)

New interface:

public ProducerRecordBuilder()
public ProducerRecordBuilder withKey(K key)
public ProducerRecordBuilder withValue(V value)
public ProducerRecordBuilderProducerRecord withTimestamp(Long timestamp)
public ProducerRecordBuilder withTopic(String topic)public ProducerRecordBuilder ProducerRecord withForcedPartition(Integer partition)public ProducerRecord build()

Proposed Changes

A simple implementation would be:

 

Code Block
languagejava
package org.apache.kafka.clients.producer;

public class ProducerRecordBuilder<KProducerRecord<K, V> {
  /**
   * Add a partition to the record. privateAdvanced Stringusage topic;only.
    private Integer partition;
    private K key;* It is recommend to use a custom Partitioner class instead
   *
   * private@param Vpartition value;
the partition to send privatethat Longrecord timestamp;
to
   */
  public ProducerRecord<K, publicV> ProducerRecordBuilderwithForcedPartition(Integer partition) {};

     public ProducerRecordBuilderif withKey(K key){(partition != null && partition < 0)
        this.key = key;
  throw new IllegalArgumentException(
             return this;
    }

    public ProducerRecordBuilder withValue(V value){
  String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
      this.valuepartition = valuepartition;
        return this;
  }
  }

    public ProducerRecordBuilder withTimestamp(Long timestamp){/**
   * Add a timestamp to the record.
   *
   * @param this.timestamp =the timestamp;
 of the record (unix time in milliseconds returnsince this;epoch)
    }*/

  public ProducerRecord<K, publicV> ProducerRecordBuilder withTopicwithTimestamp(StringLong topictimestamp) {
      if  this.topic (timestamp != topic;
null && timestamp <  0)
   return this;
    }

    publicthrow ProducerRecordBuildernew withForcedPartitionIllegalArgumentException(Integer partition){
        this.partition = partition;
        return this;
    }

    public ProducerRecord build(){String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
      this.timestamp = returntimestamp;
  new ProducerRecord<>(topic, partition, timestamp, key,return value)this;
    }
}

 

Compatibility, Deprecation, and Migration Plan

...