...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Connect framework allows sink connector tasks to do their own offset tracking in case they want to do asynchronous processing (for instance, buffering records sent by the framework to be flushed to the sink system at some later time). The SinkTask::preCommit method allows sink task implementations to provide the framework with the consumer offsets to be committed for each topic partition. There is currently an incompatibility between sink connectors that override the SinkTask::preCommit method (for asynchronous processing) and SMTs that mutate the topic field.

The problem has been present since the SinkTask::preCommit method's inception. It is rooted in a mismatch between the topic partitions passed to open/preCommit (the original topic and partition, before any transformations are applied) and the topic partition present in each SinkRecord that the SinkTask::put method receives (after transformations are applied). Since that is all the information the connector has for implementing any kind of internal offset tracking, the topic partitions it returns from preCommit will correspond to the transformed topic, whereas the framework expects them to be the original topic.
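The following sketch makes the mismatch concrete. It uses simplified, hypothetical stand-ins for TopicPartition and SinkRecord, which are not the real Kafka classes. It also uses a hypothetical SMT that prefixes topic names with `archive.`. The consumer is assigned `orders`, but a task that only sees transformed records ends up keying its offsets by `archive.orders`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-ins for the Kafka classes (illustration only).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset) {}

class MismatchDemo {
    // Hypothetical topic-mutating SMT: prefixes every topic name, as a routing SMT might.
    static final UnaryOperator<SinkRecord> PREFIX_SMT =
            r -> new SinkRecord("archive." + r.topic(), r.partition(), r.kafkaOffset());

    // Offsets a task would report from preCommit if it could only use the records it receives in put().
    static Map<TopicPartition, Long> offsetsSeenByTask(List<SinkRecord> consumed) {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (SinkRecord original : consumed) {
            SinkRecord transformed = PREFIX_SMT.apply(original);
            // The task only sees the transformed record, so offsets get keyed by the transformed topic.
            // "+ 1" follows the Kafka convention of committing the offset of the next record to consume.
            offsets.put(new TopicPartition(transformed.topic(), transformed.partition()),
                    transformed.kafkaOffset() + 1);
        }
        return offsets;
    }
}
```

For a record consumed from `orders` partition 0 at offset 41, the task reports offset 42 for `archive.orders` partition 0. The framework has no such partition assigned.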

In KAFKA-5567, we fixed the problem for connectors that don't override SinkTask::preCommit. For the others, we acknowledged that "broader API changes are required".

Public Interfaces

This KIP proposes to introduce a new overloaded public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) method in the org.apache.kafka.connect.sink.SinkTask abstract class. It sits alongside the existing public abstract void put(Collection<SinkRecord> records) method.

The new method has an additional parameter that maps each SinkRecord object to the original topic partition of its corresponding ConsumerRecord (i.e. before it has potentially been mutated by an SMT). SinkTask implementations can use this mapping to obtain the original topic partition for each record when doing their own offset tracking. They will then be able to provide accurate offsets back to the framework through the SinkTask::preCommit method.
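A buffering sink task could use the new parameter roughly as in the following sketch. TopicPartition and SinkRecord are simplified, hypothetical stand-ins, not the real Kafka classes. BufferingSinkTask and flushBuffer are hypothetical names, and preCommit returns plain Long offsets instead of the real Map<TopicPartition, OffsetAndMetadata>. The key point is that offsets are keyed by the original topic partition and only become committable after a flush:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka classes (illustration only).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset, Object value) {}

// Sketch of a buffering sink task that uses the proposed put overload for offset tracking.
class BufferingSinkTask {
    private final List<SinkRecord> buffer = new ArrayList<>();
    // Next offset to commit per ORIGINAL topic partition, for records already written to the sink.
    private final Map<TopicPartition, Long> flushedOffsets = new HashMap<>();
    // Next offset per original topic partition for records that are still buffered.
    private final Map<TopicPartition, Long> pendingOffsets = new HashMap<>();

    void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        for (SinkRecord record : records) {
            // Assumption: fall back to the record's own (possibly transformed) topic partition if unmapped.
            TopicPartition tp = originalTopicPartitions.getOrDefault(
                    record, new TopicPartition(record.topic(), record.partition()));
            buffer.add(record);
            pendingOffsets.merge(tp, record.kafkaOffset() + 1, Math::max);
        }
    }

    // Hypothetical flush: write the buffer to the external system, then mark its offsets as committable.
    void flushBuffer() {
        // ... write the buffered records to the sink system here ...
        buffer.clear();
        flushedOffsets.putAll(pendingOffsets);
        pendingOffsets.clear();
    }

    // Offsets handed back to the framework, keyed by original topic partition.
    Map<TopicPartition, Long> preCommit() {
        return new HashMap<>(flushedOffsets);
    }
}
```

Separating pending from flushed offsets is one possible design. It keeps the task from asking the framework to commit offsets for records that have not yet reached the sink system.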

org.apache.kafka.connect.sink.SinkTask


```java
package org.apache.kafka.connect.sink;

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.Task;

public abstract class SinkTask implements Task {

    // ... other existing SinkTask members omitted ...

    /**
     * Put the records in the sink...
     * <p>
     * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.connect.errors.RetriableException} to
     * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
     * batch will be retried.
     * <p>
     * In case {@link #put(Collection, Map)} has been overridden in order to obtain original topic partitions for records
     * (i.e. before SMTs are applied), this method can be implemented to simply call {@link #put(Collection, Map)} in
     * this manner:
     * <pre>
     * {@code
     * put(records, Collections.emptyMap());
     * }
     * </pre>
     *
     * @param records the set of records to send
     */
    public abstract void put(Collection<SinkRecord> records);

    /**
     * Put the records in the sink. This method should be overridden by sink tasks that do their own offset tracking, in
     * order to obtain the original topic and partition for each record (before any potentially topic-mutating SMTs are
     * applied).
     *
     * @param records the set of records to send
     * @param originalTopicPartitions the original topic and partition for each record, before SMTs have been applied
     */
    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        // Default implementation: existing connectors that only implement put(Collection) keep working unchanged.
        put(records);
    }
}
```
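The default implementation of the new overload is what keeps existing connectors working. The following sketch uses hypothetical, simplified stand-in types (SketchSinkTask mirrors only the shape of the two proposed overloads and is not the real class). It shows a legacy task that implements only put(Collection) still receiving every record when a newer runtime calls the two-argument overload:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka classes (illustration only).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset) {}

// Mirrors the shape of the two proposed SinkTask::put overloads.
abstract class SketchSinkTask {
    public abstract void put(Collection<SinkRecord> records);

    // Default: tasks that don't override this still receive records through put(records).
    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// An existing connector written before the new overload existed.
class LegacyTask extends SketchSinkTask {
    final List<SinkRecord> received = new ArrayList<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        received.addAll(records);
    }
}
```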


Proposed Changes

Add a new method in org.apache.kafka.connect.sink.SinkTask. It lets the Connect framework pass each record's original topic partition (i.e. before topic-mutating SMTs have been applied) to sink task implementations, so that they can do their own offset tracking accurately.

The WorkerSinkTask class will also be updated in two ways:

  • It will maintain a new data structure, a Map<SinkRecord, TopicPartition>, holding the original topic partition for each record in a message batch (i.e. before any potentially topic-mutating SMTs are applied).

  • It will call the new SinkTask::put method instead of the existing one.
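The WorkerSinkTask bookkeeping described above might look roughly like the following sketch. BatchConverter and its fields are hypothetical. The stand-in types are simplified, and the SMT chain is modeled as a single UnaryOperator that returns null to drop a record. None of this is the actual WorkerSinkTask code. The essential step is capturing the topic partition before the transformation chain runs:

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, simplified stand-ins for the Kafka classes (illustration only).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset) {}

class BatchConverter {
    final List<SinkRecord> transformed = new ArrayList<>();
    // Keyed by record identity so records that compare equal after an SMT still keep their own entries.
    final Map<SinkRecord, TopicPartition> originalTopicPartitions = new IdentityHashMap<>();

    static BatchConverter convert(List<SinkRecord> consumed, UnaryOperator<SinkRecord> smtChain) {
        BatchConverter batch = new BatchConverter();
        for (SinkRecord original : consumed) {
            // Capture the original topic partition before any transformation can mutate it.
            TopicPartition tp = new TopicPartition(original.topic(), original.partition());
            SinkRecord result = smtChain.apply(original);
            if (result == null) {
                continue; // record dropped by the transformation chain
            }
            batch.transformed.add(result);
            batch.originalTopicPartitions.put(result, tp);
        }
        return batch;
    }
}
```

In this sketch the framework would then call `task.put(batch.transformed, batch.originalTopicPartitions)`. The KIP only specifies the Map<SinkRecord, TopicPartition> type. The identity-keyed map is an assumption made here because, after a topic-collapsing SMT, two records from different original partitions could in principle compare equal.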

Compatibility, Deprecation, and Migration Plan

...

This proposal is backward compatible: existing sink connector implementations will continue to work as before with newer Connect runtimes. This is because the new SinkTask::put method has a default implementation that calls the older SinkTask::put method.

Forward Compatibility

Updated connectors that use the new functionality from this KIP will continue working with older Connect runtimes. The existing SinkTask::put method is abstract and must be implemented anyway. Its Javadoc will be updated to describe how it should be implemented when the newer SinkTask::put method is overridden, as shown in the Public Interfaces section above.

However, on older runtimes such connectors cannot obtain the original topic partitions. They should therefore document that topic-mutating SMTs cannot be used with the connector when it is deployed on older Connect runtimes.
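On an older runtime, only the single-argument put is ever called. The following sketch uses hypothetical, simplified stand-in types and class names, not the real Kafka classes. It shows an updated task that implements the abstract method by delegating with an empty map, as the proposed Javadoc suggests. The fallback to the record's own topic partition is an assumption of this sketch. On an old runtime that fallback yields the transformed topic, which is why such connectors must document that topic-mutating SMTs are unsupported there:

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Kafka classes (illustration only).
record TopicPartition(String topic, int partition) {}
record SinkRecord(String topic, int partition, long kafkaOffset) {}

// Mirrors the shape of the two proposed SinkTask::put overloads.
abstract class SketchSinkTask {
    public abstract void put(Collection<SinkRecord> records);

    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        put(records);
    }
}

// An updated connector that overrides the new overload.
class UpdatedTask extends SketchSinkTask {
    final Map<TopicPartition, Long> nextOffsets = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        // Only an older runtime calls this overload; no original topic partitions are available.
        put(records, Collections.emptyMap());
    }

    @Override
    public void put(Collection<SinkRecord> records, Map<SinkRecord, TopicPartition> originalTopicPartitions) {
        for (SinkRecord r : records) {
            // Assumption: fall back to the record's (possibly transformed) topic partition when unmapped.
            TopicPartition tp = originalTopicPartitions.getOrDefault(r, new TopicPartition(r.topic(), r.partition()));
            nextOffsets.merge(tp, r.kafkaOffset() + 1, Math::max);
        }
    }
}
```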


Rejected Alternatives

  1. Address the problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.

    • This would only work in the cases where there’s no overlap between the transformed topic names, but would break for the rest of the transformations (e.g. static transformation, topic = “a”).

    • Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).

  2. Expose a new method in SinkRecord which returns the original topic and partition
    • This will require connector developers to use a try-catch around the new method calls to ensure that the connector is backward compatible with older Connect runtimes (where the new method won't be available). This is a pretty non-standard approach for most developers and we should try to keep this as a last resort.
  3. Expose the entire original record through SinkRecord instead of only topic/partition (e.g. originalSinkRecord)

    • We should not expose the original value/key, transformations might be editing them for security reasons.

  4. Create a method in the SinkTaskContext to get this information, instead of updating SinkRecord (e.g. SinkTaskContext::getOriginalTopic(SinkRecord sr) / SinkTaskContext::getOriginalKafkaPartition(SinkRecord sr))

    • Requires extra bookkeeping without concrete value.

    • This has the same limitation as alternative 2: connectors would need try-catch logic around methods that may or may not exist in order to stay backward compatible with older Connect runtimes.
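The first rejected alternative fails because a topic-mutating SMT need not be invertible. The following sketch builds the transformed-to-original index that a framework-only approach would rely on. ReverseMappingDemo is a hypothetical helper, and topic-level SMTs are modeled as plain UnaryOperator<String> functions. With a static routing to topic "a", two original topics collapse onto "a" and can no longer be told apart. A prefixing rename, by contrast, still maps back uniquely:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.UnaryOperator;

class ReverseMappingDemo {
    // Hypothetical static routing SMT that sends every record to topic "a".
    static final UnaryOperator<String> STATIC_TOPIC = topic -> "a";

    // Transformed topic -> set of original topics; a framework-only fix needs every set to have exactly one element.
    static Map<String, Set<String>> reverseIndex(List<String> originalTopics, UnaryOperator<String> topicSmt) {
        Map<String, Set<String>> index = new HashMap<>();
        for (String original : originalTopics) {
            index.computeIfAbsent(topicSmt.apply(original), k -> new HashSet<>()).add(original);
        }
        return index;
    }
}
```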