...

This interface is particularly dangerous because it suggests to the user that they should provide an Integer partition. They would have to dive into the code to learn that partition can be null, in which case it is inferred from the key. I also believe that offering partition as an argument goes against the current belief that "the same key always goes to the same partition".
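To make the hidden behavior concrete, here is a minimal sketch of how a null partition could be resolved from the key. The class `PartitionSketch`, the method `resolvePartition`, and the use of `String.hashCode()` are assumptions made for illustration only; Kafka's default partitioner uses a different hash over the serialized key.

```java
public class PartitionSketch {
    // Hypothetical helper: returns the explicit partition if one was given,
    // otherwise derives one deterministically from the key.
    // A null key is not handled in this sketch.
    public static int resolvePartition(Integer partition, String key, int numPartitions) {
        if (partition != null) {
            return partition; // the caller forced a partition, so the key is ignored
        }
        // Mask the sign bit so the modulo result is never negative.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Same key, no explicit partition: both calls print the same number.
        System.out.println(resolvePartition(null, "user-42", 6));
        System.out.println(resolvePartition(null, "user-42", 6));
        // Same key, but a forced partition overrides the key-based choice.
        System.out.println(resolvePartition(3, "user-42", 6)); // prints "3"
    }
}
```

The last call shows the concern raised above: once a partition is passed explicitly, records with the same key are no longer guaranteed to land on the same partition.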

This KIP addresses all these concerns by adding a friendlier, more explicit ProducerRecord interface, which also enables Kafka to easily evolve the message format without adding constructors to the ProducerRecord interface.

This KIP adds the missing constructor to add a Timestamp to a ProducerRecord or a SourceRecord.

Public Interfaces

The ProducerRecord constructors will all be deprecated, except the main long explicit one:

Deprecated:

public ProducerRecord(String topic, Integer partition, K key, V value)
public ProducerRecord(String topic, V value)
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)

New interface:

public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
 String topic, Schema valueSchema, Object value, Long timestamp)

...

public ProducerRecord withTimestamp(Long timestamp)
public ProducerRecord withForcedPartition(Integer partition)

Proposed Changes

A simple implementation would be:

 

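public class ProducerRecord<K, V> {
  // Existing constructors and the topic, key, and value fields are omitted for brevity.
  // The two fields below must be non-final so that the methods can assign them.
  private Integer partition;
  private Long timestamp;

  /**
   * Add a partition to the record. Advanced usage only.
   * It is recommended to use a custom Partitioner class instead.
   *
   * @param partition the partition to send that record to
   * @return this record, to allow chained calls
   */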
  public ProducerRecord<K, V> withForcedPartition(Integer partition) {
      if (partition != null && partition < 0)
          throw new IllegalArgumentException(
                  String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
      this.partition = partition;
      return this;
  }

  /**
   * Add a timestamp to the record.
   *
   * @param timestamp the timestamp of the record (Unix time in milliseconds since epoch)
   * @return this record, to allow chained calls
   */
  public ProducerRecord<K, V> withTimestamp(Long timestamp) {
      if (timestamp != null && timestamp < 0)
          throw new IllegalArgumentException(
                  String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
      this.timestamp = timestamp;
      return this;
  }
}
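As a usage illustration of the chained style, here is a self-contained sketch. `RecordSketch` is a hypothetical stand-in for ProducerRecord, and its three-argument constructor and accessor names are assumptions; only the two `with...` methods mirror the proposal above.

```java
public class RecordSketch<K, V> {
    private final String topic;
    private final K key;
    private final V value;
    private Integer partition; // null means no partition was forced
    private Long timestamp;    // null means no timestamp was set

    public RecordSketch(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    public RecordSketch<K, V> withForcedPartition(Integer partition) {
        if (partition != null && partition < 0)
            throw new IllegalArgumentException("Invalid partition: " + partition);
        this.partition = partition;
        return this;
    }

    public RecordSketch<K, V> withTimestamp(Long timestamp) {
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException("Invalid timestamp: " + timestamp);
        this.timestamp = timestamp;
        return this;
    }

    public Integer partition() { return partition; }
    public Long timestamp() { return timestamp; }

    public static void main(String[] args) {
        // The timestamp is set by name, so it cannot be confused with a partition.
        RecordSketch<String, String> record =
            new RecordSketch<>("events", "user-42", "login").withTimestamp(1_500_000_000_000L);
        System.out.println(record.partition()); // prints "null"
        System.out.println(record.timestamp()); // prints "1500000000000"
    }
}
```

Because each optional field has its own named method, the call site documents itself, at the cost of making the record mutable after construction.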

public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
 String topic, Schema keySchema, Object key, Schema valueSchema, Object value,
 Long timestamp)
public ProducerRecord(String topic, V value, Long timestamp)
public ProducerRecord(String topic, K key, V value, Long timestamp)

Proposed Changes

See KAFKA-5092 

Compatibility, Deprecation, and Migration Plan

  • Deprecation of ProducerRecord partial constructors
  • Migrating existing programs to the builder interface should be straightforward

Rejected Alternatives

I first tried simply adding partial constructors here: https://github.com/apache/kafka/pull/2800/files

...

  • No deprecation
  • No migration needed for existing programs

Rejected Alternatives

  •  public ProducerRecord(String topic, Long timestamp, K key, V value)

Adding this constructor is quite dangerous because it is almost identical to the one that takes a partition (the only difference is that one argument is a Long and the other is an Integer).
Therefore, the ordering of arguments should explicitly prevent mistakes.
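The risk can be reproduced with plain Java overload resolution. In the sketch below, `OverloadSketch` and its two `describe` methods are hypothetical stand-ins for the existing partition constructor and the rejected timestamp constructor; they are not part of Kafka.

```java
public class OverloadSketch {
    // Stand-in for the existing constructor that takes a partition.
    public static String describe(String topic, Integer partition, String key, String value) {
        return "partition=" + partition;
    }

    // Stand-in for the rejected constructor that takes a timestamp in the same position.
    public static String describe(String topic, Long timestamp, String key, String value) {
        return "timestamp=" + timestamp;
    }

    public static void main(String[] args) {
        int epochSeconds = 1_500_000_000; // an int that the caller intends as a timestamp
        // An int argument boxes to Integer, so the partition overload is selected.
        System.out.println(describe("events", epochSeconds, "k", "v")); // prints "partition=1500000000"
        // Only a long argument selects the timestamp overload.
        System.out.println(describe("events", 1_500_000_000_000L, "k", "v")); // prints "timestamp=1500000000000"
        // describe("events", null, "k", "v") would not compile: the call is ambiguous.
    }
}
```

With the timestamp moved to the last position, as in the proposed constructors, the two signatures differ in shape and not just in one boxed numeric type, so this silent mix-up cannot happen.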

 

 

  •  Adding a builder interface

Although friendlier to newcomers, it would imply deprecating the existing constructors, which could take years because backward compatibility must be maintained. There are too many drawbacks and not enough advantages (see the mailing list discussion).