...

In KAFKA-5567, the problem was fixed for connectors that don't override SinkTask::preCommit. For the others, it was acknowledged that "broader API changes are required".

Topic-mutating SMTs also cause an issue with the SinkTask::open method. The framework calls this method when topic partitions are assigned to the task's underlying Kafka consumer (via consumer group rebalances). This allows sink task implementations to create any required topic-partition-specific writers and other resources before records from these topic partitions are sent to the task via SinkTask::put. However, sink tasks might fail in unexpected ways when used with topic-mutating SMTs, because they could receive records via SinkTask::put whose topics (obtained via SinkRecord::topic) were never "opened".
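The failure mode described above can be illustrated with a minimal, self-contained model. The sketch deliberately avoids the real Kafka Connect classes. `TopicPartitionKey`, `Rec` and `NaiveSinkTask` are hypothetical stand-ins for `TopicPartition`, `SinkRecord` and a sink task that creates one writer per partition in `open` and looks that writer up in `put`. The renamed topic `orders-v2` is an assumed example of what a topic-mutating SMT might produce.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the real class).
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical stand-in for SinkRecord: topic() and kafkaPartition() hold post-transform values.
record Rec(String topic, int kafkaPartition, String value) {}

// A naive sink task that keeps one "writer" (here just a StringBuilder) per opened partition.
class NaiveSinkTask {
    final Map<TopicPartitionKey, StringBuilder> writers = new HashMap<>();

    // Mirrors SinkTask::open: receives the (pre-transform) partitions assigned to the consumer.
    void open(Collection<TopicPartitionKey> partitions) {
        for (TopicPartitionKey tp : partitions) {
            writers.put(tp, new StringBuilder());
        }
    }

    // Mirrors SinkTask::put: looks up the writer by the record's post-transform topic/partition.
    void put(Collection<Rec> records) {
        for (Rec r : records) {
            StringBuilder writer = writers.get(new TopicPartitionKey(r.topic(), r.kafkaPartition()));
            if (writer == null) {
                // With a topic-renaming SMT, this post-transform partition was never "opened".
                throw new IllegalStateException("No writer for " + r.topic() + "-" + r.kafkaPartition());
            }
            writer.append(r.value());
        }
    }
}
```

In this model, opening `orders`-0 and then calling `put` with a record whose post-transform topic is `orders-v2` throws an exception. The writer map only contains the pre-transform partition.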

Public Interfaces

org.apache.kafka.connect.sink.SinkRecord

...

SinkRecord (Java)
    private final String originalTopic;
    private final Integer originalKafkaPartition;
    private final long originalKafkaOffset;
...
    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers, String originalTopic, Integer originalKafkaPartition, long originalKafkaOffset)
...     


    /**
     * @return the topic corresponding to the Kafka record before any transformations were applied. This should be
     * used for any internal offset tracking purposes rather than {@link #topic()}, in order to be compatible 
     * with SMTs that mutate the topic name.
     */
    public String originalTopic() {
        return originalTopic;
    }

    /**
     * @return the topic partition corresponding to the Kafka record before any transformations were applied. This
     * should be used for any internal offset tracking purposes rather than {@link #kafkaPartition()}, in order to be 
     * compatible with SMTs that mutate the topic partition.
     */
    public Integer originalKafkaPartition() {
        return originalKafkaPartition;
    }

    /**
     * @return the offset corresponding to the Kafka record before any transformations were applied. This
     * should be used for any internal offset tracking purposes rather than {@link #kafkaOffset()}, in order to be 
     * compatible with SMTs that mutate the offset value.
     */
    public long originalKafkaOffset() {
        return originalKafkaOffset;
    }
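The block below sketches how a task might use the new getters for offset tracking. It keys committable offsets by the original topic partition rather than by `topic()` / `kafkaPartition()`. `TopicPartitionKey`, `Rec` and `OffsetTracker` are hypothetical stand-ins (not Kafka classes), and a plain `Long` takes the place of `OffsetAndMetadata`. The sketch follows the usual Kafka convention that the committed offset is the offset of the next record to consume, i.e. the last written original offset plus one.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the real class).
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical stand-in for SinkRecord exposing both post-transform and original coordinates.
record Rec(String topic, Integer kafkaPartition, long kafkaOffset,
           String originalTopic, Integer originalKafkaPartition, long originalKafkaOffset) {}

// Tracks offsets that have been durably written, keyed by the ORIGINAL topic partition,
// so that the map returned from preCommit refers to what the consumer actually reads.
class OffsetTracker {
    private final Map<TopicPartitionKey, Long> committable = new HashMap<>();

    // Call after the records have been written to the sink system.
    // originalKafkaPartition is assumed to be non-null in this sketch.
    void recordWritten(Collection<Rec> records) {
        for (Rec r : records) {
            TopicPartitionKey tp = new TopicPartitionKey(r.originalTopic(), r.originalKafkaPartition());
            // Kafka commits the offset of the NEXT record to consume, hence + 1.
            committable.merge(tp, r.originalKafkaOffset() + 1, Long::max);
        }
    }

    // Analogous to the Map<TopicPartition, OffsetAndMetadata> returned by SinkTask::preCommit.
    Map<TopicPartitionKey, Long> preCommit() {
        return new HashMap<>(committable);
    }
}
```

In this model, two records renamed to `orders-v2` by an SMT yield a single committable entry for `orders`-0 with offset 43. That is the partition the consumer actually reads from.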

org.apache.kafka.connect.sink.SinkTask

Add new overloaded SinkTask::open / SinkTask::close methods through which the framework can provide tasks with both the pre-transform and the post-transform topic partitions to be opened / closed.

SinkTask (Java)
    /**
     * The sink task uses this method to create writers and other required resources for newly assigned topic partitions.
     * Pre-transform topic partitions refer to the original Kafka topic name and partition number before (potentially
     * topic-mutating) {@link Transformation}s are applied (i.e. corresponding to {@link SinkRecord#originalTopic()}
     * and {@link SinkRecord#originalKafkaPartition()}). Post-transform topic partitions refer to the topic name and
     * partition number after (potentially topic-mutating) {@link Transformation}s are applied (i.e. corresponding to
     * {@link SinkRecord#topic()} and {@link SinkRecord#kafkaPartition()}).
     *
     * @param preTransformTopicPartitions The list of pre-transform topic partitions that are being assigned to the task
     * @param postTransformTopicPartitions The list of post-transform topic partitions that are being assigned to the task
     */
    public void open(Collection<TopicPartition> preTransformTopicPartitions, Collection<TopicPartition> postTransformTopicPartitions) {

    }

    /**
     * The sink task uses this method to close writers and other relevant resources for topic partitions that are no
     * longer assigned to the task. After being closed, new records corresponding to the closed topic partitions won't
     * be sent via {@link #put(Collection)} until they have been re-opened.
     * Pre-transform topic partitions refer to the original Kafka topic name and partition number before (potentially
     * topic-mutating) {@link Transformation}s are applied (i.e. corresponding to {@link SinkRecord#originalTopic()}
     * and {@link SinkRecord#originalKafkaPartition()}). Post-transform topic partitions refer to the topic name and
     * partition number after (potentially topic-mutating) {@link Transformation}s are applied (i.e. corresponding to
     * {@link SinkRecord#topic()} and {@link SinkRecord#kafkaPartition()}).
     *
     * @param preTransformTopicPartitions The list of pre-transform topic partitions that should be closed
     * @param postTransformTopicPartitions The list of post-transform topic partitions that should be closed
     */
    public void close(Collection<TopicPartition> preTransformTopicPartitions, Collection<TopicPartition> postTransformTopicPartitions) {

    }
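A task built against the proposed overloads might key its writers by post-transform partitions while remembering its pre-transform assignment for offset tracking. This is a hypothetical sketch. `ExampleSinkTask` does not extend the real `SinkTask`, `TopicPartitionKey` stands in for `TopicPartition`, and a `StringBuilder` stands in for a real writer.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the real class).
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical task using the proposed open/close overloads: writers are keyed by
// post-transform partitions (where data is written), while pre-transform partitions
// are remembered for offset tracking.
class ExampleSinkTask {
    final Set<TopicPartitionKey> assignedPreTransform = new HashSet<>();
    final Map<TopicPartitionKey, StringBuilder> writers = new HashMap<>();

    public void open(Collection<TopicPartitionKey> preTransformTopicPartitions,
                     Collection<TopicPartitionKey> postTransformTopicPartitions) {
        assignedPreTransform.addAll(preTransformTopicPartitions);
        for (TopicPartitionKey tp : postTransformTopicPartitions) {
            // computeIfAbsent keeps an existing writer if the partition is opened again.
            writers.computeIfAbsent(tp, k -> new StringBuilder());
        }
    }

    public void close(Collection<TopicPartitionKey> preTransformTopicPartitions,
                      Collection<TopicPartitionKey> postTransformTopicPartitions) {
        assignedPreTransform.removeAll(preTransformTopicPartitions);
        for (TopicPartitionKey tp : postTransformTopicPartitions) {
            // A real task would flush and release the writer's resources here.
            writers.remove(tp);
        }
    }
}
```

The framework may close post-transform partitions independently of a rebalance (for instance, when they go idle). For that reason, `close` here tolerates an empty pre-transform collection, and `computeIfAbsent` keeps `open` safe to call again for a partition that already has a writer.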


Proposed Changes

Expose the original (pre-transformation) Kafka topic, topic partition and offset via new public SinkRecord methods, and ask sink task implementations to use that information for offset tracking purposes (i.e. for the topic partition offsets that the sink task returns to the framework via SinkTask::preCommit). Note that while a record's offset can't be modified via the standard SinkRecord::newRecord methods that SMTs are expected to use, SinkRecord has public constructors that would allow SMTs to return records with modified offsets. This is why the proposed changes also include a new SinkRecord::originalKafkaOffset method.

Two new methods will also be added to the SinkTask abstract class: overloaded SinkTask::open and SinkTask::close methods that allow sink task implementations to allocate and deallocate resources for the post-transform topic partitions assigned to them. This is important because topic-mutating SMTs essentially exist to change the source topic name as seen by the sink connector. The post-transform topic is therefore what sink connectors should use when writing data to the sink system, whereas the pre-transform topic partition matters for offset tracking and reporting. The Connect framework will provide no guarantees about exactly when these new methods are called, with two exceptions. SinkTask::open will be called for a topic partition before any records from that topic partition are sent for processing via SinkTask::put. SinkTask::close will eventually be called for topic partitions that are no longer active. One possible strategy for the framework is to use a cache with a time-based eviction policy that periodically "closes" topic partitions that are not being used. Since the framework has no knowledge of specific SMT implementations, it cannot deterministically determine when a post-transform topic partition is "truly" closed.
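The time-based eviction strategy mentioned above could look roughly like the following. This illustrates one possible approach and is not the framework's implementation. `PostTransformPartitionTracker` is hypothetical, and timestamps are passed in explicitly to keep the sketch deterministic. The `onOpen` / `onClose` callbacks stand in for invoking the new SinkTask::open / SinkTask::close overloads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the real class).
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical framework-side helper: opens a post-transform partition the first time a
// record for it is seen, and closes partitions that have been idle longer than ttlMillis.
class PostTransformPartitionTracker {
    private final long ttlMillis;
    private final Consumer<TopicPartitionKey> onOpen;
    private final Consumer<List<TopicPartitionKey>> onClose;
    // Last time (in ms) a record was seen for each currently open post-transform partition.
    private final Map<TopicPartitionKey, Long> lastSeen = new HashMap<>();

    PostTransformPartitionTracker(long ttlMillis, Consumer<TopicPartitionKey> onOpen,
                                  Consumer<List<TopicPartitionKey>> onClose) {
        this.ttlMillis = ttlMillis;
        this.onOpen = onOpen;
        this.onClose = onClose;
    }

    // Call before handing a record with this post-transform partition to put(),
    // so that open always happens before put for that partition.
    void recordSeen(TopicPartitionKey tp, long nowMillis) {
        if (!lastSeen.containsKey(tp)) {
            onOpen.accept(tp);
        }
        lastSeen.put(tp, nowMillis);
    }

    // Call periodically; closes every partition idle for more than ttlMillis.
    void evictIdle(long nowMillis) {
        List<TopicPartitionKey> expired = new ArrayList<>();
        Iterator<Map.Entry<TopicPartitionKey, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TopicPartitionKey, Long> e = it.next();
            if (nowMillis - e.getValue() > ttlMillis) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        if (!expired.isEmpty()) {
            onClose.accept(expired);
        }
    }
}
```

The tracker cannot know whether an SMT will route more records to an evicted partition, so a partition may be closed and later reopened. This is consistent with the only guarantees stated above: open before put, and close eventually.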


Compatibility, Deprecation, and Migration Plan

The existing SinkTask::open and SinkTask::close methods will be marked as deprecated because going forward all sink task implementations can get all the needed information from the new SinkTask::open and SinkTask::close methods.

...

Old / existing sink connector plugins running on a new Kafka Connect version

Old / existing sink connector plugins will continue running as expected on new versions of Kafka Connect, because the new SinkTask::open and SinkTask::close methods will have default no-op implementations and the existing SinkTask::open and SinkTask::close methods will continue to be called after consumer group rebalances. However, these older sink connectors will remain incompatible with topic-mutating SMTs, even on newer versions of Kafka Connect.

...

New / updated sink connector plugins running on an older Kafka Connect version

To ensure that connectors using the new SinkRecord getter methods can still be deployed on older versions of Kafka Connect, developers should use a try-catch block to catch and handle the NoSuchMethodError or NoClassDefFoundError thrown by Connect workers running an older version of AK (also see KIP-610: Error Reporting in Sink Connectors for reference, which had the same issue). Sink connectors should also continue to implement the older SinkTask::open and SinkTask::close methods (possibly by delegating to their implementations of the new SinkTask::open and SinkTask::close methods with an empty collection of preTransformTopicPartitions). However, it should be clearly documented that the connector will not be compatible with topic/partition/offset-mutating SMTs when deployed on older versions of Kafka Connect.
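The try-catch approach for older runtimes can be sketched as follows, in the spirit of the KIP-610 pattern. On a real older worker, the JVM throws `NoSuchMethodError` when linking a call to a missing SinkRecord method. Here, `OldRuntimeRecord` simulates that by throwing the error explicitly. `RecordView`, `TopicPartitionKey` and `OriginalPartitionResolver` are hypothetical names used only for this illustration.

```java
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition (not the real class).
record TopicPartitionKey(String topic, int partition) {}

// Hypothetical minimal view of a sink record (not the real SinkRecord API).
interface RecordView {
    String topic();
    int kafkaPartition();
    String originalTopic();
    int originalKafkaPartition();
}

// Simulates a record delivered by a runtime that supports the new getters.
record NewRuntimeRecord(String topic, int kafkaPartition, String originalTopic, int originalKafkaPartition)
        implements RecordView {}

// Simulates an older runtime by throwing what the JVM would throw for a missing method.
record OldRuntimeRecord(String topic, int kafkaPartition) implements RecordView {
    public String originalTopic() { throw new NoSuchMethodError("originalTopic"); }
    public int originalKafkaPartition() { throw new NoSuchMethodError("originalKafkaPartition"); }
}

class OriginalPartitionResolver {
    // Remembers whether the runtime supports the new getters, so the error is caught only once.
    private boolean originalGettersAvailable = true;

    TopicPartitionKey offsetTrackingPartition(RecordView r) {
        if (originalGettersAvailable) {
            try {
                return new TopicPartitionKey(r.originalTopic(), r.originalKafkaPartition());
            } catch (NoSuchMethodError | NoClassDefFoundError e) {
                // Older Connect runtime: fall back to post-transform values.
                // Topic-mutating SMTs are not supported in this mode.
                originalGettersAvailable = false;
            }
        }
        return new TopicPartitionKey(r.topic(), r.kafkaPartition());
    }
}
```

The resolver remembers that the getters are unavailable, so the error is caught at most once per resolver instance. On an older runtime, the fallback uses the post-transform topic partition, which is why the connector remains incompatible with topic-mutating SMTs there.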

...

  1. Address the offsets problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.

    • This would only work in the cases where there’s no overlap between the transformed topic names, but would break for the rest of the transformations (e.g. static transformation, topic = “a”).

    • Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).

  2. Expose the entire original record instead of only topic / partition / offset (e.g. originalSinkRecord)

    • We should not expose the original value / key, since transformations might be editing them for security reasons.

  3. Create a method in the SinkTaskContext to get the original topic / partition / offset information instead of updating SinkRecord (e.g. SinkTaskContext.getOriginalTopic(SinkRecord sr) / SinkTaskContext.getOriginalKafkaPartition(SinkRecord sr))

    • Requires extra bookkeeping without concrete value.

  4. Update SinkTask::put in any way to pass the new information outside SinkRecord (e.g. a Map or a derived class)

    • A much more disruptive change without considerable benefits.

  5. Use record headers to expose the original topic / partition / offset instead of adding new methods to SinkRecord
    • While this gets around the need to handle NoSuchMethodError / NoClassDefFoundError in sink connectors that want to retain compatibility with older Connect runtimes, it is arguably even less intuitive from a connector developer's standpoint.
  6. Add new overloaded SinkTask::open and SinkTask::close methods which include both pre-transform and post-transform topic partitions so that connectors can create and destroy resources even if they choose to use post-transform topic partitions to write to the sink system
    • This requires sink connector plugins to implement two additional methods without significant benefit. The information from the existing open / close methods on pre-transform topic partitions can be combined with the per-record information on both pre-transform and post-transform topic partitions to handle most practical use cases without significantly muddying the sink connector related public interfaces.
  7. Keep the existing SinkTask::open and SinkTask::close methods but call them with post-transform rather than pre-transform topic partitions
    • This would break backward compatibility since there could exist connector plugins that are aware of the current limitations (i.e. that SinkTask::open / SinkTask::close are called for pre-transform topic partitions but records in SinkTask::put only currently contain their post-transform topic partition) and work around them. Also, pre-transform / original topic partitions and offsets are critical for any sink connectors that wish to implement exactly-once semantics by tracking offsets in the sink system rather than Kafka.
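The limitation of the first rejected alternative can be shown by trying to invert a topic-renaming transformation. The sketch assumes the full set of source topics is known in advance, which is not guaranteed in practice. It models the transformation as a plain function, and `TopicReverseMapper` is a hypothetical helper. A prefixing rename is invertible. A static rename to topic `a` maps every source topic to the same name, so offsets cannot be attributed back to a single original topic.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical illustration of rejected alternative 1: mapping transformed topics back to originals.
class TopicReverseMapper {
    // Builds a transformed-topic -> original-topic map, or returns empty if two originals collide.
    static Optional<Map<String, String>> invert(List<String> originalTopics, UnaryOperator<String> rename) {
        Map<String, String> reverse = new HashMap<>();
        for (String original : originalTopics) {
            String previous = reverse.putIfAbsent(rename.apply(original), original);
            if (previous != null && !previous.equals(original)) {
                // Not invertible: a transformed topic corresponds to more than one original topic.
                return Optional.empty();
            }
        }
        return Optional.of(reverse);
    }
}
```

Even this small check depends on knowing every source topic up front. That is part of the bookkeeping burden described in the second bullet of that alternative.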