Current state: Under Discussion
Discussion thread: here
JIRA: No jira.
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
The goal of this KIP is to improve the Java client consumer API such that record duplication can be avoided when partitions are reassigned.
Duplicate records arise when a partition is reassigned to another consumer before the first consumer has committed the offsets of all records it processed. The second consumer starts reading at the last committed offset, and therefore re-reads some records that the first consumer already processed, hence the duplication.
Currently, there are two approaches to using the Kafka Java client correctly, that is, in a way that does not cause record duplication during normal operation. (Duplicates are still possible due to failures.)
This approach is useful when all processing can happen between the calls to poll.
Clients that follow this approach have one component:
The big advantage of this approach is that it is the simplest. This is also the only approach that can be used with auto-commit.
However, this approach has the downside that asynchronous processing cannot be used. Therefore, we do not consider it further in this KIP.
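This first approach can be sketched as follows (an illustrative, untested sketch; the topic name, the `running` flag, and the `process` method are assumptions, and `consumer` is a configured KafkaConsumer):

```java
// Approach 1: all processing happens between the calls to poll, so
// auto-commit (enable.auto.commit=true) is safe.
consumer.subscribe(List.of("events"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // must complete before the next call to poll
    }
    // Auto-commit commits the offsets of this batch during a later poll,
    // i.e. only after all records of the batch have been processed.
}
```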
In this approach, processing can be done asynchronously while receiving more records. As soon as processing completes, the offset can be committed.
Clients that follow this approach have the following components:
This approach looks very flexible but has some problems:
Note that problems 4 and 5 are hard because both the poll-loop and the rebalance listener perform commits. This KIP does not solve problems 4 and 5, but it does allow for a simpler implementation.
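For reference, approach 2 typically looks like the following sketch (illustrative and untested; `processor` and `completedOffsets` are hypothetical helpers that track asynchronous processing and the offsets of completed records):

```java
// Approach 2: records are processed asynchronously; offsets of completed
// records are committed from the poll loop AND from the rebalance listener.
consumer.subscribe(List.of("events"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Wait until in-flight records of the revoked partitions are done,
        // then commit their offsets. Both this callback and the loop below
        // perform commits, which is what makes problems 4 and 5 hard.
        processor.awaitCompletion(partitions);
        consumer.commitSync(completedOffsets(partitions));
    }
});
while (running) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        processor.submit(record);  // hand off for asynchronous processing
    }
    consumer.commitSync(completedOffsets());  // commit offsets of finished records
}
```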
This KIP proposes an alternative API for handling revoked partitions.
In this KIP we recognize that a 'revoke' operation consists of two steps:
In addition, we recognize that a partition is lost when a consumer takes too long to release control of the partition.
Currently, when a revoke is requested (while a poll is in progress), the callback listener is immediately invoked, and the revoke is completed during the same poll.
In the new approach, when a revoke is requested in one poll, the revoke will be completed in the next poll.
When a poll invocation completes and a partition revoke is requested, the user has a choice:
Option b. is the interesting one and the core of this KIP; by quickly calling poll again, it is possible to continue processing records from partitions that are not revoked.
Clients that use the new approach (option b) again have only one component:
It is not possible to delay partition revocation indefinitely. The deadline is when the partition is lost. Once the partition is lost, the delay request is denied.
To allow a program to stop committing offsets for a lost partition, a new consumer method is introduced that returns the partitions that were lost in the previous poll. This method is used in step 1.e. Note that ideally this does not happen, because the user configures the partition assigner such that a partition is declared lost only after the maximum processing time has passed (this is outside the scope of this KIP).
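With the proposed methods, the poll loop of the new approach could look like this sketch (illustrative and untested; `processor` is a hypothetical helper that processes records asynchronously and tracks in-flight and completed offsets per partition):

```java
while (running) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
        processor.submit(record);  // asynchronous processing continues across polls
    }
    // Stop committing offsets for partitions that were lost during this poll.
    processor.discard(consumer.getLostPartitions());
    // Partitions that are not being revoked are simply processed further. For
    // revoked partitions that still have records in flight, ask for more time.
    Set<TopicPartition> stillBusy = processor.busyPartitions(consumer.getToBeRevokedPartitions());
    if (!consumer.delayRevoke(stillBusy)) {
        // The delay was denied because some of these partitions were lost.
        processor.discard(consumer.getLostPartitions());
    }
    consumer.commitSync(processor.completedOffsets());
    // Quickly calling poll again completes the revoke of the partitions
    // whose revoke was not delayed.
}
```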
Some smaller details:
The problems listed with approach 2 now disappear:
In addition, the API change is very small; existing clients do not need to be changed.
Three new methods need to be added to the consumer:
Set<TopicPartition> getToBeRevokedPartitions()
Gives the partitions that will be revoked in the next poll.

Set<TopicPartition> getLostPartitions()
Gives the partitions that were lost during the last call to poll.

boolean delayRevoke(Set<TopicPartition> partitions)
Requests that the revoke of the given partitions be delayed to the poll following the next poll. Returns true on success, false when some of these partitions were lost.

This section shows how the extension can be implemented. This is not necessarily exactly how this KIP will be implemented.
The consumer keeps:
During a consumer.poll:
Method getToBeRevokedPartitions
returns the revoke-next list.
Method getLostPartitions
returns the list of partitions that were lost during the last poll.
Method delayRevoke
moves the given partitions from the revoke-next list to the revoke-thereafter list.
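The bookkeeping above can be modeled with plain collections. The following sketch uses Strings instead of TopicPartition for brevity; the class name and the internal hooks (`revokeRequested`, `completeRevokesOnPoll`, `markLost`) are assumptions made for illustration, and only the three public methods come from this KIP:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative model of the proposed consumer-internal bookkeeping.
class RevokeBookkeeping {
    private final Set<String> revokeNext = new HashSet<>();       // revoke completes in the next poll
    private final Set<String> revokeThereafter = new HashSet<>(); // revoke was delayed one extra poll
    private final Set<String> lost = new HashSet<>();             // lost during the last poll

    // Called when a revoke of the given partitions is requested during a poll.
    void revokeRequested(Set<String> partitions) {
        revokeNext.addAll(partitions);
    }

    // Called at the start of the next poll: pending revokes complete, and
    // delayed revokes move up to the revoke-next list. Returns the
    // partitions whose revoke just completed.
    Set<String> completeRevokesOnPoll() {
        Set<String> completed = new HashSet<>(revokeNext);
        revokeNext.clear();
        revokeNext.addAll(revokeThereafter);
        revokeThereafter.clear();
        return completed;
    }

    // Called when the consumer took too long and the partitions are lost.
    void markLost(Set<String> partitions) {
        lost.clear();
        lost.addAll(partitions);
        revokeNext.removeAll(partitions);
        revokeThereafter.removeAll(partitions);
    }

    Set<String> getToBeRevokedPartitions() { return new HashSet<>(revokeNext); }

    Set<String> getLostPartitions() { return new HashSet<>(lost); }

    // Moves the given partitions from the revoke-next list to the
    // revoke-thereafter list; denied (returns false) once a partition is lost.
    boolean delayRevoke(Set<String> partitions) {
        for (String p : partitions) {
            if (lost.contains(p)) return false;
        }
        for (String p : partitions) {
            if (revokeNext.remove(p)) revokeThereafter.add(p);
        }
        return true;
    }
}
```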
There is a small chance that we break clients that depend on a revoke being completed in the same poll in which it was requested. If this is a problem, a configuration option could be introduced that restores the current behavior. Another way to mitigate this is described in variant 1 below.
When this KIP is implemented, the rebalance listener is no longer needed. However, it is assumed that it will continue to be supported for backward compatibility. Therefore, no deprecation or migration is needed.
When at some point in the future the rebalance listener has been deprecated and then removed, the same-thread lock in the consumer can be replaced by a simpler non-reentrant semaphore.
TBD.
Instead of introducing the methods getToBeRevokedPartitions and getLostPartitions, it is also possible to introduce a new poll method that returns all of the following: the new records, the partitions that will be revoked in the next call to poll, and the partitions that were just lost. Because this is a separate poll method, the partition-revoke behavior of the current poll method can stay exactly the same (which is to complete the partition revoke in the same poll). This prevents any backward-compatibility problems, at the cost of potential user confusion due to there being two poll methods.
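Such a combined result could be shaped roughly like the following sketch (all names are assumptions; a real implementation would carry ConsumerRecords and TopicPartition instead of the simplified types used here):

```java
import java.util.List;
import java.util.Set;

// Hypothetical result type for the variant poll method.
final class ExtendedPollResult<R> {
    final List<R> records;                    // the newly fetched records
    final Set<String> toBeRevokedPartitions;  // revoked in the next call to poll
    final Set<String> lostPartitions;         // lost during this call to poll

    ExtendedPollResult(List<R> records, Set<String> toBeRevoked, Set<String> lost) {
        this.records = records;
        this.toBeRevokedPartitions = toBeRevoked;
        this.lostPartitions = lost;
    }
}
```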
None.