Status
Current state: "Under Discussion"
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
There is currently an incompatibility between sink connectors that override the SinkTask.preCommit method (for asynchronous processing) and SMTs that mutate the topic field.
The problem has been present since the preCommit method was introduced. It is rooted in a mismatch between the topic/partition that is passed to open/preCommit (the original topic and partition, before any transformations are applied) and the topic/partition present in the SinkRecord that the SinkTask.put method receives (after transformations are applied). Since that is all the information the connector has for implementing any kind of internal offset tracking, the topic/partitions it returns from preCommit will correspond to the transformed topic, whereas the framework expects the original topic.
In SinkTask.preCommit
. For the others, we acknowledge that "broader API changes are required".
Public Interfaces
org.apache.kafka.connect.sink.SinkRecord
Add a new field, with its corresponding getter and constructor:
    private final TopicPartition originalTopicPartition;
    ...
    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                      Long timestamp, TimestampType timestampType, Iterable<Header> headers, TopicPartition originalTopicPartition) {
    ...
    /**
     * @return topic and partition corresponding to the kafka record before transformations were applied. This is
     * necessary for internal offset tracking, to be compatible with SMTs that mutate the topic name.
     */
    public TopicPartition originalTopicPartition() {
        return originalTopicPartition;
    }
Proposed Changes
Expose the original topic/partition in SinkRecord and ask sink connectors to use that information for offset tracking purposes.
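As a rough illustration of what offset tracking keyed by the original topic/partition could look like in a connector, here is a minimal, self-contained sketch. All class names (TopicPartitionSketch, SinkRecordSketch, OffsetTrackerSketch) are hypothetical stand-ins so the example compiles without Kafka on the classpath; a real connector would use TopicPartition, SinkRecord and OffsetAndMetadata, and would return the map from its preCommit override.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionSketch)) return false;
        TopicPartitionSketch other = (TopicPartitionSketch) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

// Hypothetical stand-in for SinkRecord, reduced to what offset tracking needs.
final class SinkRecordSketch {
    final String topic;                                // may have been rewritten by an SMT
    final long kafkaOffset;
    final TopicPartitionSketch originalTopicPartition; // the proposed new field

    SinkRecordSketch(String topic, long kafkaOffset, TopicPartitionSketch originalTopicPartition) {
        this.topic = topic;
        this.kafkaOffset = kafkaOffset;
        this.originalTopicPartition = originalTopicPartition;
    }
}

// Hypothetical helper a connector might keep for asynchronous offset tracking.
final class OffsetTrackerSketch {
    private final Map<TopicPartitionSketch, Long> committable = new HashMap<>();

    // Call once a record has been durably written by the asynchronous writer.
    void markProcessed(SinkRecordSketch record) {
        // Key by the ORIGINAL topic/partition, never by record.topic.
        // The committable offset is the next offset to consume (offset + 1).
        committable.merge(record.originalTopicPartition, record.kafkaOffset + 1, Long::max);
    }

    // Snapshot of the offsets a preCommit override would hand back to the framework.
    Map<TopicPartitionSketch, Long> offsetsForPreCommit() {
        return new HashMap<>(committable);
    }
}
```

With this shape, a record read from one topic and renamed by an SMT still produces a single entry under the original topic/partition, which is what the framework expects to receive from preCommit.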
Compatibility, Deprecation, and Migration Plan
Backwards Compatibility
This proposal is backward compatible: existing sink connector implementations will continue to work as before.
Forward Compatibility
To ensure that new connectors using this new method and interface can still be deployed on older versions of Kafka Connect, the developer should use a try/catch block to catch the NoSuchMethodError or NoClassDefFoundError thrown by a worker running an older version of AK. It should be clearly documented that such connectors would not be compatible with topic-mutating SMTs in those environments.
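The fallback described above might be sketched as follows. This is an illustration only: RecordSketch, OldWorkerRecordSketch and CompatSketch are hypothetical names, and the older worker is simulated by a subclass that throws NoSuchMethodError, since the real error would be raised by the JVM when the connector calls a getter that does not exist in the older runtime.

```java
// Hypothetical stand-in for a SinkRecord as seen by a connector compiled against the new API.
class RecordSketch {
    final String topic;                  // topic after transformations
    private final String originalTopic;  // topic before transformations

    RecordSketch(String topic, String originalTopic) {
        this.topic = topic;
        this.originalTopic = originalTopic;
    }

    // Stands in for the proposed originalTopicPartition() getter.
    String originalTopic() {
        return originalTopic;
    }
}

// Simulation of an older worker: calling the new getter fails as it would at link time.
class OldWorkerRecordSketch extends RecordSketch {
    OldWorkerRecordSketch(String topic) {
        super(topic, null);
    }

    @Override
    String originalTopic() {
        throw new NoSuchMethodError("originalTopicPartition");
    }
}

final class CompatSketch {
    // Picks the topic to use for offset tracking, degrading gracefully on older workers.
    static String topicForOffsetTracking(RecordSketch record) {
        try {
            return record.originalTopic();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            // Older runtime: only the (possibly transformed) topic is available,
            // so topic-mutating SMTs cannot be supported in this environment.
            return record.topic;
        }
    }
}
```

A connector would typically probe once (for example on the first record) and cache the result rather than catching the error on every call.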
Rejected Alternatives
Address the problem entirely within the framework, doing some kind of mapping from the transformed topic back to the original topic.
This would only work in the cases where there’s no overlap between the transformed topic names, but would break for the rest of the transformations (e.g. static transformation, topic = “a”).
Even if we wanted to limit the support to those cases, it would require considerable bookkeeping to add a validation to verify that the transformation chain adheres to that expectation (and fail fast if it doesn’t).
Expose the entire original record instead of only topic/partition (e.g. originalSinkRecord)
We should not expose the original key/value, since transformations might be editing them for security reasons.
Create a method in the SinkTaskContext to get this information, instead of updating SinkRecord (e.g. SinkTaskContext.getOriginalTopicPartition(SinkRecord sr))
Requires extra bookkeeping without concrete value.
Update SinkTask.put in any way to pass the new information outside SinkRecord (e.g. a Map or a derived class)
Much more disruptive change without considerable pros.
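To make the first rejected alternative more concrete (mapping the transformed topic back to the original one inside the framework), the sketch below checks whether a transformation produces overlapping topic names. ReverseMappingSketch and ambiguousTargets are hypothetical names used only for illustration, and the transformation is modeled as a plain function from topic name to topic name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.UnaryOperator;

final class ReverseMappingSketch {
    // Returns the transformed topic names that more than one original topic maps to.
    // A reverse lookup (transformed -> original) is only well defined when this set is empty.
    static Set<String> ambiguousTargets(List<String> originalTopics, UnaryOperator<String> transform) {
        Map<String, Set<String>> reverse = new HashMap<>();
        for (String original : originalTopics) {
            reverse.computeIfAbsent(transform.apply(original), k -> new TreeSet<>()).add(original);
        }
        Set<String> ambiguous = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : reverse.entrySet()) {
            if (entry.getValue().size() > 1) {
                ambiguous.add(entry.getKey());
            }
        }
        return ambiguous;
    }
}
```

For topics "x" and "y", a static transformation such as t -> "a" reports "a" as ambiguous, while a prefixing transformation such as t -> "prefix-" + t reports nothing. The framework would have to run this kind of validation over every transformation chain, which is the bookkeeping cost mentioned above.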