Status
Current state: Accepted
Discussion thread: here
JIRA: KAFKA-4161
Released: 0.10.2.0
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
SinkTasks have the most knowledge about their buffer state, so it makes sense to give them more control over when they flush rather than imposing it via the API.
- Some may not buffer at all and flush to the destination system all the records provided to put().
- Some may buffer time-based, whether via the current periodic flushes, or a background thread if they need more control over it.
- Some may buffer size-based and write records to the destination system / make them visible when a temporary output file gets full or an in-memory data structure reaches a certain size.
- Many will want a combination of time-based and size-based buffering: size as a hard limit and time for liveness.
- Flushing buffers unnecessarily, when none of the desired conditions has been met, should be rare, as it can hurt throughput.
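The combined policy from the list above (size as a hard limit, time for liveness) can be sketched as a small helper. This is only an illustration: FlushPolicy and all of its members are hypothetical names, not part of the Connect API, and time is passed in explicitly so the logic stays easy to follow.

```java
/** Hypothetical helper, not part of Kafka Connect: decides when a sink's buffer should be flushed. */
final class FlushPolicy {
    private final int maxRecords;       // size-based hard limit (assumed to be a record count)
    private final long maxAgeMs;        // time-based liveness limit
    private int buffered = 0;           // number of records currently buffered
    private long oldestRecordMs = -1L;  // arrival time of the oldest buffered record

    FlushPolicy(int maxRecords, long maxAgeMs) {
        this.maxRecords = maxRecords;
        this.maxAgeMs = maxAgeMs;
    }

    /** Call once for every record added to the buffer. */
    void onRecord(long nowMs) {
        if (buffered == 0) {
            oldestRecordMs = nowMs;
        }
        buffered++;
    }

    /** True when either limit is reached; an empty buffer never needs a flush. */
    boolean shouldFlush(long nowMs) {
        if (buffered == 0) {
            return false;
        }
        return buffered >= maxRecords || nowMs - oldestRecordMs >= maxAgeMs;
    }

    /** Call after the buffer has been written to the destination system. */
    void onFlush() {
        buffered = 0;
        oldestRecordMs = -1L;
    }
}
```

A task using such a helper would check shouldFlush() after buffering records in put(), and again on any periodic callback, so that a quiet topic still gets flushed once the age limit passes.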
...
- Some connectors that store offset state in the destination system may not want Connect to manage offset commits at all, since that is wasted effort.
- To minimize the window for redelivery after a failure under at-least-once delivery semantics, commits should ideally follow soon after a flush by the sink. It would therefore be useful to have a mechanism for the connector to explicitly request an offset commit.
...
- When checking whether an offset commit is 'due' as per offset.flush.interval.ms, any pending commit request from the connector via SinkTaskContext.requestCommit() is also taken into consideration. Starting a commit clears any such pending request.
- Instead of invoking SinkTask.flush() as part of the offset commit process, SinkTask.preCommit() is invoked and the returned offset state committed. SinkTask.flush() is invoked directly by the runtime only prior to SinkTask.close().
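The "is a commit due?" check described above can be modelled roughly as follows. This is a simplified sketch, not the actual runtime code: CommitScheduler and its members are hypothetical names, and rescheduling the next periodic commit from the moment a commit starts is an assumption made for the example.

```java
/** Hypothetical model of the runtime's commit scheduling; not the real Connect worker code. */
final class CommitScheduler {
    private final long intervalMs;            // stands in for offset.flush.interval.ms
    private long nextCommitMs;                // when the next periodic commit falls due
    private boolean commitRequested = false;  // pending request from the connector

    CommitScheduler(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextCommitMs = nowMs + intervalMs;
    }

    /** Models the effect of SinkTaskContext.requestCommit(). */
    void requestCommit() {
        commitRequested = true;
    }

    /** A commit is due when the interval has elapsed or the connector asked for one. */
    boolean commitDue(long nowMs) {
        return commitRequested || nowMs >= nextCommitMs;
    }

    /** Starting a commit clears any pending request (and, by assumption, restarts the interval). */
    void startCommit(long nowMs) {
        commitRequested = false;
        nextCommitMs = nowMs + intervalMs;
    }
}
```

Treating the request as a flag rather than a queue means repeated requestCommit() calls before the next commit collapse into a single commit.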
The motivating use-cases are met as follows:
- Connectors that need to keep relying on periodic flushes don't need to do anything, as preCommit() will invoke flush() by default. However, connectors that would like to flush data based on custom policies like number of records or serialized size can do so by overriding preCommit(). They would need to maintain 'committable' offset state in a Map internally that they can return from preCommit(). They are free to optionally rely on the periodic calls to preCommit() for flushing data when e.g. a liveness timeout is hit.
- Connectors that benefit from disabling Connect-managed offset commits can override preCommit() and return an empty map.
- Connectors that want to minimize redelivery after failures can requestCommit() after a flush.
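As a rough illustration of the second and fourth use-cases, the sketch below models a size-based sink that tracks 'committable' offsets and requests a commit after each flush. To stay self-contained it does not use the Connect classes: CommitRequester stands in for SinkTaskContext, SizeBasedSink for a SinkTask subclass, partitions are plain strings, and preCommit() here takes no arguments. All of these are simplifying assumptions, not the actual API.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for SinkTaskContext; only the method discussed above is modelled. */
interface CommitRequester {
    void requestCommit();
}

/** Hypothetical size-based sink; not a real SinkTask implementation. */
final class SizeBasedSink {
    private final int maxBuffered;
    private final CommitRequester context;
    private final Map<String, Long> pending = new HashMap<>();      // buffered, not yet written
    private final Map<String, Long> committable = new HashMap<>();  // written to the destination
    private int bufferedCount = 0;

    SizeBasedSink(int maxBuffered, CommitRequester context) {
        this.maxBuffered = maxBuffered;
        this.context = context;
    }

    /** Buffers one record and flushes once the size limit is hit. */
    void put(String partition, long offset) {
        // Kafka convention: the committed offset is the next offset to read.
        pending.put(partition, offset + 1);
        bufferedCount++;
        if (bufferedCount >= maxBuffered) {
            flushBuffer();
        }
    }

    private void flushBuffer() {
        // A real sink would write the buffered records to the destination system here.
        committable.putAll(pending);
        pending.clear();
        bufferedCount = 0;
        context.requestCommit();  // ask for a commit soon after the flush
    }

    /** Models an overridden preCommit(): no flush, only report what is safe to commit. */
    Map<String, Long> preCommit() {
        return new HashMap<>(committable);
    }
}
```

A connector that manages offsets entirely in the destination system would instead return an empty map from its preCommit() override.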
Compatibility, Deprecation, and Migration Plan
...