Status
Current state: Vote in progress
Discussion thread: here and here
JIRA:
Pull Request: https://github.com/apache/kafka/pull/6171 replaced with: https://github.com/apache/kafka/pull/6295
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
MirrorMaker 2.0 (KIP-382) needs to know the downstream offsets of replicated records in order to provide cross-cluster offset translation. Currently, WorkerSourceTask receives this information from KafkaProducer but throws it away. Other Connectors may also benefit from this change; see, for example, KIP-381, which also proposes to notify SourceTasks of ACK'd records. In particular, this proposal makes it possible for a SourceTask to distinguish records that have been durably stored from records that were skipped altogether.
The callback commitRecord() will be overloaded with an extra parameter:
public abstract class SourceTask implements Task {

---%<---

    // existing method
    public void commitRecord(SourceRecord sourceRecord) {
        // nop
    }

    // new method
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        // nop
    }

---%<---
Currently, SourceTask includes a commitRecord() callback, which is invoked both when a record is ACK'd by the producer and when a record is filtered or skipped without being sent.
The new overloaded version will be invoked only when a record is ACK'd, which implies the record was not filtered and was not skipped. This is somewhat intuitive, as the RecordMetadata can only be filled in by a producer ACK.
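To illustrate how a task might use the two callbacks together, here is a minimal, self-contained sketch. It does not use the real Kafka Connect classes: StubSourceRecord, StubRecordMetadata and StubSourceTask are hypothetical stand-ins that model only the fields needed here, and OffsetTranslatingTask is an invented example, not part of this proposal. It assumes the existing one-argument callback still fires for every record, so the difference between the two counts gives the number of filtered or skipped records.

```java
import java.util.HashMap;
import java.util.Map;

final class AckTrackingSketch {

    // Hypothetical stand-in for SourceRecord: carries only an upstream offset.
    static final class StubSourceRecord {
        final long upstreamOffset;
        StubSourceRecord(long upstreamOffset) { this.upstreamOffset = upstreamOffset; }
    }

    // Hypothetical stand-in for the producer's RecordMetadata.
    static final class StubRecordMetadata {
        final int partition;
        final long offset;
        StubRecordMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in for SourceTask with the existing and the proposed no-op callbacks.
    abstract static class StubSourceTask {
        public void commitRecord(StubSourceRecord record) { /* nop */ }
        public void commitRecord(StubSourceRecord record, StubRecordMetadata metadata) { /* nop */ }
    }

    // Example task: remembers where each ACK'd record landed downstream.
    static final class OffsetTranslatingTask extends StubSourceTask {
        final Map<Long, Long> downstreamOffsets = new HashMap<>();
        int committed = 0;

        @Override
        public void commitRecord(StubSourceRecord record) {
            committed++; // fires for ACK'd, filtered and skipped records alike
        }

        @Override
        public void commitRecord(StubSourceRecord record, StubRecordMetadata metadata) {
            // fires only after a producer ACK, so metadata is always populated here
            downstreamOffsets.put(record.upstreamOffset, metadata.offset);
        }

        int skippedOrFiltered() {
            return committed - downstreamOffsets.size();
        }
    }

    public static void main(String[] args) {
        OffsetTranslatingTask task = new OffsetTranslatingTask();
        StubSourceRecord acked = new StubSourceRecord(7L);
        task.commitRecord(acked);
        task.commitRecord(acked, new StubRecordMetadata(0, 100L));
        task.commitRecord(new StubSourceRecord(8L)); // never reached the producer
        System.out.println("translated: " + task.downstreamOffsets);
        System.out.println("skipped or filtered: " + task.skippedOrFiltered());
    }
}
```

A real task would key the map on the record's source partition as well as its offset; the single long key is a simplification for this sketch.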
After an ACK, WorkerSourceTask currently logs RecordMetadata.offset() and partition() before invoking the commitRecord() callback. I propose to invoke the new overloaded version as well:
task.commitRecord(preTransformRecord);
if (recordMetadata != null)
    task.commitRecord(preTransformRecord, recordMetadata);
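The dispatch logic can be exercised in isolation with a small simulation. This is only a sketch: the stub types, RecordingTask and the dispatch() helper are hypothetical and stand in for the real Connect classes and for WorkerSourceTask's internals. It also assumes the worker passes null metadata when a record was filtered or skipped, which is what the null check suggests.

```java
import java.util.ArrayList;
import java.util.List;

final class DispatchSketch {

    // Hypothetical stand-in for SourceRecord.
    static final class StubSourceRecord {
        final String value;
        StubSourceRecord(String value) { this.value = value; }
    }

    // Hypothetical stand-in for the producer's RecordMetadata.
    static final class StubRecordMetadata {
        final int partition;
        final long offset;
        StubRecordMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Task stand-in that records which callbacks were invoked, in order.
    static class RecordingTask {
        final List<String> calls = new ArrayList<>();

        public void commitRecord(StubSourceRecord record) {
            calls.add("commitRecord(" + record.value + ")");
        }

        public void commitRecord(StubSourceRecord record, StubRecordMetadata metadata) {
            calls.add("commitRecord(" + record.value + ", offset=" + metadata.offset + ")");
        }
    }

    // Mirrors the proposed worker behavior: the existing callback always fires,
    // the new overload fires only when producer metadata is available.
    static void dispatch(RecordingTask task,
                         StubSourceRecord preTransformRecord,
                         StubRecordMetadata recordMetadata) {
        task.commitRecord(preTransformRecord);
        if (recordMetadata != null) {
            task.commitRecord(preTransformRecord, recordMetadata);
        }
    }

    public static void main(String[] args) {
        RecordingTask task = new RecordingTask();
        dispatch(task, new StubSourceRecord("acked"), new StubRecordMetadata(0, 42L));
        dispatch(task, new StubSourceRecord("filtered"), null);
        task.calls.forEach(System.out::println);
    }
}
```

An ACK'd record produces two callback invocations and a filtered record produces one, so tasks that override only the existing method see no change in behavior.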
This is a new callback and won't affect existing code beyond an additional no-op function call.