THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
This KIP addresses all these concerns, by adding a friendlier more explicit ProducerRecordBuilder ProducerRecord interface, which also enables Kafka to easily evolve the message format without adding constructors to the ProducerRecord interface.
...
public ProducerRecord(String topic, K key, V value)
public ProducerRecord(String topic, V value)
...
)
...
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)
New interface:
public ProducerRecordBuilder()
public ProducerRecordBuilder withKey(K key)
public ProducerRecordBuilder withValue(V value)
public ProducerRecordBuilderProducerRecord withTimestamp(Long timestamp)
public ProducerRecordBuilder withTopic(String topic)public ProducerRecordBuilder ProducerRecord withForcedPartition(Integer partition)public ProducerRecord build()
Proposed Changes
A simple implementation would be:
Code Block | ||
---|---|---|
| ||
package org.apache.kafka.clients.producer; public class ProducerRecordBuilder<KProducerRecord<K, V> { /** * Add a partition to the record. privateAdvanced Stringusage topic;only. private Integer partition; private K key;* It is recommend to use a custom Partitioner class instead * * private@param Vpartition value; the partition to send privatethat Longrecord timestamp; to */ public ProducerRecord<K, publicV> ProducerRecordBuilderwithForcedPartition(Integer partition) {}; public ProducerRecordBuilderif withKey(K key){(partition != null && partition < 0) this.key = key; throw new IllegalArgumentException( return this; } public ProducerRecordBuilder withValue(V value){ String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition)); this.valuepartition = valuepartition; return this; } } public ProducerRecordBuilder withTimestamp(Long timestamp){/** * Add a timestamp to the record. * * @param this.timestamp =the timestamp; of the record (unix time in milliseconds returnsince this;epoch) }*/ public ProducerRecord<K, publicV> ProducerRecordBuilder withTopicwithTimestamp(StringLong topictimestamp) { if this.topic (timestamp != topic; null && timestamp < 0) return this; } publicthrow ProducerRecordBuildernew withForcedPartitionIllegalArgumentException(Integer partition){ this.partition = partition; return this; } public ProducerRecord build(){String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp)); this.timestamp = returntimestamp; new ProducerRecord<>(topic, partition, timestamp, key,return value)this; } } |
Compatibility, Deprecation, and Migration Plan
...