...

org.apache.kafka.connect.sink.SinkRecord

Add two new fields, with their corresponding getters and constructor:

Code Block: SinkRecord (java)
    private final String originalTopic;
    private final Integer originalKafkaPartition;
...
    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers, String originalTopic, int originalKafkaPartition) {
...
    /**
     * @return topic corresponding to the Kafka record before transformations were applied. This is
     * necessary for internal offset tracking, to be compatible with SMTs that mutate the topic name.
     */
    public String originalTopic() {
        return originalTopic;
    }

    /**
     * @return topic partition corresponding to the Kafka record before transformations were applied. This is
     * necessary for internal offset tracking, to be compatible with SMTs that mutate the topic name.
     */
    public Integer originalKafkaPartition() {
        return originalKafkaPartition;
    }


Proposed Changes

Expose the original topic/partition in SinkRecord and ask Sink Connectors to use that information for offset tracking purposes.
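
As an illustration, here is a minimal sketch of how a sink task could use the new accessors for offset tracking. The class name ExampleSinkTask and the writeToExternalSystem helper are hypothetical and not part of this proposal; only originalTopic() and originalKafkaPartition() come from the API change above, and the sketch assumes the task reports its own offsets via preCommit.

Code Block: Example usage in a SinkTask (java)
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {

    // Offsets to commit, keyed by the original (pre-transformation) topic/partition,
    // which is what the framework's consumer actually owns.
    private final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // Write using the transformed coordinates, e.g. record.topic().
            writeToExternalSystem(record);

            // Track offsets using the original coordinates exposed by this proposal.
            TopicPartition original =
                new TopicPartition(record.originalTopic(), record.originalKafkaPartition());
            offsetsToCommit.put(original, new OffsetAndMetadata(record.kafkaOffset() + 1));
        }
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Report offsets against the original topic/partitions so the framework can
        // commit them on the consumer, even if an SMT mutated the topic name.
        return new HashMap<>(offsetsToCommit);
    }

    // Hypothetical connector-specific write path.
    private void writeToExternalSystem(SinkRecord record) {
        // ...
    }

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }
}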

...

  1. Address the problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.

    1. This would only work in cases where the transformed topic names don’t overlap (i.e. the original topic can be unambiguously recovered from the transformed one), but would break for the rest of the transformations (e.g. a static transformation that sets topic = “a”).

    2. Even if we wanted to limit support to those cases, it would require considerable bookkeeping to validate that the transformation chain adheres to that expectation (and to fail fast if it doesn’t).

  2. Expose the entire original record instead of only topic/partition (e.g. originalSinkRecord)

    • We should not expose the original value/key, since transformations might be editing them for security reasons.

  3. Create a method in the SinkTaskContext to get this information, instead of updating SinkRecord (e.g. SinkTaskContext.getOriginalTopic(SinkRecord sr) / SinkTaskContext.getOriginalKafkaPartition(SinkRecord sr))

    • Requires extra bookkeeping without concrete value.

  4. Update SinkTask.put in some way to pass the new information outside of SinkRecord (e.g. via a Map or a derived class)

    • A much more disruptive change without considerable benefits.