Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4161 

Released: 0.10.2.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • SinkTasks have the most knowledge about their buffer state, so giving them more control over when they should flush rather than imposing it via the API makes sense.
    • Some may not buffer at all and flush to the destination system all the records provided to put().
    • Some may buffer time-based, whether via the current periodic flushes, or a background thread if they need more control over it.
    • Some may buffer size-based and write records to the destination system / make them visible when a temporary output file gets full or in-memory data structure reaches a certain size.
    • Many will want a combination of time and size-based - size as hard-limit and time for liveness.
  • Flushing buffers unnecessarily when one of the desired conditions has not been met should be rare, as it can hurt throughput.
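The combined policy described above (size as a hard limit, time for liveness) could be sketched roughly as follows. This is only an illustration: `FlushPolicy`, `maxBufferedRecords`, and `maxLingerMs` are hypothetical names and are not part of the Kafka Connect API.

```java
// Hypothetical helper, NOT part of the Kafka Connect API: decides when a
// buffering sink should write its buffer to the destination system.
final class FlushPolicy {
    private final int maxBufferedRecords; // size-based hard limit (assumed config)
    private final long maxLingerMs;       // time-based liveness bound (assumed config)

    FlushPolicy(int maxBufferedRecords, long maxLingerMs) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.maxLingerMs = maxLingerMs;
    }

    /**
     * Returns true when the buffer should be flushed: either it has reached
     * the size limit, or its oldest record has waited at least maxLingerMs.
     */
    boolean shouldFlush(int bufferedRecords, long oldestRecordTimestampMs, long nowMs) {
        if (bufferedRecords == 0) {
            return false; // nothing buffered, so a flush would be wasted work
        }
        if (bufferedRecords >= maxBufferedRecords) {
            return true; // hard limit reached
        }
        return nowMs - oldestRecordTimestampMs >= maxLingerMs; // liveness timeout
    }
}
```

A task would call `shouldFlush()` from `put()` and, if it relies on the periodic calls from the framework, from its commit hook as well, so that a flush only happens once one of the two conditions is met.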

...

  • Some connectors that store offset state in the destination system may not want Connect to manage offset commits at all, as it is wasted effort.
  • To minimize the window for redelivery after a failure under at-least-once delivery semantics, offset commits should ideally follow soon after a flush by the sink. It would therefore be useful to have a mechanism for the connector to explicitly request an offset commit.

...

  • When checking whether an offset commit is 'due' as per offset.flush.interval.ms, any pending commit request from the connector via SinkTaskContext.requestCommit() is also taken into consideration. Starting a commit clears any such pending request.
  • Instead of invoking SinkTask.flush() as part of the offset commit process, SinkTask.preCommit() is invoked and the returned offset state is committed. SinkTask.flush() is invoked directly by the runtime only prior to SinkTask.close().
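The 'commit due' check described above might look roughly like the following sketch. `CommitScheduler` is a hypothetical stand-in and not the actual worker code; in particular, restarting the interval timer whenever a commit starts is an assumption made here for illustration.

```java
// Hypothetical model of the runtime's "is a commit due?" decision.
// NOT the actual Connect worker implementation.
final class CommitScheduler {
    private final long commitIntervalMs; // stands in for offset.flush.interval.ms
    private long nextCommitMs;           // time at which the next periodic commit is due
    private boolean commitRequested;     // set when the task calls requestCommit()

    CommitScheduler(long commitIntervalMs, long nowMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.nextCommitMs = nowMs + commitIntervalMs;
    }

    /** Models SinkTaskContext.requestCommit(): remember that the task wants a commit. */
    void requestCommit() {
        commitRequested = true;
    }

    /** A commit is due if the interval has elapsed OR the task asked for one. */
    boolean isCommitDue(long nowMs) {
        return commitRequested || nowMs >= nextCommitMs;
    }

    /** Starting a commit clears the pending request (and, by assumption, restarts the timer). */
    void startCommit(long nowMs) {
        commitRequested = false;
        nextCommitMs = nowMs + commitIntervalMs;
    }
}
```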

The motivating use-cases are met as follows:

  • Connectors that need to keep relying on periodic flushes don't need to do anything, as preCommit() will invoke flush() by default. However, connectors that would like to flush data based on custom policies, such as number of records or serialized size, can do so by overriding preCommit(). They would need to maintain 'committable' offset state in a Map internally that they can return from preCommit(). They are free to rely on the periodic calls to preCommit() for flushing data when, e.g., a liveness timeout is hit.
  • Connectors that benefit from disabling Connect-managed offset commits can override preCommit() and return an empty map.
  • Connectors that want to minimize redelivery after failures can requestCommit() after a flush.
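As a rough illustration of the first and last use-cases, the sketch below tracks 'committable' offsets internally, flushes on a record-count limit, and requests a commit right after each flush. To stay self-contained it uses simplified stand-ins: `BufferingSinkSketch` and `CommitRequester` are hypothetical, partitions are plain strings, and offsets are plain longs, whereas the real `preCommit()` works with `Map<TopicPartition, OffsetAndMetadata>` and the request goes through `SinkTaskContext.requestCommit()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a size-based buffering sink task.
final class BufferingSinkSketch {
    /** Stand-in for the one SinkTaskContext method used here. */
    interface CommitRequester {
        void requestCommit();
    }

    private final int maxBufferedRecords; // assumed size-based flush threshold
    private final CommitRequester context;
    private final List<String> buffer = new ArrayList<>();
    // Offsets of records that were received but not yet flushed.
    private final Map<String, Long> pendingOffsets = new HashMap<>();
    // Offsets that are safe to commit because the data has been flushed.
    private final Map<String, Long> committableOffsets = new HashMap<>();

    BufferingSinkSketch(int maxBufferedRecords, CommitRequester context) {
        this.maxBufferedRecords = maxBufferedRecords;
        this.context = context;
    }

    /** Models put() for a single record. */
    void put(String partition, long offset, String value) {
        buffer.add(value);
        pendingOffsets.put(partition, offset + 1); // committed offset = next record to read
        if (buffer.size() >= maxBufferedRecords) {
            flushBuffer();
        }
    }

    private void flushBuffer() {
        // A real task would write the buffered records to the destination here.
        buffer.clear();
        committableOffsets.putAll(pendingOffsets);
        pendingOffsets.clear();
        context.requestCommit(); // ask for a commit soon after the flush
    }

    /** Models preCommit(): return only offsets whose records have been flushed. */
    Map<String, Long> preCommit() {
        return new HashMap<>(committableOffsets);
    }
}
```

A connector that manages offsets entirely in the destination system would instead return an empty map from `preCommit()`, as described in the second bullet.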

Compatibility, Deprecation, and Migration Plan

...