Current state: Under Discussion
Discussion thread: link
JIRA: KAFKA-5337
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Under certain circumstances, the existing RangeAssignor and RoundRobinAssignor partition assignment strategies fail to produce an assignment of partitions to consumers that is balanced in terms of the total lag assigned to each consumer. This can be addressed by a new partition assignment strategy that distributes lag as evenly as possible across the consumers in a group.
This change would introduce a new partition assignment strategy, implemented as a new class LagAwareAssignor, which can be used by setting the consumer property partition.assignment.strategy to org.apache.kafka.clients.consumer.LagAwareAssignor. The default value of this consumer property would not be changed.
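As a minimal sketch, a consumer would opt in by setting partition.assignment.strategy in its configuration. The example below uses only java.util.Properties so that it compiles without the Kafka client on the classpath. The bootstrap server and group id are placeholder assumptions, and org.apache.kafka.clients.consumer.LagAwareAssignor is the class name proposed here rather than a class shipped in any Kafka release.

```java
import java.util.Properties;

final class LagAwareConfigExample {
    // Builds consumer properties that opt in to the proposed lag-aware assignor.
    // "localhost:9092" and "example-group" are placeholder values.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        // Class name proposed in this KIP; not part of any released Kafka client.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.LagAwareAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

In real client code the key is also available as the constant ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, and the resulting Properties would be passed to the KafkaConsumer constructor.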
Add a Lag-Aware Assignor option to the assignment strategies of the new consumer. The Lag-Aware Assignor operates on a per-topic basis and serves two purposes.
First, it guarantees an assignment that is as balanced as possible in terms of topic partition count, meaning either:
Second, during a reassignment the Lag-Aware Assignor would perform the reassignment in such a way that, in the new assignment, the total lag is distributed across consumers as evenly as possible.
The first goal above takes precedence over the second one. This means that the distribution of total lags assigned to each consumer may still be uneven, but will be as even as possible for an assignment that is balanced in terms of topic partition count. Ensuring that assignments are balanced by topic partition count serves two purposes:
RangeAssignor)
The total lag assigned to each consumer is the sum of the lags across all assigned topic partitions. The lag on each topic partition is the difference between the offset of the next message to be published to the partition and the last offset committed by the consumer group for that partition.
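The two definitions above can be written out directly. In this sketch, partitions are identified by hypothetical string names such as "t0p0" (mirroring the example later in this document), and per-partition lags are assumed to have been computed already. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.Map;

final class LagMath {
    // Lag of a single partition: the offset of the next message to be published
    // (the log end offset) minus the offset last committed by the consumer group.
    static long partitionLag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    // Total lag assigned to a consumer: the sum of the lags of all partitions
    // assigned to it. Every assigned partition is assumed to have a lag entry.
    static long totalLag(List<String> assignedPartitions, Map<String, Long> lagByPartition) {
        long total = 0L;
        for (String partition : assignedPartitions) {
            total += lagByPartition.get(partition);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> lags = Map.of("t0p0", 100_000L, "t0p1", 60_000L, "t0p2", 50_000L);
        System.out.println(partitionLag(1_500L, 1_200L));            // 300
        System.out.println(totalLag(List.of("t0p1", "t0p2"), lags)); // 110000
    }
}
```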
The inputs to the lag-aware partition assignment algorithm are:
The lag-aware partition assignment strategy maintains a number of data structures:
The lag-aware partition assignment algorithm operates on one topic at a time within the context of a single consumer group. The following additional data structures are maintained during the assignment of each topic:
The algorithm includes the following main steps:
For a partition with no committed offset, the lag is derived from the auto.offset.reset consumer property, as follows:
If auto.offset.reset=latest, the current lag for the partition is 0.
If auto.offset.reset=earliest (or any other value), we assume a current lag equal to the total number of records currently available in that partition (i.e. latest offset - beginning offset).
A means of obtaining the earliest, latest and last committed offset for each topic partition is required.
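A sketch of the per-partition lag rule, including the auto.offset.reset fallback, is shown below. It assumes the three offsets have already been fetched (for example via the Consumer API, as described next) and models an absent committed offset with OptionalLong. The class and method names are hypothetical.

```java
import java.util.OptionalLong;

final class PartitionLag {
    // Lag of one partition from its beginning offset, its latest (log end) offset
    // and, if present, the offset last committed by the consumer group.
    static long lag(long beginningOffset, long latestOffset,
                    OptionalLong committedOffset, String autoOffsetReset) {
        if (committedOffset.isPresent()) {
            return latestOffset - committedOffset.getAsLong();
        }
        if ("latest".equals(autoOffsetReset)) {
            // The group would start at the end of the partition: nothing to consume.
            return 0L;
        }
        // "earliest" or any other value: every record currently available counts as lag.
        return latestOffset - beginningOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(200L, 1_000L, OptionalLong.empty(), "earliest")); // 800
        System.out.println(lag(200L, 1_000L, OptionalLong.empty(), "latest"));   // 0
        System.out.println(lag(200L, 1_000L, OptionalLong.of(900L), "latest"));  // 100
    }
}
```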
The prototype implementation achieves this by creating an additional KafkaConsumer joined to the consumer group for which assignment is being performed. Offsets are then obtained via the Consumer API.
The prototype targeted Kafka 0.10.2.0. In Kafka 0.11.0.0 it may be more appropriate to obtain offsets via the new Admin API.
A prototype implementation targeting Kafka 0.10.2.0 is available here.
Suppose a consumer group contains two consumers C0 and C1, both subscribed to a topic t0 with 3 partitions: t0p0 (lag 100,000), t0p1 (lag 60,000) and t0p2 (lag 50,000).
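The assignment generated by RangeAssignor will be as follows (RoundRobinAssignor produces a similarly uneven result, assigning t0p0 and t0p2 to C0 for a total lag of 150,000 and t0p1 to C1 for a total lag of 60,000):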
Consumer | Assigned Topic Partitions | Total Assigned Lag |
---|---|---|
C0 | t0p0, t0p1 | 160,000 |
C1 | t0p2 | 50,000 |
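The RangeAssignor result in the table above follows from its per-topic arithmetic: each consumer, in sorted order, receives a contiguous range of numPartitions / numConsumers partitions, and the first numPartitions % numConsumers consumers receive one extra. The sketch below reimplements only that arithmetic for a single topic (it is not the Kafka class itself) and then totals the lags from this example. Lag plays no role in where partitions land.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeStyleAssignment {
    // Range-style assignment of one topic's partition indices to sorted consumers.
    static Map<String, List<Integer>> assign(List<String> sortedConsumers, int numPartitions) {
        int perConsumer = numPartitions / sortedConsumers.size();
        int extra = numPartitions % sortedConsumers.size();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < sortedConsumers.size(); i++) {
            int start = perConsumer * i + Math.min(i, extra);
            int length = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            result.put(sortedConsumers.get(i), partitions);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] lag = {100_000L, 60_000L, 50_000L}; // lags of t0p0, t0p1, t0p2
        for (Map.Entry<String, List<Integer>> e : assign(List.of("C0", "C1"), lag.length).entrySet()) {
            long total = 0L;
            for (int p : e.getValue()) {
                total += lag[p];
            }
            // Prints "C0 [0, 1] 160000" then "C1 [2] 50000".
            System.out.println(e.getKey() + " " + e.getValue() + " " + total);
        }
    }
}
```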
The assignment generated by the proposed lag-aware assignor will be:
Consumer | Assigned Topic Partitions | Total Assigned Lag |
---|---|---|
C0 | t0p0 | 100,000 |
C1 | t0p1, t0p2 | 110,000 |
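One way to arrive at the lag-aware assignment above is a greedy heuristic: with n partitions and m consumers, cap each consumer at floor(n/m) partitions, allowing at most n % m consumers one more, so that partition counts stay balanced; then place partitions in descending order of lag on the consumer with the lowest total lag that still has room. This is an illustrative assumption, not necessarily the prototype's algorithm, and greedy placement is not guaranteed to find the most even lag distribution in general. Class and method names are hypothetical, and at least one consumer is assumed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GreedyLagAwareSketch {
    // Assigns the partitions of one topic to consumers (assumes at least one consumer).
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Long> lagByPartition) {
        List<String> sortedConsumers = new ArrayList<>(consumers);
        Collections.sort(sortedConsumers);
        List<String> partitions = new ArrayList<>(lagByPartition.keySet());
        // Highest lag first; ties broken by partition name so the result is deterministic.
        partitions.sort((a, b) -> {
            int byLag = Long.compare(lagByPartition.get(b), lagByPartition.get(a));
            return byLag != 0 ? byLag : a.compareTo(b);
        });

        int m = sortedConsumers.size();
        int base = partitions.size() / m;  // every consumer gets at least this many
        int extra = partitions.size() % m; // this many consumers may get base + 1
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        Map<String, Long> totalLag = new LinkedHashMap<>();
        for (String c : sortedConsumers) {
            assignment.put(c, new ArrayList<>());
            totalLag.put(c, 0L);
        }

        int extraUsed = 0;
        for (String partition : partitions) {
            String best = null;
            for (String c : sortedConsumers) {
                int count = assignment.get(c).size();
                boolean hasRoom = count < base || (count == base && extraUsed < extra);
                // Strict '<' keeps the earlier consumer on ties in total lag.
                if (hasRoom && (best == null || totalLag.get(c) < totalLag.get(best))) {
                    best = c;
                }
            }
            if (assignment.get(best).size() == base) {
                extraUsed++; // best is taking one of the base + 1 slots
            }
            assignment.get(best).add(partition);
            totalLag.put(best, totalLag.get(best) + lagByPartition.get(partition));
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Long> lags = Map.of("t0p0", 100_000L, "t0p1", 60_000L, "t0p2", 50_000L);
        System.out.println(assign(List.of("C0", "C1"), lags));
        // {C0=[t0p0], C1=[t0p1, t0p2]}
    }
}
```

Ties in total lag go to the consumer that sorts first, which is why t0p0 lands on C0 here; t0p2 then goes to C1 because 60,000 is lower than 100,000 and C1 still has one of the base + 1 slots available.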
This change will have no impact on existing users unless they explicitly configure their consumers to use the new partition assignment strategy.