
Status

Current state: "Accepted"

Discussion thread: here

Vote thread: here

JIRA: KAFKA-13431

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka Connect framework allows sink connector tasks to do their own offset tracking in case they want to do asynchronous processing (for instance, buffering records sent by the framework to be flushed to the sink system at some later time). The SinkTask::preCommit method allows sink task implementations to provide the framework with the consumer offsets that are safe to commit for each topic partition. There's currently an incompatibility between sink connectors overriding the SinkTask::preCommit method and SMTs that mutate the topic field of a SinkRecord (for instance, the RegexRouter SMT that ships with Apache Kafka).

This problem has been present since the SinkTask::preCommit method's inception and is rooted in a mismatch between the Kafka topic / partition / offset that is passed to SinkTask::open / SinkTask::preCommit (the original topic / partition / offset before any transformations are applied) and the topic / partition / offset that is present in the SinkRecord that the SinkTask::put method receives (after transformations are applied). Since that's all the information the connector has to implement any kind of internal offset tracking, the topic / partition / offset it can return in preCommit will correspond to the transformed topic / partition / offset, when the framework actually expects it to be the original topic / partition / offset.
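To make the mismatch concrete, here's a minimal sketch of a sink task that tracks offsets naively from the records it receives; the class and field names are illustrative and not part of any existing connector:

Code Block
languagejava
titleIllustrative example (not part of this proposal)
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class NaiveOffsetTrackingSinkTask extends SinkTask {
    private final Map<TopicPartition, OffsetAndMetadata> latestOffsets = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // record.topic() / record.kafkaPartition() return the *transformed* coordinates,
            // e.g. after a RegexRouter SMT has renamed the topic, so this map ends up keyed
            // by topic partitions the framework never opened a consumer for.
            latestOffsets.put(
                    new TopicPartition(record.topic(), record.kafkaPartition()),
                    new OffsetAndMetadata(record.kafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // The framework expects offsets keyed by the original (pre-transform) topic partitions,
        // so the entries returned here don't match anything it knows about.
        return new HashMap<>(latestOffsets);
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }
}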

In KAFKA-5567 the problem was fixed for connectors that don't override SinkTask::preCommit. For the others, it was acknowledged that "broader API changes are required".

Public Interfaces

org.apache.kafka.connect.sink.SinkRecord

Add new fields corresponding to a record's pre-transform ("original") topic / partition / offset, along with corresponding getters and a new constructor:

Code Block
languagejava
titleSinkRecord
    private final String originalTopic;
    private final Integer originalKafkaPartition;
    private final long originalKafkaOffset;
...
    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers, String originalTopic,
                      Integer originalKafkaPartition, long originalKafkaOffset)
...

    /**
     * @return the topic corresponding to the Kafka record before any transformations were applied. This should be
     * used for any internal offset tracking purposes rather than {@link #topic()}, in order to be compatible with
     * SMTs that mutate the topic name.
     */
    public String originalTopic() {
        return originalTopic;
    }

    /**
     * @return the topic partition corresponding to the Kafka record before any transformations were applied. This
     * should be used for any internal offset tracking purposes rather than {@link #kafkaPartition()}, in order to be
     * compatible with SMTs that mutate the topic partition.
     */
    public Integer originalKafkaPartition() {
        return originalKafkaPartition;
    }

    /**
     * @return the offset corresponding to the Kafka record before any transformations were applied. This should be
     * used for any internal offset tracking purposes rather than {@link #kafkaOffset()}, in order to be compatible
     * with SMTs that mutate the offset value.
     */
    public long originalKafkaOffset() {
        return originalKafkaOffset;
    }


Proposed Changes

Expose the original (pre-transformation) Kafka topic, topic partition and offset via new SinkRecord public methods and ask sink task implementations to use that information for offset tracking purposes (i.e. the topic partition offsets that the sink task returns to the framework via SinkTask::preCommit). Note that while the record's offset can't be modified via the standard SinkRecord::newRecord methods that SMTs are expected to use, SinkRecord has public constructors that would allow SMTs to return records with modified offsets. This is why the proposed changes include a new SinkRecord::originalKafkaOffset method as well.
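For illustration, here's a minimal sketch of how a sink task could use the new getters for its offset tracking, assuming a runtime that implements this KIP; the class and field names are hypothetical:

Code Block
languagejava
titleIllustrative example (assumes a runtime implementing this KIP)
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class OffsetTrackingSinkTask extends SinkTask {
    private final Map<TopicPartition, OffsetAndMetadata> safeOffsets = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Write to the sink system using the transformed record.topic() / record.kafkaPartition(),
            // but track offsets with the pre-transform coordinates, which match what the framework
            // passed to open() and expects back from preCommit().
            safeOffsets.put(
                    new TopicPartition(record.originalTopic(), record.originalKafkaPartition()),
                    new OffsetAndMetadata(record.originalKafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        return new HashMap<>(safeOffsets);
    }

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }
}

A task written this way remains compatible with topic-mutating SMTs because the offsets it reports always refer to the consumer's actual topic partitions.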


Compatibility, Deprecation, and Migration Plan

Backwards Compatibility

Old / existing sink connector plugins running on a new Kafka Connect version

This proposal is backward compatible such that existing sink connector plugins will continue running as expected on new versions of Kafka Connect, but they will continue to remain incompatible with topic-mutating SMTs.

New / updated sink connector plugins running on an older Kafka Connect version

To ensure that connectors using the new SinkRecord getter methods can still be deployed on older versions of Kafka Connect, a try-catch block should be used to catch and handle NoSuchMethodError / NoClassDefFoundError (also see KIP-610: Error Reporting in Sink Connectors, which had the same issue). However, it should be clearly documented that the connector won't be compatible with topic/partition/offset-mutating SMTs when deployed on older versions of Kafka Connect.
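Here's a minimal sketch of that pattern; the helper class and its fallback behavior are illustrative assumptions, not something prescribed by this KIP:

Code Block
languagejava
titleIllustrative compatibility helper (names are hypothetical)
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

public final class SinkRecordCompat {
    private SinkRecordCompat() {
    }

    // Returns the pre-transform topic partition on runtimes that implement this KIP, and
    // falls back to the post-transform coordinates on older runtimes where the new
    // SinkRecord getters don't exist.
    public static TopicPartition originalTopicPartition(SinkRecord record) {
        try {
            return new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older Connect runtime: only the (possibly transformed) coordinates are available,
            // so the connector remains incompatible with topic-mutating SMTs in this case.
            return new TopicPartition(record.topic(), record.kafkaPartition());
        }
    }
}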


Rejected Alternatives

  1. Address the offsets problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.

    • This would only work in cases where there's no overlap between the transformed topic names, but would break for the rest of the transformations (e.g. a static transformation that routes every record to topic = "a").

    • Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).

  2. Expose the entire original record through SinkRecord instead of only topic / partition / offset (e.g. originalSinkRecord)

    • We should not expose the original value / key, since transformations might be editing them for security reasons.

  3. Create a method in the SinkTaskContext to get the original topic / partition / offset information, instead of updating SinkRecord (e.g. SinkTaskContext::getOriginalTopic(SinkRecord sr) / SinkTaskContext::getOriginalKafkaPartition(SinkRecord sr))

    • Requires extra bookkeeping without concrete value.

  4. Update SinkTask::put in any way to pass the new information outside SinkRecord (e.g. a Map or a derived class)

    • Much more disruptive change without considerable pros

  5. Use record headers to expose the original topic / partition / offset instead of adding new methods to SinkRecord
    • While this gets around the need to handle NoSuchMethodError / NoClassDefFoundError in sink connectors that want to retain compatibility with older Connect runtimes, it is arguably even less intuitive from a connector developer's standpoint.
  6. Add new overloaded SinkTask::open and SinkTask::close methods which include both pre-transform and post-transform topic partitions so that connectors can create and destroy resources even if they choose to use post-transform topic partitions to write to the sink system
    • This requires sink connector plugins to implement two new additional methods without significant benefit. The information from the existing open / close methods on pre-transform topic partitions can be combined with the per record information of both pre-transform and post-transform topic partitions to handle most practical use cases without significantly muddying the sink connector related public interfaces.
  7. Keep the existing SinkTask::open and SinkTask::close methods but call them with post-transform rather than pre-transform topic partitions
    • This would break backward compatibility since there could exist connector plugins that are aware of the current limitations (i.e. that SinkTask::open / SinkTask::close are called for pre-transform topic partitions but records in SinkTask::put currently only contain their post-transform topic partition) and work around them. Also, pre-transform / original topic partitions and offsets are critical for any sink connectors that wish to implement exactly-once semantics by tracking offsets in the sink system rather than Kafka.
    • This has the same limitations w.r.t. requiring the use of ugly try-catch logic around methods which may or may not exist to ensure backward compatibility of connectors with older Connect runtimes.