
Status

Current state:   [One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA: No JIRA ticket. [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently, there are two approaches to using the Kafka Java client correctly, that is, in a way that does not cause duplicate records during normal operation. (Duplicates are still possible when failures occur.)

Approach 1 - process all records immediately (synchronous)

This approach is useful when all processing can happen between the calls to poll.

...

  1. a poll loop which:
    a. calls consumer.poll
    b. processes the received records
    c. commits offsets for the just-processed records
    d. repeats
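A minimal sketch of the approach 1 loop, assuming a broker-free setup: it uses a hypothetical in-memory `FakeConsumer` instead of the real `KafkaConsumer` (whose `poll` takes a `Duration` and whose `commitSync` takes a `Map<TopicPartition, OffsetAndMetadata>`). Partitions are plain `int`s, and `Rec` is an illustrative record type.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SyncPollLoopSketch {
    // Illustrative stand-in for a Kafka record: partition, offset, value.
    record Rec(int partition, long offset, String value) {}

    // Hypothetical in-memory stand-in for KafkaConsumer (NOT the real client API):
    // poll() hands out pre-built batches, commitSync() stores the committed offsets.
    static class FakeConsumer {
        private final List<List<Rec>> batches;
        private int next = 0;
        final Map<Integer, Long> committed = new HashMap<>();

        FakeConsumer(List<List<Rec>> batches) { this.batches = batches; }

        List<Rec> poll() { return next < batches.size() ? batches.get(next++) : List.of(); }

        void commitSync(Map<Integer, Long> offsets) { committed.putAll(offsets); }
    }

    // Approach 1: every polled record is fully processed before its offset is
    // committed and before the next poll.
    static List<String> run(FakeConsumer consumer, int iterations) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {        // repeat
            List<Rec> records = consumer.poll();       // poll
            Map<Integer, Long> offsets = new HashMap<>();
            for (Rec r : records) {
                processed.add(r.value());              // process synchronously
                // Kafka convention: the committed offset is the next offset to read.
                offsets.put(r.partition(), r.offset() + 1);
            }
            if (!offsets.isEmpty()) {
                consumer.commitSync(offsets);          // commit what was just processed
            }
        }
        return processed;
    }
}
```

Because processing finishes before the commit, the committed offset never runs ahead of the processed records.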

The big advantage of this approach is that it is the simplest. This is also the only approach that can be used with auto-commit.

However, this approach has the downside that asynchronous processing cannot be used, whereas approach 2 does allow it. Therefore, we do not consider approach 1 further in this KIP.

Approach 2 - asynchronous processing

...

  1. a poll loop which:
    a. calls consumer.poll (note: the rebalance listener might be called during poll)
    b. starts processing the received records
    c. commits offsets for records that have completed processing
    d. repeats
  2. a rebalance listener which, when partitions are revoked:
    a. waits (blocking) until processing of all consumed records from the revoked partitions has completed
    b. if the poll-loop commits (step 1.c.) were asynchronous, waits for these commits to complete
    c. synchronously commits offsets for records that have completed processing
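The sketch below illustrates approach 2 under simplifying assumptions: processing runs on a `java.util.concurrent` thread pool, a hypothetical `Processor` class holds both the poll-loop bookkeeping and the listener logic, and committed offsets are kept in a map instead of being sent with `commitSync`. Its `onPartitionsRevoked` method borrows the name of the real `ConsumerRebalanceListener` callback but does not implement that interface.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AsyncProcessingSketch {
    record Rec(int partition, long offset, String value) {}

    // Hypothetical helper, not part of the Kafka API.
    static class Processor {
        private final ExecutorService pool = Executors.newFixedThreadPool(2);
        // Per partition: (offset, future) pairs in the order the records were polled.
        private final Map<Integer, List<Map.Entry<Long, Future<?>>>> inFlight = new HashMap<>();
        final Map<Integer, Long> committed = new HashMap<>();
        final List<String> done = Collections.synchronizedList(new ArrayList<>());

        // Poll loop: start processing the received records on worker threads.
        void start(List<Rec> records) {
            for (Rec r : records) {
                Future<?> f = pool.submit(() -> done.add(r.value()));
                inFlight.computeIfAbsent(r.partition(), p -> new ArrayList<>())
                        .add(Map.entry(r.offset(), f));
            }
        }

        // Poll loop (step 1.c.): per partition, commit only the completed prefix,
        // so a finished later record never marks an unfinished earlier one as done.
        void commitCompleted() {
            for (Map.Entry<Integer, List<Map.Entry<Long, Future<?>>>> e : inFlight.entrySet()) {
                Iterator<Map.Entry<Long, Future<?>>> it = e.getValue().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, Future<?>> entry = it.next();
                    if (!entry.getValue().isDone()) break;
                    committed.put(e.getKey(), entry.getKey() + 1); // next offset to read
                    it.remove();
                }
            }
        }

        // Rebalance listener, on revoke.
        void onPartitionsRevoked(Collection<Integer> revoked) {
            for (int p : revoked) {
                for (Map.Entry<Long, Future<?>> entry : inFlight.getOrDefault(p, List.of())) {
                    awaitDone(entry.getValue()); // block until processing has completed
                }
            }
            // Asynchronous poll-loop commits would be awaited here; this sketch has none.
            commitCompleted(); // synchronous commit before giving up the partitions
            for (int p : revoked) inFlight.remove(p);
        }

        private static void awaitDone(Future<?> f) {
            try {
                f.get();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException(ex.getCause());
            }
        }

        void shutdown() { pool.shutdown(); }
    }
}
```

Committing only the completed prefix of each partition matters because asynchronous tasks can finish out of order.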

...

In the new approach, when a revoke is requested in one poll, the revoke will be completed in the next poll.
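The deferred revoke can be modeled as a small two-step state machine. In this hypothetical model (not the real consumer), a revoke requested before one `poll` is announced by that poll through `getToBeRevokedPartitions` (method name from this KIP; its signature is an assumption) and is only removed from the assignment by the following poll. The time between the two polls is when the program can finish and commit work for those partitions.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

class DeferredRevokeSketch {
    // Hypothetical model of the proposed poll behavior; not the real KafkaConsumer.
    static class Consumer {
        private final Set<Integer> assigned = new TreeSet<>();
        private final Set<Integer> toBeRevoked = new TreeSet<>();
        private final Set<Integer> pendingRequests = new TreeSet<>();

        Consumer(Collection<Integer> initial) { assigned.addAll(initial); }

        // Simulates the group asking this member to give up some partitions.
        void requestRevoke(Collection<Integer> partitions) { pendingRequests.addAll(partitions); }

        void poll() {
            // First, complete the revokes announced by the previous poll.
            assigned.removeAll(toBeRevoked);
            toBeRevoked.clear();
            // Then announce newly requested revokes; they complete in the next poll.
            toBeRevoked.addAll(pendingRequests);
            pendingRequests.clear();
        }

        // Name from this KIP; the return type is an assumption.
        Set<Integer> getToBeRevokedPartitions() { return Set.copyOf(toBeRevoked); }

        Set<Integer> assignment() { return Set.copyOf(assigned); }
    }
}
```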

When a call to poll completes and a partition revoke has been requested, the user has a choice:

...

To allow a program to stop committing offsets for a lost partition, a new consumer method is introduced that returns the partitions that were lost during the previous poll. This method is used in step 1.e. Note that ideally this does not happen, because the user configures the partition assignor such that a partition is declared lost only after the maximum processing time has passed (this is outside the scope of this KIP).
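One plausible way a poll loop could use the lost-partitions method is to filter its pending commits. The method name `getLostPartitions` comes from this KIP; the `LostPartitionSource` interface, the `Set<Integer>` return type, and the helper `offsetsToCommit` are hypothetical, chosen for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class LostPartitionsSketch {
    // Hypothetical view of the proposed consumer method; the signature is assumed.
    interface LostPartitionSource {
        Set<Integer> getLostPartitions();
    }

    // Given offsets whose records finished processing (partition -> next offset
    // to read), returns only those still safe to commit: partitions lost in the
    // previous poll are dropped, since this consumer may no longer own them.
    static Map<Integer, Long> offsetsToCommit(Map<Integer, Long> pending,
                                              LostPartitionSource consumer) {
        Map<Integer, Long> result = new TreeMap<>(pending);
        result.keySet().removeAll(consumer.getLostPartitions());
        return result;
    }
}
```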

...

There is a small chance that this breaks clients that depend on a revoke being completed in the same poll in which it was requested. If this is a problem, a configuration option could be introduced that restores the current behavior. Another option to mitigate this is described in variant 1 below.

When this KIP is implemented, the rebalance listener is no longer needed. However, it is assumed that it will continue to be supported for backward compatibility. Therefore, no deprecation or migration is needed.

...

Instead of introducing the methods getToBeRevokedPartitions and getLostPartitions, it is also possible to introduce a new poll method that returns all of the following: the new records, the partitions that will be revoked in the next call to poll, and the partitions that were just lost. Because this is a separate poll method, the partition revoke behavior of the current poll method can stay exactly the same (that is, a revoke is completed in the same poll in which it was requested). This prevents any backward compatibility problems, at the cost of potential user confusion because there are two poll methods.
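A sketch of the result type such a poll method might return. The `PollResult` and `Rec` types and the `partitionsToFinish` helper are hypothetical, and partitions are plain `int`s rather than `TopicPartition`. The helper assumes that a partition reported as lost needs no final commit, even if it was also announced as to-be-revoked.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PollResultSketch {
    record Rec(int partition, long offset, String value) {}

    // Hypothetical return value of the alternative poll method.
    record PollResult(List<Rec> records, Set<Integer> toBeRevoked, Set<Integer> lost) {
        PollResult {
            // Defensive, immutable copies.
            records = List.copyOf(records);
            toBeRevoked = Set.copyOf(toBeRevoked);
            lost = Set.copyOf(lost);
        }
    }

    // Partitions whose processing should be finished and committed before the
    // next poll: those about to be revoked, minus any already lost.
    static Set<Integer> partitionsToFinish(PollResult result) {
        Set<Integer> finish = new TreeSet<>(result.toBeRevoked());
        finish.removeAll(result.lost());
        return finish;
    }
}
```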

Rejected alternatives

None.