Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under Discussion"

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka 0.10 added the Timestamp as part of messages, which is particularly important for KStreams application. Currently that timestamp can be set by the broker, or by the producer. This addresses concerns with timestamps set at the producer level.
Currently, if the producer application does not join a timestamp to the ProducerRecord, such as in public ProducerRecord(String topic, K key, V value) , then the timestamp is inferred at send time being System.currentTimeMillis(). If a producer wants to explicitly set a timestamp for the record, it has got to use public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value). 

...

This KIP addresses all these concerns, by adding a friendlier more explicit ProducerRecordBuilder interface, which also enables Kafka to easily evolve the message format without adding constructors to the ProducerRecord interface. 

Public Interfaces

The ProducerRecord constructors will all be deprecated, except the main long explicit one:
Deprecated:

...

public ProducerRecord build()

Proposed Changes

A simple implementation would be:

...

Code Block
languagejava
package org.apache.kafka.clients.producer;

public class ProducerRecordBuilder<K, V> {

    private String topic;
    private Integer partition;
    private K key;
    private V value;
    private Long timestamp;

    public ProducerRecordBuilder(){};

    public ProducerRecordBuilder withKey(K key){
        this.key = key;
        return this;
    }

    public ProducerRecordBuilder withValue(V value){
        this.value = value;
        return this;
    }

    public ProducerRecordBuilder withTimestamp(Long timestamp){
        this.timestamp = timestamp;
        return this;
    }

    public ProducerRecordBuilder withTopic(String topic){
        this.topic = topic;
        return this;
    }

    public ProducerRecordBuilder withForcedPartition(Integer partition){
        this.partition = partition;
        return this;
    }

    public ProducerRecord build(){
        return new ProducerRecord<>(topic, partition, timestamp, key, value);
    }
}

 

Compatibility, Deprecation, and Migration Plan

  • Deprecation of ProducerRecord partial constructors
  • Migrating existing programs to the builder interface should be straightforward

Rejected Alternatives

I had tried to just add partial constructors here: https://github.com/apache/kafka/pull/2800/files

...