co-authored-by: Mickael Maison <mickael.maison@gmail.com>

co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>

Status

Current state: "Discarded"

Discussion thread: here or here

JIRA: KAFKA-7666 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Replicating topic data across multiple Kafka clusters is a very common scenario, and many tools provide this functionality. However, simply copying topic records (key/value/headers/timestamp) may not be enough in many use cases.


Offsets are currently assigned automatically by brokers upon receiving messages. So when replicating data into another cluster, messages in the destination cluster are likely to have different offsets than the originals in the source cluster. This makes replicating the __consumer_offsets topic ineffective, and consumers can't rely on their group metadata stored in the source cluster when switching to another cluster.

We propose a mechanism to replicate records that maintains the same offsets in both clusters.
Such a mechanism could easily be used in Kafka Connect, although the Connect framework will need a small update. That enhancement will be the subject of a follow-up KIP.

Proposed Changes

We propose allowing producers to send each record's data (key/value/headers/timestamp) together with an offset. That offset can be accepted by the broker as the record's offset in the topic-partition log.
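To make the acceptance rule concrete, here is a minimal, self-contained sketch (plain Java, not Kafka's actual log implementation; the class and method names are illustrative) of how a partition log could treat a producer-supplied offset:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the broker-side append rule: a producer-supplied
// offset is accepted only if it is at or beyond the current log end offset.
class PartitionLogSketch {
    private long logEndOffset = 0L;
    private final List<Long> offsets = new ArrayList<>();

    /** Returns the assigned offset, or -1 to signal INVALID_PRODUCE_OFFSET. */
    long append(Long requestedOffset) {
        long assigned;
        if (requestedOffset == null) {
            assigned = logEndOffset;       // broker-assigned, as today
        } else if (requestedOffset < logEndOffset) {
            return -1L;                    // would rewrite existing entries: reject
        } else {
            assigned = requestedOffset;    // accept the producer-supplied offset
        }
        offsets.add(assigned);
        logEndOffset = assigned + 1;       // LEO moves past the appended record
        return assigned;
    }

    long logEndOffset() { return logEndOffset; }
}
```

A record produced without an offset is appended at the current log end offset, exactly as today; a record whose offset is at or beyond the log end offset is accepted as-is, while one below the log end offset is rejected with the new INVALID_PRODUCE_OFFSET error.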

Public Interfaces

Network protocol 

new Error Code 

INVALID_PRODUCE_OFFSET (77)

new version for ProduceRequest (v8)

Produce Request (Version: 8) => transactional_id acks timeout [topic_data] use_offset 
  transactional_id => NULLABLE_STRING
  acks => INT16
  timeout => INT32
  topic_data => topic [data] 
    topic => STRING
    data => partition record_set 
      partition => INT32
      record_set => RECORDS
  use_offset => BOOLEAN    <--- NEW

new version for ProduceResponse (v8)

Produce Response (Version: 8) => [responses] throttle_time_ms 
  responses => topic [partition_responses] 
    topic => STRING
    partition_responses => partition error_code base_offset log_append_time log_start_offset log_end_offset 
      partition => INT32
      error_code => INT16
      base_offset => INT64
      log_append_time => INT64
      log_start_offset => INT64
      log_end_offset => INT64     <--- NEW
  throttle_time_ms => INT32

Changes to the Java API

New class InvalidProduceOffsetException


package org.apache.kafka.common.errors;

/**
 * Thrown when the offset specified in a ProduceRequest is smaller than the current log end offset.
 * @see org.apache.kafka.clients.producer.ProducerRecordWithOffset
 */
public class InvalidProduceOffsetException extends InvalidOffsetException {
    private final long logEndOffset;

    public InvalidProduceOffsetException(String message) {
        this(message, -1L); // -1 indicates the log end offset is unknown
    }

    public InvalidProduceOffsetException(String message, long logEndOffset) {
        super(message);
        this.logEndOffset = logEndOffset;
    }

    public long getLogEndOffset() { return logEndOffset; }
}

This exception is mapped to the new protocol error code 77 (INVALID_PRODUCE_OFFSET).
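Because the exception (like the new log_end_offset response field) carries the destination's log end offset, a replicator can recover deterministically. A hedged sketch, assuming offsets are preserved end-to-end; the helper below is hypothetical and not part of the proposal:

```java
// Hypothetical recovery helper: when the destination rejects a produced offset
// because it is below the destination's log end offset (LEO), the destination
// already holds all records up to LEO - 1, so the replicator can resume
// consuming the source partition from the destination's LEO.
final class ReplicatorRecovery {
    static long resumeOffset(long rejectedOffset, long destinationLogEndOffset) {
        // The rejected offset is by definition below the destination's LEO,
        // so the max is simply the destination's LEO.
        return Math.max(rejectedOffset, destinationLogEndOffset);
    }
}
```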

Change to ProducerRecord



package org.apache.kafka.clients.producer;

import java.util.OptionalLong;

public class ProducerRecord<K, V> {
    // ...

    // new method: a plain ProducerRecord carries no explicit offset
    public OptionalLong offset() {
        return OptionalLong.empty();
    }
}

New class ProducerRecordWithOffset

package org.apache.kafka.clients.producer;

import java.util.OptionalLong;
import org.apache.kafka.common.header.Header;

public class ProducerRecordWithOffset<K, V> extends ProducerRecord<K, V> {
    private final long offset;

    public ProducerRecordWithOffset(String topic, Integer partition, Long timestamp,
                                    K key, V value, Iterable<Header> headers,
                                    long offset) {
        super(topic, partition, timestamp, key, value, headers);
        this.offset = offset;
    }

    @Override
    public OptionalLong offset() {
        return OptionalLong.of(this.offset);
    }
}
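A sender can use offset() to decide whether a batch needs use_offset set on the ProduceRequest. The stand-in classes below mirror the API shape above in plain Java so the dispatch is visible in isolation (Rec and RecWithOffset are illustrative stand-ins, not proposed names):

```java
import java.util.OptionalLong;

// Illustrative stand-in for ProducerRecord: no explicit offset.
class Rec {
    public OptionalLong offset() { return OptionalLong.empty(); }
}

// Illustrative stand-in for ProducerRecordWithOffset: carries its offset.
class RecWithOffset extends Rec {
    private final long offset;
    RecWithOffset(long offset) { this.offset = offset; }
    @Override
    public OptionalLong offset() { return OptionalLong.of(offset); }
}
```

A sender could check record.offset().isPresent() on the records of a batch to decide whether to produce it with use_offset=true.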

new Acl Operation

package org.apache.kafka.common.acl;

public enum AclOperation {
    // ... existing operations ...

    /**
     * REPLICATOR_WRITE operation (produce with offsets).
     */
    REPLICATOR_WRITE((byte) 13);
}

Command line tools and arguments


Compatibility, Deprecation, and Migration Plan

Rejected Alternatives