...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Connect framework allows sink connector tasks to do their own offset tracking in case they want to do asynchronous processing (for instance, buffering records sent by the framework to be flushed to the sink system at some later time). The SinkTask::preCommit method allows sink task implementations to provide the framework with the consumer offsets to be committed for each topic partition. There's currently an incompatibility between sink connectors overriding the SinkTask::preCommit method (for asynchronous processing) and SMTs that mutate the topic field.

The problem has been present since the SinkTask::preCommit method's inception and is rooted in a mismatch between the topic/partition that is passed to SinkTask::open / SinkTask::preCommit (the original topic and partition, before any transformations are applied) and the topic/partition present in the SinkRecord that the SinkTask::put method receives (after transformations are applied). Since that is all the information the connector has to implement any kind of internal offset tracking, the topic/partitions it can return from preCommit will correspond to the transformed topic, when the framework actually expects the original topic.
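To make the mismatch concrete, here is a minimal, self-contained sketch (plain Java only; the Connect framework and the SMT are reduced to stand-in maps, and the topic names are hypothetical) of why offsets tracked under the transformed topic are invisible to the framework:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone model of the offset mismatch (not the real Connect API).
// The framework keys committed offsets by the ORIGINAL topic, but a task
// doing its own tracking only sees the TRANSFORMED topic on each SinkRecord.
public class OffsetMismatchDemo {

    // What the framework effectively does with the map returned by preCommit:
    // look up the offset under the original (pre-transformation) topic.
    static Long committedOffsetFor(String originalTopic, Map<String, Long> preCommitResult) {
        return preCommitResult.get(originalTopic);
    }

    public static void main(String[] args) {
        // Task side: records arrive carrying the transformed topic (say, a
        // router-style SMT renamed "orders" to "routed.orders"), so internal
        // tracking ends up keyed by the transformed name.
        Map<String, Long> taskOffsets = new HashMap<>();
        taskOffsets.put("routed.orders", 42L);

        // Framework side: the lookup under the original topic finds nothing,
        // so no offset gets committed for "orders".
        System.out.println(committedOffsetFor("orders", taskOffsets));        // null
        System.out.println(committedOffsetFor("routed.orders", taskOffsets)); // 42
    }
}
```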

In KAFKA-5567 we fixed the problem for connectors that don't override SinkTask::preCommit. For the others, we acknowledged that "broader API changes are required".

...

org.apache.kafka.connect.sink.SinkRecord

Add two new fields, along with their corresponding getters and constructor:

SinkRecord
    private final String originalTopic;
    private final Integer originalKafkaPartition;
...
    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers, String originalTopic, int originalKafkaPartition) {
...
      

    /**
     * @return topic corresponding to the Kafka record before transformations were applied. This should be
     * used for any internal offset tracking, rather than {@link #topic()}, in order to be compatible
     * with SMTs that mutate the topic name.
     */
    public String originalTopic() {
        return originalTopic;
    }

    /**
     * @return topic partition corresponding to the Kafka record before transformations were applied. This should be
     * used for any internal offset tracking, rather than {@link #kafkaPartition()}, in order to be compatible
     * with SMTs that mutate the topic partition.
     */
    public Integer originalKafkaPartition() {
        return originalKafkaPartition;
    }


Proposed Changes

Expose the original Kafka topic/partition in SinkRecord and ask sink connectors to use that information for offset tracking purposes.
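As a sketch of the intended usage pattern (the Connect types are stubbed out here as a minimal inner class; only the originalTopic/originalKafkaPartition names come from the proposal, everything else is illustrative), an asynchronous sink task would key its internal tracking by the original coordinates, so that what preCommit returns matches what the framework passed to open:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a sink task keys its internal offset tracking by the
// pre-transformation topic/partition now carried on each record, so the
// map returned from preCommit uses coordinates the framework recognizes.
public class OriginalOffsetTracker {

    // Stand-in for the slice of SinkRecord relevant here (not the real class).
    static class Record {
        final String topic;               // post-transformation
        final String originalTopic;       // pre-transformation
        final int originalKafkaPartition; // pre-transformation
        final long kafkaOffset;

        Record(String topic, String originalTopic, int originalKafkaPartition, long kafkaOffset) {
            this.topic = topic;
            this.originalTopic = originalTopic;
            this.originalKafkaPartition = originalKafkaPartition;
            this.kafkaOffset = kafkaOffset;
        }
    }

    private final Map<String, Long> offsets = new HashMap<>();

    // put() side: track by the ORIGINAL coordinates, not record.topic.
    void track(Record r) {
        offsets.put(r.originalTopic + "-" + r.originalKafkaPartition, r.kafkaOffset + 1);
    }

    // preCommit() side: keys match the topic/partitions the framework opened.
    Map<String, Long> preCommit() {
        return offsets;
    }
}
```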

...

  1. Address the problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.

    1. This would only work in the cases where there's no overlap between the transformed topic names, and would break for the rest of the transformations (e.g. a static transformation that sets topic = "a" for every record).

    2. Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).

  2. Expose the entire original record instead of only topic/partition (e.g. originalSinkRecord)

    • We should not expose the original value/key, transformations might be editing them for security reasons.

  3. Create a method in the SinkTaskContext to get this information, instead of updating SinkRecord (e.g. SinkTaskContext.getOriginalTopic(SinkRecord sr) / SinkTaskContext.getOriginalKafkaPartition(SinkRecord sr))

    • Requires extra bookkeeping without concrete value.

  4. Update SinkTask::put in some way to pass the new information outside SinkRecord (e.g. a Map or a derived class)

    • A much more disruptive change without significant benefits over the proposal