...

  1. Changes to RecordMetadata

    RecordMetadata.java
    public final class RecordMetadata {
    
        /**
         * Partition value for record without partition assigned
         */
        public static final int UNKNOWN_PARTITION = -1;
    
        private final long offset;
        ..........
        private final TopicPartition topicPartition;
        private final Header[] headers;
    
        private volatile Long checksum;
    
        public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
                              Long checksum, int serializedKeySize, int serializedValueSize, Header[] headers) {
            // ignore the relativeOffset if the base offset is -1,
            // since this indicates the offset is unknown
            this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
            this.timestamp = timestamp;
            this.checksum = checksum;
            this.serializedKeySize = serializedKeySize;
            this.serializedValueSize = serializedValueSize;
            this.topicPartition = topicPartition;
            this.headers = headers;
        }
        ......
    
        /**
         * The headers of the record that was sent
         */
        public Header[] headers() {
            return this.headers;
        }
    
    
       ......
    }


  2. Changes to FutureRecordMetadata

    FutureRecordMetadata.java
    public final class FutureRecordMetadata implements Future<RecordMetadata> {
    
        private final ProduceRequestResult result;
        ..........
        private final Header[] headers;
        private volatile FutureRecordMetadata nextRecordMetadata = null;
    
        public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
                                    Long checksum, int serializedKeySize, int serializedValueSize, Time time, Header[] headers) {
            this.result = result;
            this.relativeOffset = relativeOffset;
            this.createTimestamp = createTimestamp;
            this.checksum = checksum;
            this.serializedKeySize = serializedKeySize;
            this.serializedValueSize = serializedValueSize;
            this.time = time;
            this.headers = headers;
        }
    
    .......
    
    }


  3. In ProducerBatch, add the headers to FutureRecordMetadata when appending a record to the batch.

  4. Also add the headers to RecordMetadata in send-error scenarios.
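
The flow described in items 1 to 4 can be illustrated with a minimal, self-contained sketch. It does not use the Kafka client library: `SimpleHeader`, `SketchMetadata`, and `SketchFuture` are hypothetical stand-ins for `Header`, `RecordMetadata`, and `FutureRecordMetadata`, reduced to the offset and headers fields. It assumes that on a send error the metadata is built with a base offset of -1, and only shows that headers captured at append time remain available on both the success and the error path.

```java
import java.nio.charset.StandardCharsets;

public class HeaderPropagationSketch {

    /** Hypothetical stand-in for Kafka's Header (key plus raw value bytes). */
    static final class SimpleHeader {
        final String key;
        final byte[] value;

        SimpleHeader(String key, String value) {
            this.key = key;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for the proposed RecordMetadata, reduced to offset and headers. */
    static final class SketchMetadata {
        final long offset;
        final SimpleHeader[] headers;

        SketchMetadata(long baseOffset, long relativeOffset, SimpleHeader[] headers) {
            // Same rule as in the proposal: a base offset of -1 means the offset is unknown,
            // so the relative offset is ignored.
            this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
            this.headers = headers;
        }
    }

    /** Hypothetical stand-in for FutureRecordMetadata: headers are captured at append time. */
    static final class SketchFuture {
        final long relativeOffset;
        final SimpleHeader[] headers;

        SketchFuture(long relativeOffset, SimpleHeader[] headers) {
            this.relativeOffset = relativeOffset;
            this.headers = headers;
        }

        /** Success path: the broker has assigned a base offset to the batch. */
        SketchMetadata complete(long baseOffset) {
            return new SketchMetadata(baseOffset, relativeOffset, headers);
        }

        /** Error path (assumed): no offset is known, but the headers are still attached. */
        SketchMetadata fail() {
            return new SketchMetadata(-1L, relativeOffset, headers);
        }
    }

    public static void main(String[] args) {
        SimpleHeader[] headers = { new SimpleHeader("trace-id", "abc123") };
        SketchFuture future = new SketchFuture(3L, headers);

        SketchMetadata ok = future.complete(100L);
        SketchMetadata failed = future.fail();

        System.out.println(ok.offset + " " + ok.headers[0].key);         // prints "103 trace-id"
        System.out.println(failed.offset + " " + failed.headers[0].key); // prints "-1 trace-id"
    }
}
```

With headers available in both cases, a producer callback could correlate a failed send with the originating record by a header value even though no offset was assigned.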

...


Compatibility, Deprecation, and Migration Plan


  • What impact (if any) will there be on existing users?

...