THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Changes to RecordMetaData
Code Block language java title RecordMetaData.java public final class RecordMetadata { /** * Partition value for record without partition assigned */ public static final int UNKNOWN_PARTITION = -1; private final long offset; .......... private final TopicPartition topicPartition; private final Header[] headers; private volatile Long checksum; public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp, Long checksum, int serializedKeySize, int serializedValueSize, Header[] headers) { // ignore the relativeOffset if the base offset is -1, // since this indicates the offset is unknown this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset; this.timestamp = timestamp; this.checksum = checksum; this.serializedKeySize = serializedKeySize; this.serializedValueSize = serializedValueSize; this.topicPartition = topicPartition; this.headers = headers; } ...... /** * The headers of the record was sent to */ public Header[] headers() { return this.headers; } ...... }
Changes to FutureRecordMetaData
Code Block language java title FutureRecordMetaData.java public final class FutureRecordMetadata implements Future<RecordMetadata> { private final ProduceRequestResult result; .......... private final Header[] headers; private volatile FutureRecordMetadata nextRecordMetadata = null; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp, Long checksum, int serializedKeySize, int serializedValueSize, Time time, Header[] headers) { this.result = result; this.relativeOffset = relativeOffset; this.createTimestamp = createTimestamp; this.checksum = checksum; this.serializedKeySize = serializedKeySize; this.serializedValueSize = serializedValueSize; this.time = time; this.headers = headers; } ....... }
In ProduceBatch add headers to FutureRecordMetaData while appending record to the batch.
Code Block language java title ProducerBatch.java public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { ....... ......... FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length, Time.SYSTEM, headers); // we have to keep every future returned to the users in case the batch needs to be // split to several new batches and resent. thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } }
- Add headers to RecordMetaData for send error scenarios also.
...