...

Current state: Under Discussion

Discussion thread: here (link TODO)

JIRA: KAFKA-5337

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  1. For each topic present in the subscription for at least one consumer, determine the current lag on each partition for the consumer group
    1. Lag is computed by subtracting the last committed offset from the latest offset for each partition
    2. If the consumer group has not yet committed an offset for a partition, determine the lag based on the auto.offset.reset consumer property, as follows:
      1. If auto.offset.reset=latest, the current lag for the partition is 0
      2. If auto.offset.reset=earliest (or any other value), we assume a current lag equal to the total number of records currently available in that partition (i.e. latest offset - beginning offset)
  2. For each topic present in the subscription:
    1. Sort partitions in order of decreasing lag
    2. While there are still unassigned partitions:
      1. Select the unassigned partition with the maximum lag
      2. Assign this partition to the consumer with the minimum total number of assigned partitions
      3. If more than one consumer has this minimum number of assigned partitions, assign the partition to the one among them with the minimum total assigned lag
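The steps above can be sketched in Java roughly as follows. This is a minimal, hypothetical sketch for a single topic, not the prototype's code. The class and method names are invented for illustration, offsets are passed in as plain values rather than fetched from the broker, and the tie-break is interpreted as choosing, among the consumers that share the minimum partition count, the one with the lowest total assigned lag.

```java
import java.util.*;

/**
 * Hypothetical sketch of the lag-aware assignment steps for ONE topic.
 * Names are illustrative only; this is not an existing Kafka API.
 */
class LagAwareAssignmentSketch {

    /** Step 1: lag of one partition. committed == null means no offset has been committed yet. */
    static long lag(Long committed, long beginning, long latest, String autoOffsetReset) {
        if (committed != null) {
            return latest - committed;            // latest offset minus last committed offset
        }
        if ("latest".equals(autoOffsetReset)) {
            return 0L;                            // the consumer would start at the end of the log
        }
        return latest - beginning;                // "earliest" or any other value: all available records
    }

    /** Step 2: assign one topic's partitions (partition number -> lag) to the given consumers. */
    static Map<String, List<Integer>> assign(Map<Integer, Long> partitionLags, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new TreeMap<>();
        Map<String, Long> assignedLag = new HashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
            assignedLag.put(c, 0L);
        }

        // Sort partitions by decreasing lag; equal lags fall back to partition number for determinism.
        List<Integer> partitions = new ArrayList<>(partitionLags.keySet());
        partitions.sort((a, b) -> {
            int byLag = Long.compare(partitionLags.get(b), partitionLags.get(a));
            return byLag != 0 ? byLag : Integer.compare(a, b);
        });

        for (int p : partitions) {
            // Fewest assigned partitions wins; ties go to the lowest total assigned lag,
            // and any remaining tie goes to the consumer listed first.
            String target = consumers.get(0);
            for (String c : consumers) {
                int count = assignment.get(c).size();
                int best = assignment.get(target).size();
                if (count < best || (count == best && assignedLag.get(c) < assignedLag.get(target))) {
                    target = c;
                }
            }
            assignment.get(target).add(p);
            assignedLag.put(target, assignedLag.get(target) + partitionLags.get(p));
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Lags taken from the worked example: t0p0 = 100,000, t0p1 = 60,000, t0p2 = 50,000.
        Map<Integer, Long> lags = new HashMap<>();
        lags.put(0, 100_000L);
        lags.put(1, 60_000L);
        lags.put(2, 50_000L);
        System.out.println(assign(lags, Arrays.asList("C0", "C1")));
    }
}
```

With the lags from the worked example, `main` prints `{C0=[0], C1=[1, 2]}`, matching the lag-aware assignment given in the Example section. Sorting first and then always taking the head of the list is equivalent to repeatedly selecting the unassigned partition with the maximum lag.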

Obtaining

...

Consumer Group Offsets

A means of obtaining the earliest, latest and last committed offset for each topic partition is required.

...

The prototype targeted Kafka 0.10.2.0. In Kafka 0.11.0.0, it may be more appropriate to obtain offsets via the new Admin API.

Prototype Implementation

A prototype implementation targeting Kafka 0.10.2.0 is available here.

Example

  •  TODO: Lag-aware assignment worked example (Grant Neale)

...

Suppose a consumer group contains two consumers C0 and C1, both subscribed to a topic t0 with 3 partitions: t0p0 (lag 100,000), t0p1 (lag 60,000) and t0p2 (lag 50,000).

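The assignment generated by RangeAssignor will be as follows. RoundRobinAssignor would instead give C0 t0p0 and t0p2 (total lag 150,000) and C1 t0p1 (60,000), which is similarly unbalanced.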

Consumer | Assigned Topic Partitions | Total Assigned Lag
C0       | t0p0, t0p1                | 160,000
C1       | t0p2                      | 50,000
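To show where the unbalanced result comes from, the sketch below reproduces the per-topic range arithmetic used by Kafka's RangeAssignor. It is simplified to a single topic whose consumers are already sorted. The class and method names are hypothetical, and partition lags are passed in as a plain array indexed by partition number.

```java
import java.util.*;

/** Hypothetical sketch: RangeAssignor-style contiguous split of one topic, plus per-consumer lag totals. */
class RangeSplitSketch {

    /** Returns, for each consumer index (in sorted order), the partition numbers it receives. */
    static List<List<Integer>> rangeSplit(int numPartitions, int numConsumers) {
        int perConsumer = numPartitions / numConsumers;
        int withExtra = numPartitions % numConsumers;   // the first `withExtra` consumers get one extra partition
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            int start = perConsumer * i + Math.min(i, withExtra);
            int length = perConsumer + (i < withExtra ? 1 : 0);
            List<Integer> range = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                range.add(p);
            }
            result.add(range);
        }
        return result;
    }

    /** Sums the lag of the partitions in each consumer's range; lag is ignored by the split itself. */
    static long[] totalLag(List<List<Integer>> ranges, long[] partitionLags) {
        long[] totals = new long[ranges.size()];
        for (int i = 0; i < ranges.size(); i++) {
            for (int p : ranges.get(i)) {
                totals[i] += partitionLags[p];
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        long[] lags = {100_000L, 60_000L, 50_000L};     // t0p0, t0p1, t0p2 from the example
        List<List<Integer>> ranges = rangeSplit(lags.length, 2);
        System.out.println(ranges + " " + Arrays.toString(totalLag(ranges, lags)));
    }
}
```

Running `main` prints `[[0, 1], [2]] [160000, 50000]`, which is the table above. The split depends only on partition and consumer counts, so the two highest-lag partitions can land on the same consumer.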


The assignment generated by the proposed lag-aware assignor will be:

Consumer | Assigned Topic Partitions | Total Assigned Lag
C0       | t0p0                      | 100,000
C1       | t0p1, t0p2                | 110,000

Compatibility, Deprecation, and Migration Plan

...

This change will have no impact on existing users unless they explicitly configure their consumers (via the partition.assignment.strategy consumer property) to use the new partition assignment strategy.
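As an illustration of the explicit opt-in, the sketch below builds consumer properties that select the new strategy. partition.assignment.strategy and auto.offset.reset are existing consumer properties. The assignor class name com.example.kafka.LagAwareAssignor is hypothetical, since the final class name is not given here, and the bootstrap server and group id are placeholder values.

```java
import java.util.Properties;

/** Hypothetical sketch of consumer configuration that opts in to a lag-aware assignor. */
class AssignorConfigSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");   // placeholder
        props.setProperty("group.id", "example-group");             // placeholder
        // Also read by the proposed lag computation when no offset has been committed yet.
        props.setProperty("auto.offset.reset", "earliest");
        // Explicit opt-in: without this line the consumer keeps its default assignor (RangeAssignor).
        // The class name below is HYPOTHETICAL.
        props.setProperty("partition.assignment.strategy", "com.example.kafka.LagAwareAssignor");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().list(System.out);
    }
}
```

Because the property is unset by default, consumers that do not add it continue to use their existing assignor.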

Rejected Alternatives

  • Performing assignment solely based on the total lag assigned to each consumer, without enforcing an even distribution of partition counts across consumers. This would produce assignments that cause the distribution of lag across consumers to become unbalanced again over time.

...