Status

Current state: Vote in progress

Discussion thread: here and  and here

JIRA:

Pull Request: https://github.com/apache/kafka/pull/6171 (replaced by https://github.com/apache/kafka/pull/6295)

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

MirrorMaker 2.0 (KIP-382) needs to know the downstream offsets of replicated records in order to provide cross-cluster offset translation. Currently, WorkerSourceTask receives this information from KafkaProducer but throws it away. Other Connectors may benefit from this change as well; for example, KIP-381 also proposes to notify SourceTasks of ACK'd records. In particular, this proposal makes it possible to distinguish records that have been durably stored from records that have been skipped altogether by a SourceConnector.

Public Interfaces

The callback commitRecord() will be overloaded with an extra parameter:


public abstract class SourceTask implements Task {
---%<---


// existing method
public void commitRecord(SourceRecord sourceRecord) {
  // nop
}


// new method
public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
  // nop
}
---%<---


Proposed Changes

Currently, SourceTask includes a commitRecord() callback, which is invoked whenever a record is ACK'd by the producer, filtered out by a transformation, or otherwise skipped.

The new overloaded version will be invoked only when a record is ACK'd, which implies the record was not filtered and was not skipped. This is somewhat intuitive, as the RecordMetadata can only be filled in by a producer ACK.

After an ACK, WorkerSourceTask currently logs RecordMetadata.offset() and partition() before invoking the commitRecord() callback. I propose to invoke the new overloaded version as well:


task.commitRecord(preTransformRecord);

if (recordMetadata != null)
    task.commitRecord(preTransformRecord, recordMetadata);
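
To illustrate how a task might use the two callbacks together, here is a minimal, self-contained sketch. It is not the implementation from the pull request: the nested SourceRecord, RecordMetadata and SourceTask types are simplified stand-ins for the real Kafka classes so that the example compiles on its own, and OffsetTrackingTask, upstreamOffset and dispatch() are hypothetical names introduced only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the nested types are simplified stand-ins, NOT the real
// org.apache.kafka.connect / org.apache.kafka.clients classes.
final class CommitRecordSketch {

    /** Stand-in for SourceRecord; models only an upstream offset (assumption). */
    static final class SourceRecord {
        final long upstreamOffset;

        SourceRecord(long upstreamOffset) {
            this.upstreamOffset = upstreamOffset;
        }
    }

    /** Stand-in for RecordMetadata; models only partition and offset. */
    static final class RecordMetadata {
        final int partition;
        final long offset;

        RecordMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Stand-in for SourceTask with the existing and the proposed callback. */
    abstract static class SourceTask {
        public void commitRecord(SourceRecord sourceRecord) {
            // nop
        }

        public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
            // nop
        }
    }

    /** Hypothetical task that maps upstream offsets to downstream offsets. */
    static final class OffsetTrackingTask extends SourceTask {
        final Map<Long, Long> downstreamOffsets = new HashMap<>();
        int committed = 0;

        @Override
        public void commitRecord(SourceRecord sourceRecord) {
            committed++; // fires for ACK'd, filtered, and skipped records alike
        }

        @Override
        public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
            // Only reached after a producer ACK, so the downstream offset is known.
            downstreamOffsets.put(sourceRecord.upstreamOffset, recordMetadata.offset);
        }
    }

    /** Same dispatch as the snippet above; recordMetadata is null when there was no ACK. */
    static void dispatch(SourceTask task, SourceRecord preTransformRecord, RecordMetadata recordMetadata) {
        task.commitRecord(preTransformRecord);
        if (recordMetadata != null) {
            task.commitRecord(preTransformRecord, recordMetadata);
        }
    }

    public static void main(String[] args) {
        OffsetTrackingTask task = new OffsetTrackingTask();
        dispatch(task, new SourceRecord(5L), new RecordMetadata(0, 42L)); // ACK'd
        dispatch(task, new SourceRecord(6L), null);                        // filtered or skipped
        System.out.println("committed=" + task.committed
                + " translated=" + task.downstreamOffsets);
    }
}
```

In this sketch, a record that appears in the single-argument callback but never in the two-argument one was filtered or skipped, which is the distinction this proposal aims to expose.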


Compatibility, Deprecation, and Migration Plan

This is a new callback and won't affect existing code beyond an additional no-op function call. 

Rejected Alternatives