...

{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ]
}
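
As a rough illustration of the record layout, the sketch below mirrors one entry of the Assignors array as a Python dataclass. It assumes that the two fields between MaximumVersion and Metadata are Reason (int8) and Version (int16). The class name, the snake_case attribute names, and the checks in validate() (integer ranges, MinimumVersion not above MaximumVersion) are illustrative assumptions and not part of the schema.

```python
from dataclasses import dataclass

INT8_MIN, INT8_MAX = -128, 127
INT16_MIN, INT16_MAX = -32768, 32767


@dataclass(frozen=True)
class Assignor:
    """One element of the Assignors array (hypothetical in-memory form)."""
    name: str             # Name, string
    minimum_version: int  # MinimumVersion, int16
    maximum_version: int  # MaximumVersion, int16
    reason: int           # Reason, int8 (assumed field name)
    version: int          # Version, int16 (assumed field name)
    metadata: bytes       # Metadata, bytes

    def validate(self) -> None:
        """Raise ValueError if a value does not fit its declared wire type."""
        int16_fields = {
            "MinimumVersion": self.minimum_version,
            "MaximumVersion": self.maximum_version,
            "Version": self.version,
        }
        for label, value in int16_fields.items():
            if not INT16_MIN <= value <= INT16_MAX:
                raise ValueError(f"{label} does not fit in int16: {value}")
        if not INT8_MIN <= self.reason <= INT8_MAX:
            raise ValueError(f"Reason does not fit in int8: {self.reason}")
        # Assumption: an empty version range is never meaningful.
        if self.minimum_version > self.maximum_version:
            raise ValueError("MinimumVersion must not exceed MaximumVersion")


example = Assignor("range", 1, 3, 0, 2, b"")
example.validate()  # passes silently
```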

...

Name | Type | Default | Doc
group.protocol | enum | generic | A flag which indicates whether the new protocol should be used. It can be: generic or consumer.
group.remote.assignor | string | uniform | The server side assignor to use. It cannot be used in conjunction with group.local.assignors.
group.local.assignors | list | empty | The list of client side (local) assignors. It cannot be used in conjunction with group.remote.assignor.
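
The constraints between these three configurations can be sketched as a small validation step. The function below is a hypothetical helper, not an existing client API. It assumes that the default remote assignor only applies when no local assignors are configured, which the table does not state explicitly.

```python
def validate_group_config(config: dict) -> dict:
    """Check the mutual-exclusion rule and return the resolved settings."""
    protocol = config.get("group.protocol", "generic")
    if protocol not in ("generic", "consumer"):
        raise ValueError(f"group.protocol must be generic or consumer, got {protocol!r}")

    remote_set = "group.remote.assignor" in config
    local_set = bool(config.get("group.local.assignors"))
    if remote_set and local_set:
        raise ValueError(
            "group.remote.assignor cannot be used together with group.local.assignors")

    resolved = {"group.protocol": protocol}
    if local_set:
        resolved["group.local.assignors"] = list(config["group.local.assignors"])
    else:
        # Assumption: fall back to the documented default server side assignor.
        resolved["group.remote.assignor"] = config.get("group.remote.assignor", "uniform")
    return resolved


defaults = validate_group_config({})
# defaults == {"group.protocol": "generic", "group.remote.assignor": "uniform"}
```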

Streams

...

Member Metadata and Assignment Metadata

The changes here are mainly informative at this stage. They show how the Streams metadata could be structured. We may decide to use this change as an opportunity to make further changes.

Member Metadata Schema

This is the schema of the metadata advertised by each member.

Name | Type | Doc
ProcessId | uuid / static | Identity of the instance that may have multiple consumers.
UserEndPoint | bytes / static | Used for cross-client communication.
ClientTags | map / static | Used for the rack-aware assignment algorithm.
TopologyHash | uuid / dynamic | Only updatable when reason is not zero.
TaskLag | array / dynamic | Only updatable when reason is not zero.

Member Metadata Reasons

  • None (0)
  • Shutdown (1)
  • WarmUpReady (2)
  • WarmUpFailed (3)
  • TopologyChanged (4)
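
The reason codes and the rule that the dynamic fields are only updatable when the reason is not zero can be sketched as follows. The enum mirrors the list above. The apply_update helper, the dictionary representation of the metadata, and the choice to raise an error on a rejected update are assumptions made for illustration only.

```python
from enum import IntEnum


class Reason(IntEnum):
    NONE = 0
    SHUTDOWN = 1
    WARM_UP_READY = 2
    WARM_UP_FAILED = 3
    TOPOLOGY_CHANGED = 4


# Fields marked "dynamic" in the member metadata schema.
DYNAMIC_FIELDS = ("TopologyHash", "TaskLag")


def apply_update(current: dict, update: dict, reason: Reason) -> dict:
    """Return the metadata after applying changes to the dynamic fields."""
    changed = {name: update[name] for name in DYNAMIC_FIELDS
               if name in update and update[name] != current.get(name)}
    if changed and reason == Reason.NONE:
        # Assumption: a change without a reason is rejected rather than ignored.
        raise ValueError("dynamic fields are only updatable when reason is not zero")
    return {**current, **changed}


metadata = {"TopologyHash": "hash-1", "TaskLag": [0, 0]}  # placeholder values
updated = apply_update(metadata, {"TaskLag": [5, 0]}, Reason.WARM_UP_READY)
# updated == {"TopologyHash": "hash-1", "TaskLag": [5, 0]}
```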

Assignment Metadata Schema

Name | Type | Doc
ActiveTasks | list | Local assignment for this consumer.
StandbyTasks | map | Local standby tasks for this consumer.
WarmupTasks | map | Local warming up tasks for this consumer.
PartitionsByHost | map | Global assignment information used for IQ.

Assignment Metadata Errors

  • None (0)
  • Shutdown (1)
  • AssignmentError (2)
  • InconsistentTopology (3)

Streams Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | A flag which indicates whether the new protocol should be used. It can be: generic or consumer.

...

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --type <comma separated list of types>

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --type <comma separated list of types>

Case Studies

Basic

Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.

Let’s start with an empty group:

  • Group (epoch=0)
    • Empty
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • Empty

Member A joins the group. The coordinator bumps the group epoch to 1, adds A to the group, and creates an empty member assignment.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • A - epoch=0, partitions=[]

The coordinator computes and installs the new target assignment. All the partitions are assigned to A.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=0, partitions=[]

When A heartbeats, the group coordinator transitions him to his target epoch and assignment because he does not have any partitions to revoke. The group coordinator updates the member assignment and replies with the new epoch 1 and all the partitions.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]
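
The first steps of this walk-through can be replayed with a few lines of Python. This is only a sketch: the dictionary layout and the function names join, install_target and heartbeat are hypothetical stand-ins for the coordinator's records and logic, and the heartbeat handler deliberately covers only the case where the member has nothing to revoke.

```python
def new_group():
    # Hypothetical in-memory stand-in for the three records shown in the lists.
    return {"group_epoch": 0, "members": [],
            "target_epoch": 0, "target": {},
            "member_assignment": {}}


def join(group, member):
    """A member joins: bump the group epoch and create an empty member assignment."""
    group["group_epoch"] += 1
    group["members"].append(member)
    group["member_assignment"][member] = {"epoch": 0, "partitions": []}


def install_target(group, assignment):
    """Install a freshly computed target assignment at the current group epoch."""
    group["target_epoch"] = group["group_epoch"]
    group["target"] = {m: list(p) for m, p in assignment.items()}


def heartbeat(group, member):
    """Move a member with nothing to revoke to the target epoch and assignment."""
    current = group["member_assignment"][member]
    target = group["target"][member]
    if set(current["partitions"]) - set(target):
        raise NotImplementedError("revocation is not covered by this sketch")
    current["epoch"] = group["target_epoch"]
    current["partitions"] = list(target)
    return dict(current)


group = new_group()
join(group, "A")
install_target(group, {"A": ["foo-0", "foo-1", "foo-2"]})
reply = heartbeat(group, "A")
print(reply)  # {'epoch': 1, 'partitions': ['foo-0', 'foo-1', 'foo-2']}
```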

Member B joins the group. The coordinator adds the member to the group and bumps the group epoch to 2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=0, partitions=[]

...

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=2, partitions=[foo-2]

At this point B can transition to epoch 2 but cannot get foo-2 until A revokes it.

When A heartbeats, the group coordinator instructs him to revoke foo-2.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]

When B heartbeats, he can now get foo-2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]
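
The revocation handshake for B joining can be sketched as a single heartbeat handler. The state layout, the reply shape and the acked_revocation flag are hypothetical. The sketch also assumes that a member always moves straight to the current target epoch once it has nothing left to revoke, and that a partition is only handed out when no other member's assignment still contains it.

```python
def heartbeat(state, member, acked_revocation=False):
    """Handle one heartbeat and return the coordinator's reply."""
    epoch, owned = state["members"][member]
    target = state["target"][member]
    if epoch < state["target_epoch"]:
        to_revoke = owned - target
        if to_revoke and not acked_revocation:
            # The member must give these back first; nothing is persisted yet.
            return {"epoch": epoch, "revoke": sorted(to_revoke)}
        # Nothing (left) to revoke: move the member to the target epoch.
        epoch = state["target_epoch"]
        state["members"][member] = (epoch, set(target))
    # Hand out only the partitions that no other member still holds.
    held_elsewhere = set()
    for other, (_, partitions) in state["members"].items():
        if other != member:
            held_elsewhere |= partitions
    return {"epoch": epoch, "assigned": sorted(target - held_elsewhere)}


state = {
    "target_epoch": 2,
    "target": {"A": {"foo-0", "foo-1"}, "B": {"foo-2"}},
    "members": {"A": (1, {"foo-0", "foo-1", "foo-2"}), "B": (0, set())},
}
r1 = heartbeat(state, "B")        # {'epoch': 2, 'assigned': []}, foo-2 is still held by A
r2 = heartbeat(state, "A")        # {'epoch': 1, 'revoke': ['foo-2']}
r3 = heartbeat(state, "A", True)  # {'epoch': 2, 'assigned': ['foo-0', 'foo-1']}
r4 = heartbeat(state, "B")        # {'epoch': 2, 'assigned': ['foo-2']}
```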

Member C joins the group. The coordinator adds the member to the group and bumps the group epoch to 3.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]
    • C - epoch=0, partitions=[]

...

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]
    • C - epoch=0, partitions=[]

As with the previous member addition, A must revoke foo-1 before he can advance to epoch 3, and C cannot get foo-1 until then.

When B heartbeats, the group coordinator transitions him to epoch 3 because B has no partitions to revoke. It persists the change and replies.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=3, partitions=[foo-2]
    • C - epoch=3, partitions=[foo-1]

When C heartbeats, he transitions to epoch 3 but cannot get foo-1 yet.

When A heartbeats, the group coordinator instructs him to revoke foo-1.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 3.

When C heartbeats, the group coordinator gives him foo-1, which is now free, persists the change, and replies.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=3, partitions=[foo-0]
    • B - epoch=3, partitions=[foo-2]
    • C - epoch=3, partitions=[foo-1]

All the members have eventually advanced to the group epoch (3).

Incremental Revocation & Assignment

Let's imagine a group with two members and six partitions.

  • Group (epoch=21)
    • A
    • B
  • Target Assignment (epoch=21)
    • A - partitions=[foo-0, foo-1, foo-2]
    • B - partitions=[foo-3, foo-4, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5]

C joins the group. The group coordinator adds him, bumps the group epoch, creates the member assignment, and computes the target assignment.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5]
    • C - epoch=0, partitions=[]
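
The target assignments used in these case studies are consistent with a simple sticky strategy: every member keeps as many of its current partitions as its share allows, and the remaining partitions go to the members that are below their share. The function below is a hypothetical sketch of such a strategy, written only to reproduce the examples. It is not the actual server side assignor, and it assumes at least one member.

```python
def sticky_assign(members, partitions, previous):
    """Spread partitions evenly while moving as few of them as possible."""
    quota, extra = divmod(len(partitions), len(members))
    limits = {m: quota + (1 if i < extra else 0) for i, m in enumerate(members)}
    target, free = {}, list(partitions)
    # First pass: each member keeps its current partitions, up to its limit.
    for member in members:
        kept = [p for p in previous.get(member, []) if p in free][:limits[member]]
        for p in kept:
            free.remove(p)
        target[member] = kept
    # Second pass: hand the partitions that are left to members below their limit.
    for member in members:
        while len(target[member]) < limits[member]:
            target[member].append(free.pop(0))
    return target


foo = [f"foo-{i}" for i in range(6)]
previous = {"A": ["foo-0", "foo-1", "foo-2"], "B": ["foo-3", "foo-4", "foo-5"]}
target = sticky_assign(["A", "B", "C"], foo, previous)
# target == {"A": ["foo-0", "foo-1"], "B": ["foo-3", "foo-4"], "C": ["foo-2", "foo-5"]}
```

Only foo-2 and foo-5 change owner in this example, which is why A and B each have exactly one partition to revoke.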

When C heartbeats, the group coordinator transitions him to epoch 22 but does not yet give him any partitions because they are not revoked yet.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5]
    • C - epoch=22, partitions=[foo-2, foo-5]

When A heartbeats, the group coordinator instructs him to revoke foo-2.

When B heartbeats, the group coordinator instructs him to revoke foo-5.

When C heartbeats, nothing changes for him.

When A heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5]
    • C - epoch=22, partitions=[foo-2, foo-5]

When C heartbeats, the group coordinator gives him foo-2, which is now free, but holds back foo-5.

When B heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1]
    • B - epoch=22, partitions=[foo-3, foo-4]
    • C - epoch=22, partitions=[foo-2, foo-5]

When C heartbeats, the group coordinator gives him both foo-2 and foo-5.
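
The incremental hand-over can be expressed as a check on the member assignment: a partition recorded for a member can be delivered only once no other member's assignment still contains it. The helper below is a hypothetical sketch of that check, using a plain dictionary of (epoch, partitions) pairs for the member assignment.

```python
def deliverable(member_assignment, member):
    """Split a member's partitions into those it can own now and those still blocked."""
    _, wanted = member_assignment[member]
    ready, blocked = [], {}
    for partition in wanted:
        holders = [m for m, (_, parts) in member_assignment.items()
                   if m != member and partition in parts]
        if holders:
            blocked[partition] = holders[0]  # still owned by another member
        else:
            ready.append(partition)
    return ready, blocked


# State after A has acknowledged its revocation but B has not.
assignment = {
    "A": (22, ["foo-0", "foo-1"]),
    "B": (21, ["foo-3", "foo-4", "foo-5"]),
    "C": (22, ["foo-2", "foo-5"]),
}
ready, blocked = deliverable(assignment, "C")
# ready == ["foo-2"], blocked == {"foo-5": "B"}

# Once B has acknowledged as well, nothing is blocked any more.
assignment["B"] = (22, ["foo-3", "foo-4"])
ready_after, blocked_after = deliverable(assignment, "C")
# ready_after == ["foo-2", "foo-5"], blocked_after == {}
```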

Member Failure

Let's start with a group with three members and six partitions.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1]
    • B - epoch=22, partitions=[foo-3, foo-4]
    • C - epoch=22, partitions=[foo-2, foo-5]

A fails to heartbeat. The group coordinator removes him after the session timeout expires and bumps the group epoch.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4]
    • C - epoch=22, partitions=[foo-2, foo-5]

The group coordinator computes the new target assignment.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4]
    • C - epoch=22, partitions=[foo-2, foo-5]

B and C heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=23, partitions=[foo-3, foo-4, foo-0]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-1]
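
The removal step can be sketched as a periodic expiration pass. The function name, the timestamp bookkeeping and the choice of one epoch bump per pass are assumptions made for illustration. As in the walk-through, the target assignment is left untouched until it is recomputed.

```python
def expire_members(group, last_heartbeat_ms, now_ms, session_timeout_ms):
    """Remove members whose session has timed out and bump the group epoch."""
    expired = [m for m in group["members"]
               if now_ms - last_heartbeat_ms[m] > session_timeout_ms]
    for member in expired:
        group["members"].remove(member)
        group["member_assignment"].pop(member, None)
    if expired:
        # Assumption: a single bump covers all members removed in this pass.
        group["group_epoch"] += 1
    return expired


group = {
    "group_epoch": 22,
    "members": ["A", "B", "C"],
    "target_epoch": 22,
    "member_assignment": {
        "A": (22, ["foo-0", "foo-1"]),
        "B": (22, ["foo-3", "foo-4"]),
        "C": (22, ["foo-2", "foo-5"]),
    },
}
last_seen = {"A": 1_000, "B": 40_000, "C": 41_000}
expired = expire_members(group, last_seen, now_ms=50_000, session_timeout_ms=45_000)
# expired == ["A"]; the group epoch is now 23 while the target epoch is still 22
```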

Partition Added

Let's start with a group with two members and one partition.

  • Group (epoch=22)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0]
    • B - epoch=22, partitions=[]

A new partition foo-1 is created. The group coordinator detects it, updates the group, and bumps the group epoch.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0]
    • B - epoch=22, partitions=[]

The group coordinator computes a new target assignment.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0]
    • B - epoch=22, partitions=[]

A and B heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0]
    • B - epoch=23, partitions=[foo-1]
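
How the group coordinator notices a new partition is not detailed here. One way to picture it, as a sketch under the assumption that the coordinator remembers the partition count of every subscribed topic, is a comparison against the latest topic metadata. The function name and the data layout below are hypothetical.

```python
def refresh_partitions(group, partition_counts):
    """Bump the group epoch if a subscribed topic's partition count has changed."""
    changed = {topic: count for topic, count in partition_counts.items()
               if topic in group["subscribed"] and group["subscribed"][topic] != count}
    if changed:
        group["subscribed"].update(changed)
        group["group_epoch"] += 1  # a new target assignment is computed afterwards
    return sorted(changed)


group = {"group_epoch": 22, "subscribed": {"foo": 1}}
first = refresh_partitions(group, {"foo": 2, "bar": 4})   # ['foo'], epoch becomes 23
second = refresh_partitions(group, {"foo": 2, "bar": 4})  # [], epoch stays at 23
```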

Compatibility, Deprecation, and Migration Plan

...

  • Deploy the new software
  • Roll the members
  • Enable the new protocol
  • Roll the members

Regular expression.



  • Kafka 4.0 opt-in, consumer support.
  • Kafka 4.x opt-in, streams support.
  • Kafka 5.0, new protocol by default.

...