...

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --validate-regex <pattern>

Case Studies

All the examples shown in this chapter are based on the in-memory representation of the group coordinator.

Basic

Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.

...

  • Group (epoch=0)
    • Empty
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • Empty

* The above format represents the group state as it is persisted in the __consumer_offsets topic.

Member A joins the group. The coordinator bumps the group epoch to 1, adds A to the group, and creates an empty member assignment.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • A - epoch=0, partitions=[], pending-partitions=[]
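The join step can be sketched in a few lines of Python. This is only an illustration of the bookkeeping described above; the dictionary layout and the `join` helper are assumptions made for this example, not the coordinator's actual data structures.

```python
def join(state, member):
    """Add a member to the group (hypothetical helper, illustrative only)."""
    # Any membership change bumps the group epoch.
    state["group_epoch"] += 1
    state["members"].append(member)
    # The new member starts at epoch 0 with nothing assigned or pending.
    state["member_assignment"][member] = {
        "epoch": 0,
        "partitions": [],
        "pending": [],
    }
    # The target assignment and its epoch are left untouched here;
    # they only change once a new target assignment is computed.
    return state


empty_group = {
    "group_epoch": 0,
    "members": [],
    "target_epoch": 0,
    "target_assignment": {},
    "member_assignment": {},
}
state = join(empty_group, "A")
```

After the call, the group epoch is 1 while the target assignment epoch is still 0, which is the intermediate state shown above.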

The coordinator computes and installs the new target assignment. All the partitions are assigned to A.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=0, partitions=[], pending-partitions=[]

When A heartbeats, the group coordinator transitions A to its target epoch and assignment because A has no partitions to revoke. The group coordinator updates the member assignment and replies with the new epoch 1 and all the partitions.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]

Member B joins the group. The coordinator adds the member to the group and bumps the group epoch to 2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]

The coordinator computes and installs the new target assignment.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]

At this point B can transition to epoch 2 but cannot get foo-2 until A revokes it.

Note that the persisted member assignment of B already includes foo-2 here but B does not get it yet because foo-2 is not free.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=2, partitions=[], pending-partitions=[foo-2]

When A heartbeats, the group coordinator instructs him to revoke foo-2.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 2 and releases foo-2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]

When B heartbeats, he now gets foo-2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]

Member C joins the group. The coordinator adds the member to the group and bumps the group epoch to 3.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

The coordinator computes and installs the new target assignment.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

When B heartbeats, the group coordinator transitions him to epoch 3 because B has no partitions to revoke. It persists the change and replies.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet. 

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=3, partitions=[], pending-partitions=[foo-1]

When A heartbeats, the group coordinator instructs him to revoke foo-1.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 3 and releases foo-1.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=3, partitions=[foo-0], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=3, partitions=[foo-1], pending-partitions=[]

All the members have eventually advanced to the group epoch (3).
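The per-heartbeat reconciliation used throughout this walkthrough follows three steps: revoke what the member must give up, move the member to the target epoch, then hand out only the partitions that no other member still owns. The sketch below is a minimal model of that logic, not the coordinator implementation; the `Member` class, the `heartbeat` function, and the `acked_revocation` flag are names invented for this example.

```python
from dataclasses import dataclass, field


@dataclass
class Member:
    epoch: int = 0
    partitions: set = field(default_factory=set)
    pending: set = field(default_factory=set)


def heartbeat(members, target, target_epoch, name, acked_revocation=False):
    """Reconcile one member toward the target assignment (hypothetical helper)."""
    m = members[name]
    wanted = target[name]
    to_revoke = m.partitions - wanted
    if to_revoke and not acked_revocation:
        # Step 1: the member must first give up partitions it no longer owns
        # in the target assignment. Its epoch does not change yet.
        return {"revoke": sorted(to_revoke)}
    # Step 2: revocation is acknowledged (or was not needed), so the member
    # releases those partitions and moves to the target epoch.
    m.partitions -= to_revoke
    m.epoch = target_epoch
    # Step 3: assign only partitions that no other member currently owns;
    # the rest stay pending until their owner releases them.
    owned_elsewhere = set()
    for other_name, other in members.items():
        if other_name != name:
            owned_elsewhere |= other.partitions
    m.partitions |= wanted - owned_elsewhere
    m.pending = wanted - m.partitions
    return {
        "epoch": m.epoch,
        "assigned": sorted(m.partitions),
        "pending": sorted(m.pending),
    }
```

Replaying the moment B joins: B's first heartbeat returns epoch 2 with foo-2 pending, A's next heartbeat returns a revocation request for foo-2, and once A acknowledges it, B's following heartbeat receives foo-2.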

...

  • Group (epoch=21)
    • A
    • B
  • Target Assignment (epoch=21)
    • A - partitions=[foo-0, foo-1, foo-2]
    • B - partitions=[foo-3, foo-4, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]

C joins the group. The group coordinator adds him, bumps the group epoch, creates the member assignment, and computes the target assignment.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

When C heartbeats, the group coordinator transitions him to epoch 22 but does not yet give him any partitions because their current owners have not revoked them yet.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=22, partitions=[], pending-partitions=[foo-2, foo-5]

When A heartbeats, the group coordinator instructs him to revoke foo-2.

...

When A heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, releases foo-2, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2], pending-partitions=[foo-5]

When C heartbeats, the group coordinator gives him foo-2, which is now free, but holds back foo-5.

When B heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, releases foo-5, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

When C heartbeats, the group coordinator gives him foo-2 and foo-5.

...

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

A fails to heartbeat. The group coordinator removes him after the session timeout expires and bumps the group epoch.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

The group coordinator computes the new target assignment.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

B and C heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=23, partitions=[foo-3, foo-4, foo-0], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-1], pending-partitions=[]
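The session-timeout removal in this scenario can be modelled as a periodic sweep over the last heartbeat time of each member. The following is a sketch under assumptions: the `expire_members` helper and its millisecond arguments are hypothetical, and bumping the epoch once per sweep (rather than once per removed member) is a simplification chosen for this example.

```python
def expire_members(group_epoch, last_heartbeat_ms, now_ms, session_timeout_ms):
    """Drop members whose session expired (hypothetical helper).

    Returns the possibly bumped group epoch and the surviving members.
    """
    alive = {
        member: seen
        for member, seen in last_heartbeat_ms.items()
        if now_ms - seen <= session_timeout_ms
    }
    if len(alive) < len(last_heartbeat_ms):
        # Membership changed, so the group epoch is bumped. The target
        # assignment is recomputed separately, as in the steps above.
        group_epoch += 1
    return group_epoch, alive
```

For example, with a 5000 ms timeout at time 10000, a member last seen at time 0 is removed and epoch 22 becomes 23, while members seen at 9000 and 9500 stay in the group.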

Partition Added

Let's start with a group with two members and one partition.

  • Group (epoch=22)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

A new partition foo-1 is created. The group coordinator detects it, updates the group, and bumps the group epoch.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

The group coordinator computes a new target assignment.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

A and B heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0], pending-partitions=[]
    • B - epoch=23, partitions=[foo-1], pending-partitions=[]
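One simple way to obtain the target assignment shown in this scenario is to keep every existing placement and give each unowned partition to the least-loaded member. The sketch below illustrates that idea only; it is not the assignor used by the coordinator, and the `place_unassigned` helper is a name invented for this example.

```python
def place_unassigned(target, all_partitions):
    """Give each unowned partition to the least-loaded member (illustrative)."""
    # Copy the current target so existing placements are preserved.
    new_target = {member: list(parts) for member, parts in target.items()}
    owned = {p for parts in new_target.values() for p in parts}
    # Plain string ordering is enough for the single-digit partitions here.
    for partition in sorted(all_partitions):
        if partition in owned:
            continue
        # min() keeps the first member in insertion order when loads tie.
        least_loaded = min(new_target, key=lambda m: len(new_target[m]))
        new_target[least_loaded].append(partition)
    return new_target
```

With A owning foo-0 and B owning nothing, the new partition foo-1 goes to B. The same rule also handles partitions orphaned by a departed member: if B owns foo-3 and foo-4 and C owns foo-2 and foo-5, then foo-0 goes to B and foo-1 goes to C.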

Online Migration

We start with a generic group.

  • Generic Group (generation=22)
    • A
    • B
    • C
  • Assignment
    • A - partitions=[foo-0, foo-1], pending-partitions=[]
    • B - partitions=[foo-3, foo-4], pending-partitions=[]
    • C - partitions=[foo-2, foo-5], pending-partitions=[]

A leaves and rejoins with the new protocol enabled. The group is converted. The current generation becomes the group epoch. The target assignment and the member assignments are created based on the current assignment.

  • Group (epoch=22)
    • A (upgraded)
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]
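The conversion step can be pictured as a pure function from the generic group's generation and current assignment to the new group state. This is a sketch with assumed data shapes; `convert_generic_group` and the dictionary keys are hypothetical names used only for illustration.

```python
def convert_generic_group(generation, assignment):
    """Build new-protocol state from a generic group (hypothetical shapes)."""
    return {
        # The current generation becomes the group epoch.
        "group_epoch": generation,
        # The target assignment is seeded from the current assignment.
        "target_epoch": generation,
        "target_assignment": {m: list(ps) for m, ps in assignment.items()},
        # Every member starts at that epoch, owning what it already owns.
        "member_assignment": {
            m: {"epoch": generation, "partitions": list(ps), "pending": []}
            for m, ps in assignment.items()
        },
    }


converted = convert_generic_group(
    22,
    {
        "A": ["foo-0", "foo-1"],
        "B": ["foo-3", "foo-4"],
        "C": ["foo-2", "foo-5"],
    },
)
```

Because nothing moves during the conversion, no member has to revoke or wait for a partition at this point.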

A uses the new protocol. B and C still use the old protocol.

...

  • Group (epoch=23)
    • A (upgraded)
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

The group coordinator computes a new target assignment and installs it. It also triggers a rebalance for C. 

  • Group (epoch=23)
    • A (upgraded)
    • C (PreparingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

C heartbeats and is notified that a rebalance is required. C revokes all its partitions (assuming Eager protocol is used here) and sends a JoinGroup request.

...

  • Group (epoch=23)
    • A (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

In the meantime, A heartbeats and transitions to epoch 23 as well.

  • Group (epoch=23)
    • A (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

C sends the SyncGroup request and collects his new assignment. All partitions are given because they are all free. C transitions to Stable.

  • Group (epoch=23)
    • A (upgraded)
    • C (Stable)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

B rejoins the group with the new protocol. The group coordinator adds it and bumps the group epoch.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

The group coordinator computes a new target assignment. A rebalance is triggered for C to revoke foo-4.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (PreparingRebalance)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[foo-3, foo-4]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

A heartbeats and is told to revoke foo-3.

...

The group coordinator sees that C no longer owns any partitions, so C can move to epoch 24 and to the CompletingRebalance state. foo-4 is released.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=24, partitions=[foo-4], pending-partitions=[foo-3]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

C sends the SyncGroup request to collect its assignment and transitions to Stable.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=24, partitions=[foo-4], pending-partitions=[foo-3]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

A heartbeats. He confirms the revocation of foo-3. He transitions to epoch 24. foo-3 is released.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=24, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=24, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

B heartbeats and gets his assignment.

...