...

org.apache.kafka.connect.sink.SinkTask

Add new overloaded SinkTask::open and SinkTask::close methods through which the framework passes tasks both the pre-transform and the post-transform topic partitions that are being opened or closed.

SinkTask (Java)
    /**
     * The sink task uses this method to create writers and other required resources for newly assigned topic partitions.
     * Pre-transform topic partitions refer to the original Kafka topic name and partition number before (potentially
     * topic-mutating) {@link Transformation}s are applied (i.e. corresponding to {@link SinkRecord#originalTopic()}
     * and {@link SinkRecord#originalKafkaPartition()}). Post-transform topic partitions refer to the topic name and
     * partition number after (potentially topic-mutating) {@link Transformation}s are applied (i.e. corresponding to
     * {@link SinkRecord#topic()} and {@link SinkRecord#kafkaPartition()}).
     *
     * @param preTransformTopicPartitions The collection of pre-transform topic partitions that are being assigned to the task
     * @param postTransformTopicPartitions The collection of post-transform topic partitions that are being assigned to the task
     */
    public void open(Collection<TopicPartition> preTransformTopicPartitions, Collection<TopicPartition> postTransformTopicPartitions) {

    }

    /**
     * The sink task uses this method to close writers and other relevant resources for topic partitions that are no
     * longer assigned to the task. After being closed, new records corresponding to the closed topic partitions won't
     * be sent via {@link #put(Collection)} until they have been re-opened.
     * Pre-transform topic partitions refer to the original Kafka topic name and partition number before (potentially
     * topic-mutating) {@link Transformation}s are applied (i.e. corresponding to {@link SinkRecord#originalTopic()}
     * and {@link SinkRecord#originalKafkaPartition()}). Post-transform topic partitions refer to the topic name and
     * partition number after (potentially topic-mutating) {@link Transformation}s are applied (i.e. corresponding to
     * {@link SinkRecord#topic()} and {@link SinkRecord#kafkaPartition()}).
     *
     * @param preTransformTopicPartitions The collection of pre-transform topic partitions that should be closed
     * @param postTransformTopicPartitions The collection of post-transform topic partitions that should be closed
     */
    public void close(Collection<TopicPartition> preTransformTopicPartitions, Collection<TopicPartition> postTransformTopicPartitions) {

    }
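
As a rough illustration of how a task might use the two overloads above, the following sketch tracks assignment by pre-transform partitions and keys its writers by post-transform partitions. It is only a sketch under stated assumptions: ExampleSinkTask, assignedPartitions() and openWriters() are hypothetical names, the TopicPartition class is a local stand-in for org.apache.kafka.common.TopicPartition so that the example compiles on its own, and a StringBuilder stands in for a real writer. A real task would extend SinkTask and override the proposed methods instead.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Stand-in for org.apache.kafka.common.TopicPartition (assumption: used only so
// the sketch compiles without Kafka on the classpath).
final class TopicPartition {
    private final String topic;
    private final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) {
            return false;
        }
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical task; it mirrors the signatures of the proposed overloads but
// does not extend the real SinkTask.
class ExampleSinkTask {
    // Partitions the consumer actually reads from (pre-transform).
    private final Set<TopicPartition> assigned = new HashSet<>();
    // One in-memory buffer per destination partition (post-transform),
    // standing in for a real writer.
    private final Map<TopicPartition, StringBuilder> writers = new HashMap<>();

    public void open(Collection<TopicPartition> preTransformTopicPartitions,
                     Collection<TopicPartition> postTransformTopicPartitions) {
        assigned.addAll(preTransformTopicPartitions);
        for (TopicPartition tp : postTransformTopicPartitions) {
            writers.computeIfAbsent(tp, key -> new StringBuilder());
        }
    }

    public void close(Collection<TopicPartition> preTransformTopicPartitions,
                      Collection<TopicPartition> postTransformTopicPartitions) {
        assigned.removeAll(preTransformTopicPartitions);
        for (TopicPartition tp : postTransformTopicPartitions) {
            writers.remove(tp); // a real writer would be flushed and closed here
        }
    }

    Set<TopicPartition> assignedPartitions() {
        return Collections.unmodifiableSet(new HashSet<>(assigned));
    }

    Set<TopicPartition> openWriters() {
        return Collections.unmodifiableSet(new HashSet<>(writers.keySet()));
    }
}
```

Keeping the two sets separate reflects the distinction drawn in the Javadoc: assignment follows the original topic partitions, while resources such as writers follow the topic partitions that records carry after transformations are applied.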

...