...

  1. Changes to RecordMetadata

    RecordMetadata.java
    public final class RecordMetadata {
    
        /**
         * Partition value for record without partition assigned
         */
        public static final int UNKNOWN_PARTITION = -1;
    
        private final long offset;
        ..........
        private final TopicPartition topicPartition;
        private final Header[] headers;
    
        private volatile Long checksum;
    
        public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp,
                              Long checksum, int serializedKeySize, int serializedValueSize, Header[] headers) {
            // ignore the relativeOffset if the base offset is -1,
            // since this indicates the offset is unknown
            this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
            this.timestamp = timestamp;
            this.checksum = checksum;
            this.serializedKeySize = serializedKeySize;
            this.serializedValueSize = serializedValueSize;
            this.topicPartition = topicPartition;
            this.headers = headers;
        }
        ......
    
        /**
         * The headers of the record that was sent
         */
        public Header[] headers() {
            return this.headers;
        }
    
    
       ......
    }


  2. Changes to FutureRecordMetadata

    FutureRecordMetadata.java
    public final class FutureRecordMetadata implements Future<RecordMetadata> {
    
        private final ProduceRequestResult result;
        ..........
        private final Header[] headers;
        private volatile FutureRecordMetadata nextRecordMetadata = null;
    
        public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp,
                                    Long checksum, int serializedKeySize, int serializedValueSize, Time time, Header[] headers) {
            this.result = result;
            this.relativeOffset = relativeOffset;
            this.createTimestamp = createTimestamp;
            this.checksum = checksum;
            this.serializedKeySize = serializedKeySize;
            this.serializedValueSize = serializedValueSize;
            this.time = time;
            this.headers = headers;
        }
    
    .......
    
    }


  3. In ProducerBatch, add the headers to FutureRecordMetadata when appending a record to the batch.

    ProducerBatch.java
        public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
            .......
            .........
                FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                       timestamp, checksum,
                                                                       key == null ? -1 : key.length,
                                                                       value == null ? -1 : value.length,
                                                                       Time.SYSTEM, headers);
                // we have to keep every future returned to the users in case the batch needs to be
                // split to several new batches and resent.
                thunks.add(new Thunk(callback, future));
                this.recordCount++;
                return future;
            }
        }
    
    

             

  4. Also add the headers to RecordMetadata in send-error scenarios.
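
The four changes above can be sketched end to end without the Kafka client on the classpath. The sketch below is illustrative only: SimpleHeader, SketchMetadata, onSuccess, onError and headerValue are hypothetical stand-ins, not Kafka classes or methods. In particular, building the error-path metadata with an offset of -1 is an assumption about how step 4 might be realised. The offset rule in the constructor mirrors the one shown in RecordMetadata above.

```java
import java.nio.charset.StandardCharsets;

public class HeadersInMetadataSketch {

    /** Hypothetical stand-in for Kafka's Header: a key plus raw value bytes. */
    static final class SimpleHeader {
        final String key;
        final byte[] value;

        SimpleHeader(String key, String value) {
            this.key = key;
            this.value = value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Simplified stand-in for the proposed RecordMetadata; only offset and headers are modelled. */
    static final class SketchMetadata {
        private final long offset;
        private final SimpleHeader[] headers;

        SketchMetadata(long baseOffset, long relativeOffset, SimpleHeader[] headers) {
            // Same rule as in the proposal: a base offset of -1 means the offset
            // is unknown, so the relative offset is ignored.
            this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset;
            this.headers = headers;
        }

        long offset() {
            return offset;
        }

        SimpleHeader[] headers() {
            return headers;
        }
    }

    /** Success path: the metadata carries the resolved offset and the record's headers. */
    static SketchMetadata onSuccess(long baseOffset, long relativeOffset, SimpleHeader[] headers) {
        return new SketchMetadata(baseOffset, relativeOffset, headers);
    }

    /** Error path (assumed shape): the offset is unknown, but the headers are still attached. */
    static SketchMetadata onError(SimpleHeader[] headers) {
        return new SketchMetadata(-1L, -1L, headers);
    }

    /** Returns the value of the first header with the given key as a UTF-8 string, or null if absent. */
    static String headerValue(SketchMetadata metadata, String key) {
        SimpleHeader[] headers = metadata.headers();
        if (headers == null) {
            return null;
        }
        for (SimpleHeader header : headers) {
            if (header.key.equals(key)) {
                return new String(header.value, StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        SimpleHeader[] headers = { new SimpleHeader("trace-id", "abc123") };

        SketchMetadata ok = onSuccess(100L, 5L, headers);
        SketchMetadata failed = onError(headers);

        // In both cases a callback can recover the header that identifies the record.
        System.out.println(ok.offset() + " " + headerValue(ok, "trace-id"));
        System.out.println(failed.offset() + " " + headerValue(failed, "trace-id"));
    }
}
```

The point of the sketch is that a callback can correlate a result with the originating record through its headers on both paths, even when no offset was assigned.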

...