...
Current state: Under Discussion
Discussion thread: here link TODO
JIRA: KAFKA-5337
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- For each topic present in the subscription for at least one consumer, determine the current lag on each partition for the consumer group
  - Lag is computed by subtracting the last committed offset from the latest offset for each partition
  - If the consumer group has not yet committed an offset for a partition, determine the lag based on the auto.offset.reset consumer property, as follows:
    - If auto.offset.reset=latest, the current lag for the partition is 0
    - If auto.offset.reset=earliest (or any other value), we assume a current lag equal to the total number of records currently available in that partition (i.e. latest offset - beginning offset)
- For each topic present in the subscription:
  - Sort partitions in order of decreasing lag
  - While there are still unassigned partitions:
    - Select the unassigned partition with the maximum lag
    - Assign this partition to the consumer with the minimum total number of assigned partitions
    - If several consumers are tied for the minimum number of assigned partitions, assign the partition to the one with the minimum total assigned lag
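The steps above can be sketched in a few lines of Python. This is an illustrative sketch, not the prototype's code: the function names `partition_lag` and `assign` are hypothetical, partition counts and lag totals are assumed to accumulate across topics, remaining ties are assumed to be broken by consumer id, and negative lags are clamped to zero as a defensive assumption.

```python
from collections import defaultdict


def partition_lag(begin, end, committed, auto_offset_reset="latest"):
    """Lag for one partition; `committed` is None if the group has no committed offset."""
    if committed is not None:
        # Assumption: clamp at zero in case the committed offset is ahead of `end`.
        return max(end - committed, 0)
    if auto_offset_reset == "latest":
        return 0
    # "earliest" (or any other value): every available record counts as lag.
    return end - begin


def assign(subscriptions, partition_lags):
    """subscriptions: {consumer: [topic, ...]}
    partition_lags: {topic: {partition: lag}}
    Returns {consumer: [(topic, partition), ...]}."""
    assignment = {consumer: [] for consumer in subscriptions}
    total_lag = defaultdict(int)

    consumers_by_topic = defaultdict(list)
    for consumer, topics in subscriptions.items():
        for topic in topics:
            consumers_by_topic[topic].append(consumer)

    for topic in sorted(consumers_by_topic):
        consumers = consumers_by_topic[topic]
        lags = partition_lags.get(topic, {})
        # Highest lag first; the partition number breaks ties deterministically.
        for partition in sorted(lags, key=lambda p: (-lags[p], p)):
            # Fewest assigned partitions wins, then lowest total lag, then consumer id.
            target = min(
                consumers,
                key=lambda c: (len(assignment[c]), total_lag[c], c),
            )
            assignment[target].append((topic, partition))
            total_lag[target] += lags[partition]
    return assignment
```

Called with two consumers subscribed to one topic whose three partitions have lags of 100,000, 60,000 and 50,000, this sketch gives the first consumer partition 0 and the second consumer partitions 1 and 2.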
Obtaining Consumer Group Offsets
A means of obtaining the earliest, latest and last committed offset for each topic partition is required.
...
The prototype targeted Kafka 0.10.2.0. In Kafka 0.11.0.0 it may be more appropriate to obtain offsets via the new Admin API.
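As a rough illustration of the three lookups involved, the sketch below gathers the beginning, end and last committed offset for each partition. It assumes a client object exposing `beginning_offsets`, `end_offsets` and `committed` methods, modelled on the kafka-python `KafkaConsumer` (the Java consumer's counterparts are `beginningOffsets`, `endOffsets` and `committed`). The helper name `fetch_offsets` is hypothetical, and the client is assumed to be configured with the consumer group's `group.id` so that committed offsets refer to that group.

```python
def fetch_offsets(consumer, partitions):
    """Return {partition: (beginning, end, committed)}.

    `committed` is None when the group has not committed an offset
    for that partition.
    """
    beginning = consumer.beginning_offsets(partitions)
    end = consumer.end_offsets(partitions)
    return {
        tp: (beginning[tp], end[tp], consumer.committed(tp))
        for tp in partitions
    }
```

Each resulting tuple holds the values needed to compute a partition's lag, including the case where no offset has been committed yet.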
Prototype Implementation
A prototype implementation targeting Kafka 0.10.2.0 is available here.
Example
- TODO: Lag-aware assignment worked example (Grant Neale)
...
Suppose a consumer group contains two consumers C0 and C1, both subscribed to a topic t0 with 3 partitions: t0p0 (lag 100,000), t0p1 (lag 60,000) and t0p2 (lag 50,000).
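The assignment generated by RangeAssignor will be the following. (RoundRobinAssignor would instead give t0p0 and t0p2 to C0 and t0p1 to C1, for total lags of 150,000 and 60,000, which is similarly unbalanced.)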
Consumer | Assigned Topic Partitions | Total Assigned Lag |
---|---|---|
C0 | t0p0, t0p1 | 160,000 |
C1 | t0p2 | 50,000 |
The assignment generated by the proposed lag-aware assignor will be:
Consumer | Assigned Topic Partitions | Total Assigned Lag |
---|---|---|
C0 | t0p0 | 100,000 |
C1 | t0p1, t0p2 | 110,000 |
Compatibility, Deprecation, and Migration Plan
...
This change will have no impact on existing users unless they explicitly configure their consumers to use the new partition assignment strategy.
Rejected Alternatives
- Performing assignment solely based on total lag assigned to each consumer, without enforcing an even distribution of partition counts across consumers. This will produce assignments that ultimately cause the distribution of lags across consumers to become unbalanced again over time
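To make the rejected alternative concrete, here is a minimal sketch of a greedy assignment that balances total lag only. The function name `assign_by_lag_only` and the lag figures in the usage note are hypothetical and chosen purely for illustration.

```python
def assign_by_lag_only(consumers, lags):
    """Greedy assignment that balances total lag and ignores partition counts.

    consumers: list of consumer ids
    lags: {partition: lag}
    """
    assignment = {consumer: [] for consumer in consumers}
    total_lag = {consumer: 0 for consumer in consumers}
    # Highest lag first; the partition name breaks ties deterministically.
    for partition in sorted(lags, key=lambda p: (-lags[p], p)):
        # Always pick the consumer with the least lag so far.
        target = min(consumers, key=lambda c: (total_lag[c], c))
        assignment[target].append(partition)
        total_lag[target] += lags[partition]
    return assignment
```

With one heavily lagged partition and three lightly lagged ones (say 90,000 against 10,000 each) and two consumers, this gives one consumer a single partition and the other three. Once the large backlog drains, the second consumer still owns three times as many partitions, which is presumably the kind of imbalance the even-count rule is meant to avoid.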
...