Status

Current stateAccepted

Discussion thread: here

JIRA: KAFKA-4161 

Released: 0.10.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is with regard to sink connectors and the offset commit process managed by the Connect runtime.

Periodic offset commits (controlled with offset.flush.interval.ms) require knowing what offset state is safe to commit to ensure at-least-once delivery from Kafka to the sink connector. 

With the current API guarantees that are available, after SinkTask.flush() and before any further put() calls the current offset state is safe to commit. So this is what the runtime relies upon, and connectors have an expectation of periodic calls to SinkTask.flush().

However, it is not ideal to couple offset commits and flushes:

There are some additional use-cases that this proposal aims to address:

Public Interfaces and Proposed Changes

New method on SinkTask abstract base class:

/**
 * Pre-commit hook invoked prior to an offset commit.
 *
 * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
 *
 * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
 *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
 *                       passed to {@link #put}.
 *
 * @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
 */
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    flush(currentOffsets);
    return currentOffsets;
}

New method on SinkTaskContext interface:

/**
 * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
 * by requesting an offset commit as soon as they flush data to the destination system.
 *
 * This is a hint to the runtime and no timing guarantee should be assumed.
 */
void requestCommit();


Semantics

The motivating use-cases are met as follows:

Compatibility, Deprecation, and Migration Plan


While preCommit() is a new method on the SinkTask API, it has a default implementation that calls flush(), so there are no compatibility issues.

There are no deprecations and no migration is needed, however some connectors can consider overriding preCommit() to avoid unnecessary flushes or disable offset commits.

Rejected Alternatives