Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-4161
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is with regard to sink connectors and the offset commit process managed by the Connect runtime.
Periodic offset commits (controlled with offset.flush.interval.ms
) require knowing what offset state is safe to commit to ensure at-least-once delivery from Kafka to the sink connector.
With the current API guarantees that are available, after SinkTask.flush()
and before any further put()
calls the current offset state is safe to commit. So this is what the runtime relies upon, and connectors have an expectation of periodic calls to SinkTask.flush().
However, it is not ideal to couple offset commits and flushes:
SinkTask
s have the most knowledge about their buffer state, so giving them more control over when they should flush rather than imposing it via the API makes sense.- Some may not buffer at all and flush to the destination system all the records provided to
put()
. - Some may buffer time-based, whether via the current periodic flushes, or a background thread if they need more control over it.
- Some may buffer size-based and write records to the destination system / make them visible when a temporary output file gets full or in-memory data structure reaches a certain size.
- Many will want a combination of time and size-based - size as hard-limit and time for liveness.
- Some may not buffer at all and flush to the destination system all the records provided to
- Flushing buffers unnecessarily when one of the desired conditions has not been met should be rare, as it can hurt throughput.
There are some additional use-cases that this proposal aims to address:
- Some connectors that store offset state in the destination system, may not wish for Connect to manage offset commits at all as it is wasted effort.
- To minimize the window of at-least-once delivery semantics upon failure, commits should ideally follow soon after a flush by the sink. So it would be useful to have a mechanism for the connector to explicitly request an offset commit.
Public Interfaces and Proposed Changes
New method on SinkTask abstract base class:
/** * Pre-commit hook invoked prior to an offset commit. * * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable. * * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}}, * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s * passed to {@link #put}. * * @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition. */ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { flush(currentOffsets); return currentOffsets; }
New method on SinkTaskContext interface:
/** * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery * by requesting an offset commit as soon as they flush data to the destination system. * * This is a hint to the runtime and no timing guarantee should be assumed. */ void requestCommit();
Semantics
- When checking whether an offset commit is 'due' as per
offset.flush.interval.ms
, any pending commit request from the connector viaSinkTaskContext.requestCommit()
is also taken into consideration. Starting a commit clears any such pending request. - Instead of invoking
SinkTask.flush()
as part of the offset commit process,SinkTask.preCommit()
is invoked and the returned offset state committed. SinkTask.flush()
is invoked directly by the runtime only prior toSinkTask.close()
.
The motivating use-cases are met as follows:
- Connectors that need to keep relying on periodic flushes don't need to do anything as
preCommit()
will invokeflush()
by default. - Connectors that would like to flush data based on custom policies like number of records or serialized size can do so by overriding
preCommit()
. They would need to maintain 'committable' offset state in aMap
internally that they can return frompreCommit()
. They are free to optionally rely on the periodic calls topreCommit()
for flushing data when e.g. a liveness timeout is hit. - Connectors that wish to disable Connect-managed offset commits can override
preCommit()
and return an empty map.
Compatibility, Deprecation, and Migration Plan
While preCommit()
is a new method on the SinkTask
API, it has a default implementation that calls flush(),
so there are no compatibility issues.
There are no deprecations and no migration is needed, however some connectors can consider overriding preCommit()
to avoid unnecessary flushes or disable offset commits.
Rejected Alternatives
- Deprecate
flush()
altogether: if we were starting from a clean slate,flush()
is not strictly necessary. Connectors could choose to perform flushes fromSinkTask.preCommit()
and also uponSinkTask.close()
.- However,
SinkTask.flush()
is now implemented by a number of connectors and can be expected to be invoked periodically as well as prior toSinkTask.close()
.
- However,
- Avoid adding the new
preCommit()
method on theSinkTask
API, but haveSinkTaskContext.requestCommit()
accept the offset state as an argument. Then could allow runtime to just use whatever offsets it has been given and skipflush()
as long as the connector has been providing them.- Offset commits are asynchronous to
requestCommit()
, and it is desirable to use the most-recent committable offset state. - The implication of disabling periodic flushes based on offset commit requests from the connector is not very obvious.
- Not clear how the use-case of disabling offset commits should be addressed.
- Offset commits are asynchronous to