Status

Current state:  Discussion

Discussion thread: here

JIRA: KAFKA-5142 - KIP-145 - Expose Record Headers in Kafka Connect

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

KIP-82 introduced headers into core Kafka, so it would be advantageous to expose them in the Kafka Connect framework as well.

Connectors that replicate data between Kafka clusters, or between other messaging products and Kafka, would want to replicate the headers too.

 

Public Interfaces

This KIP has the following public interface changes:

  1. Add a new constructor to SourceRecord

    public class SourceRecord {

    +   public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
                            String topic, Integer partition,
                            Schema keySchema, Object key,
                            Schema valueSchema, Object value,
                            Long timestamp, Iterable<Header> headers)

    }
     
  2. Add a new constructor to SinkRecord

    public class SinkRecord {

    +   public SinkRecord(String topic, int partition,
                          Schema keySchema, Object key,
                          Schema valueSchema, Object value,
                          long kafkaOffset, Long timestamp,
                          TimestampType timestampType, Iterable<Header> headers)

    }
     
  3. Modify the parent ConnectRecord constructor and expose a headers accessor method.

    public class ConnectRecord {
     
    +   public ConnectRecord(String topic, Integer kafkaPartition,
                         Schema keySchema, Object key,
                         Schema valueSchema, Object value,
                         Long timestamp, Iterable<Header> headers) 
    +   public Headers headers()
    
      
    }
     
  4. Add a newRecord abstract method to ConnectRecord that takes a headers parameter, and override it in SourceRecord and SinkRecord. The existing signature is kept for compatibility and delegates to the new signature.
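
The delegation described in item 4 could be sketched as follows. This is a minimal illustration, not the Connect implementation: Header and Record are hypothetical, reduced stand-ins for the real classes, and the choice to carry the current record's headers over when the old signature is called is an assumption, since this KIP does not say which headers the old signature passes along.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins; these are not the real Kafka Connect classes.
final class NewRecordDelegationSketch {

    /** Stand-in for a record header: a key plus raw bytes. */
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for ConnectRecord, reduced to topic, value and headers. */
    static final class Record {
        final String topic;
        final Object value;
        final List<Header> headers;

        Record(String topic, Object value, Iterable<Header> headers) {
            this.topic = topic;
            this.value = value;
            // Copy defensively so later changes to the caller's collection are not visible here.
            List<Header> copy = new ArrayList<>();
            if (headers != null) {
                for (Header h : headers) {
                    copy.add(h);
                }
            }
            this.headers = Collections.unmodifiableList(copy);
        }

        /** Existing signature, kept for compatibility; delegates to the new one. */
        Record newRecord(String topic, Object value) {
            // Assumption: the current record's headers are carried over.
            return newRecord(topic, value, this.headers);
        }

        /** New signature that takes the headers explicitly. */
        Record newRecord(String topic, Object value, Iterable<Header> headers) {
            return new Record(topic, value, headers);
        }
    }
}
```

Callers written against the old signature keep compiling, while code that wants to alter or drop headers can call the new overload directly.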

Proposed Changes

Add a Headers field to the shared ConnectRecord class, so that headers are available on both SourceRecord and SinkRecord.

  • Add an accessor method, Headers headers(), to ConnectRecord.
  • Add constructors to SourceRecord and SinkRecord that accept existing, pre-constructed headers as an Iterable<Header>.
    1. The use case is replication connectors that need to copy headers.
  • See Public Interfaces above for the signatures.

WorkerSinkTask

  • Update the convertMessages method to take the headers from each ConsumerRecord and pass them into the SinkRecord it creates.
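
As a rough sketch of that change, the conversion only needs to forward the consumed record's headers to the new constructor. The types below (Header, ConsumedRecord, SinkRecordStub) are hypothetical stand-ins so the example is self-contained; the real method also runs key and value converters, which are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ConsumerRecord, SinkRecord and Header; not the real Kafka classes.
final class SinkHeaderCopySketch {

    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for a record handed to the worker by the consumer. */
    static final class ConsumedRecord {
        final String topic;
        final int partition;
        final long offset;
        final Object value;
        final List<Header> headers;

        ConsumedRecord(String topic, int partition, long offset, Object value, List<Header> headers) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Stand-in for SinkRecord with the proposed trailing headers argument. */
    static final class SinkRecordStub {
        final String topic;
        final int partition;
        final Object value;
        final long kafkaOffset;
        final List<Header> headers = new ArrayList<>();

        SinkRecordStub(String topic, int partition, Object value, long kafkaOffset, Iterable<Header> headers) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
            this.kafkaOffset = kafkaOffset;
            for (Header h : headers) {
                this.headers.add(h);
            }
        }
    }

    /** Converts a consumed batch, forwarding each record's headers unchanged. */
    static List<SinkRecordStub> convertMessages(List<ConsumedRecord> batch) {
        List<SinkRecordStub> out = new ArrayList<>();
        for (ConsumedRecord msg : batch) {
            out.add(new SinkRecordStub(msg.topic, msg.partition, msg.value, msg.offset, msg.headers));
        }
        return out;
    }
}
```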

 

WorkerSourceTask

  • Update the sendRecords method to take the headers from each SourceRecord and pass them into the ProducerRecord it creates.
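
The source side mirrors the sink side. A minimal sketch, again using hypothetical stand-in types (Header, SourceRecordStub, ProducerRecordStub) rather than the real classes, and leaving out converters, timestamps and the actual send call:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for SourceRecord, ProducerRecord and Header; not the real Kafka classes.
final class SourceHeaderCopySketch {

    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for a SourceRecord returned by a source task. */
    static final class SourceRecordStub {
        final String topic;
        final Integer partition;
        final Object value;
        final List<Header> headers;

        SourceRecordStub(String topic, Integer partition, Object value, List<Header> headers) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Stand-in for the ProducerRecord handed to the producer. */
    static final class ProducerRecordStub {
        final String topic;
        final Integer partition;
        final Object value;
        final List<Header> headers = new ArrayList<>();

        ProducerRecordStub(String topic, Integer partition, Object value, Iterable<Header> headers) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
            for (Header h : headers) {
                this.headers.add(h);
            }
        }
    }

    /** Builds the outgoing record, forwarding the source record's headers unchanged. */
    static ProducerRecordStub toProducerRecord(SourceRecordStub record) {
        return new ProducerRecordStub(record.topic, record.partition, record.value, record.headers);
    }
}
```

With both halves in place, a replication connector that reads headers on the sink side and supplies them on the source side would see them preserved end to end.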

 

Compatibility, Deprecation, and Migration Plan

  • Existing users should not be affected: this KIP only adds new API methods to the Connect source and sink APIs.
  • This builds on KIP-82 - Add Record Headers

Out of Scope

Rejected Alternatives
