...
{
  "type": "data",
  "name": "ConsumerGroupMemberMetadataValue",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
    { "name": "InstanceId", "versions": "0+", "type": "string" },
    { "name": "ClientId", "versions": "0+", "type": "string" },
    { "name": "ClientHost", "versions": "0+", "type": "string" },
    { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
    { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
    { "name": "Assignors", "versions": "0+", "type": "[]Assignor", "fields": [
      { "name": "Name", "versions": "0+", "type": "string" },
      { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
      { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
      { "name": "Reason", "versions": "0+", "type": "int8" },
      { "name": "Version", "versions": "0+", "type": "int16" },
      { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ]}
  ]
}
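To make the record layout concrete, the schema above can be mirrored by plain Java records. This is a minimal, hand-written sketch for illustration only. In Kafka such classes are normally generated from the JSON definition, and the record names and the version-range check below are assumptions, not the generated code.

```java
import java.util.List;

// Hypothetical hand-written mirror of the Assignor struct in the schema above.
// Schema types mapped to Java: int8 -> byte, int16 -> short, int32 -> int, bytes -> byte[].
record Assignor(String name,
                short minimumVersion,
                short maximumVersion,
                byte reason,
                short version,
                byte[] metadata) {
    Assignor {
        // Assumption: an assignor must advertise a non-empty version range.
        if (minimumVersion > maximumVersion) {
            throw new IllegalArgumentException(
                "minimumVersion " + minimumVersion + " > maximumVersion " + maximumVersion);
        }
    }
}

// Hypothetical mirror of ConsumerGroupMemberMetadataValue, schema version 0.
record ConsumerGroupMemberMetadataValue(int groupEpoch,
                                        String instanceId,
                                        String clientId,
                                        String clientHost,
                                        List<String> subscribedTopicNames,
                                        String subscribedTopicRegex,
                                        List<Assignor> assignors) {
    ConsumerGroupMemberMetadataValue {
        // Defensive, immutable copies of the list fields.
        subscribedTopicNames = List.copyOf(subscribedTopicNames);
        assignors = List.copyOf(assignors);
    }
}
```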
...
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | Selects the group protocol: generic (the existing protocol) or consumer (the new protocol). |
group.remote.assignor | string | uniform | The server-side assignor to use. It cannot be used in conjunction with group.local.assignors. |
group.local.assignors | list | empty | The list of client-side (local) assignors. It cannot be used in conjunction with group.remote.assignor. |
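The mutual exclusion between the two assignor settings could be enforced when the configuration is parsed. The validator below is a hypothetical sketch and not the actual Kafka consumer config code. `GroupConfigValidator` and its map-based input are illustrative. Because group.remote.assignor has a default, the sketch assumes it counts as "set" only when the user supplies it explicitly.

```java
import java.util.List;
import java.util.Map;

// Hypothetical validator for the group.* settings described in the table above.
final class GroupConfigValidator {
    static final String PROTOCOL = "group.protocol";
    static final String REMOTE_ASSIGNOR = "group.remote.assignor";
    static final String LOCAL_ASSIGNORS = "group.local.assignors";

    /** Returns the effective protocol ("generic" or "consumer"); throws on invalid settings. */
    static String validate(Map<String, Object> userConfigs) {
        Object protocol = userConfigs.getOrDefault(PROTOCOL, "generic");
        if (!"generic".equals(protocol) && !"consumer".equals(protocol)) {
            throw new IllegalArgumentException(
                PROTOCOL + " must be 'generic' or 'consumer', got " + protocol);
        }
        // Assumption: only explicitly supplied values count as set; defaults are ignored.
        boolean remoteSet = userConfigs.containsKey(REMOTE_ASSIGNOR);
        Object local = userConfigs.get(LOCAL_ASSIGNORS);
        boolean localSet = local instanceof List<?> list && !list.isEmpty();
        if (remoteSet && localSet) {
            throw new IllegalArgumentException(
                REMOTE_ASSIGNOR + " cannot be used together with " + LOCAL_ASSIGNORS);
        }
        return (String) protocol;
    }
}
```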
Streams
...
Member Metadata and Assignment Metadata
TODO
The changes in this section are informative at this stage: they sketch how the Streams metadata could be structured. We may use this change as an opportunity to make further changes.
Member Metadata Schema
This is the schema of the metadata advertised by each member.
Name | Type | Doc |
---|---|---|
ProcessId | uuid / static | Inherited. Identity of the instance, which may have multiple consumers. In the future we may remove this field when one instance only uses one consumer. |
UserEndPoint | bytes / static | Inherited. Used for cross-client communication. |
ClientTags | map / static | Inherited. Used for the rack-aware assignment algorithm. |
TopologyHash | uuid / dynamic | Only updatable when the reason is not None (0). |
TaskLag | array / dynamic | Only updatable when the reason is not None (0). |
Member Metadata Reasons
- None (0)
- Shutdown (1)
- WarmUpReady (2)
- WarmUpFailed (3)
- TopologyChanged (4)
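A member or coordinator could decode the int8 reason and apply the rule that TopologyHash and TaskLag may only change when the reason is not zero. The sketch below shows one way to do this under that reading. The enum name `MemberMetadataReason` and the helper `canUpdateDynamicFields` are hypothetical.

```java
import java.util.Arrays;

// Hypothetical enum for the member metadata reasons listed above (int8 on the wire).
enum MemberMetadataReason {
    NONE((byte) 0),
    SHUTDOWN((byte) 1),
    WARM_UP_READY((byte) 2),
    WARM_UP_FAILED((byte) 3),
    TOPOLOGY_CHANGED((byte) 4);

    private final byte code;

    MemberMetadataReason(byte code) {
        this.code = code;
    }

    byte code() {
        return code;
    }

    static MemberMetadataReason fromCode(byte code) {
        return Arrays.stream(values())
            .filter(r -> r.code == code)
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException("Unknown reason code " + code));
    }

    // Dynamic fields (TopologyHash, TaskLag) may only change when the reason is not None (0).
    static boolean canUpdateDynamicFields(byte code) {
        return fromCode(code) != NONE;
    }
}
```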
Assignment Metadata Schema
Name | Type | Doc |
---|---|---|
ActiveTasks | list | Local assignment for this consumer. |
StandbyTasks | map | Local standby tasks for this consumer. |
WarmupTasks | map | Local warming up tasks for this consumer. |
PartitionsByHost | map | Global assignment information used for IQ. |
Assignment Metadata Errors
- None (0)
- Shutdown (1)
- AssignmentError (2)
- InconsistentTopology (3)
Streams Configurations
Name | Type | Default | Doc |
---|---|---|---|
group.protocol | enum | generic | Selects the group protocol: generic (the existing protocol) or consumer (the new protocol). |
...
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --type <comma separated list of types>
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --type <comma separated list of types>
Case Studies
Basic
Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo, which has 3 partitions.
Let’s start with an empty group:
- Group (epoch=0)
- Empty
- Target Assignment (epoch=0)
- Empty
- Member Assignment
- Empty
Member A joins the group. The coordinator bumps the group epoch to 1, adds A to the group, and creates an empty member assignment.
- Group (epoch=1)
- A
- Target Assignment (epoch=0)
- Empty
- Member Assignment
- A - epoch=0, partitions=[]
The coordinator computes and installs the new target assignment. All the partitions are assigned to A.
- Group (epoch=1)
- A
- Target Assignment (epoch=1)
- A - partitions=[foo-0, foo-1, foo-2]
- Member Assignment
- A - epoch=0, partitions=[]
When A heartbeats, the group coordinator transitions him to his target epoch and assignment because he does not have any partitions to revoke. The group coordinator updates the member assignment and replies with the new epoch 1 and all the partitions.
- Group (epoch=1)
- A
- Target Assignment (epoch=1)
- A - partitions=[foo-0, foo-1, foo-2]
- Member Assignment
- A - epoch=1, partitions=[foo-0, foo-1, foo-2]
Member B joins the group. The coordinator adds the member to the group and bumps the group epoch to 2.
- Group (epoch=2)
- A
- B
- Target Assignment (epoch=1)
- A - partitions=[foo-0, foo-1, foo-2]
- Member Assignment
- A - epoch=1, partitions=[foo-0, foo-1, foo-2]
- B - epoch=0, partitions=[]
...
- Group (epoch=2)
- A
- B
- Target Assignment (epoch=2)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-2]
- Member Assignment
- A - epoch=1, partitions=[foo-0, foo-1, foo-2]
- B - epoch=2, partitions=[foo-2]
At this point, B can transition to epoch 2 because he has no partitions to revoke, but he cannot get foo-2 until A revokes it.
When A heartbeats, the group coordinator instructs him to revoke foo-2, because B cannot start consuming foo-2 until A has released it.
When A heartbeats again and acknowledges that the revocation is completed, the group coordinator transitions him to epoch 2.
- Group (epoch=2)
- A
- B
- Target Assignment (epoch=2)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-2]
- Member Assignment
- A - epoch=2, partitions=[foo-0, foo-1]
- B - epoch=2, partitions=[foo-2]
When B heartbeats, he can now get foo-2.
- Group (epoch=2)
- A
- B
- Target Assignment (epoch=2)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-2]
- Member Assignment
- A - epoch=2, partitions=[foo-0, foo-1]
- B - epoch=2, partitions=[foo-2]
Member C joins the group. The coordinator adds the member to the group and bumps the group epoch to 3.
- Group (epoch=3)
- A
- B
- C
- Target Assignment (epoch=2)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-2]
- Member Assignment
- A - epoch=2, partitions=[foo-0, foo-1]
- B - epoch=2, partitions=[foo-2]
- C - epoch=0, partitions=[]
...
- Group (epoch=3)
- A
- B
- C
- Target Assignment (epoch=3)
- A - partitions=[foo-0]
- B - partitions=[foo-2]
- C - partitions=[foo-1]
- Member Assignment
- A - epoch=2, partitions=[foo-0, foo-1]
- B - epoch=2, partitions=[foo-2]
- C - epoch=0, partitions=[]
As with the previous member addition, A must revoke foo-1 before he can advance to epoch 3, and C cannot get foo-1 until it has been revoked.
When B heartbeats, the group coordinator transitions him to epoch 3 because B has no partitions to revoke. It persists the change and replies.
When C heartbeats, the group coordinator transitions him to epoch 3 as well, but he cannot get foo-1 yet.
- Group (epoch=3)
- A
- B
- C
- Target Assignment (epoch=3)
- A - partitions=[foo-0]
- B - partitions=[foo-2]
- C - partitions=[foo-1]
- Member Assignment
- A - epoch=2, partitions=[foo-0, foo-1]
- B - epoch=3, partitions=[foo-2]
- C - epoch=3, partitions=[foo-1]
When A heartbeats, the group coordinator instructs him to revoke foo-1.
When A heartbeats again and acknowledges the revocation, the group coordinator transitions him to epoch 3.
When C heartbeats again, the group coordinator gives him foo-1, which is now free. It persists the change and replies.
- Group (epoch=3)
- A
- B
- C
- Target Assignment (epoch=3)
- A - partitions=[foo-0]
- B - partitions=[foo-2]
- C - partitions=[foo-1]
- Member Assignment
- A - epoch=3, partitions=[foo-0]
- B - epoch=3, partitions=[foo-2]
- C - epoch=3, partitions=[foo-1]
All the members have eventually advanced to the group epoch 3.
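The per-member rule applied on each heartbeat in the walk-through above can be condensed into a small function. This is a simplified sketch under stated assumptions, not the coordinator's implementation. `MemberState`, `HeartbeatResult` and `reconcile` are hypothetical names, partitions are plain strings, and persistence, timeouts and the acknowledgment round trip are omitted. Under these assumptions, a member that still owns partitions outside its target keeps its epoch and is asked to revoke them. Otherwise it moves to the target epoch and receives only the target partitions that no other member still owns.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified member state: epoch plus the partitions the member actually owns.
record MemberState(int epoch, Set<String> owned) {}

// Hypothetical outcome of processing one heartbeat.
record HeartbeatResult(MemberState newState, Set<String> toRevoke) {}

final class Reconciler {
    /**
     * @param member        the member's current state
     * @param targetEpoch   epoch of the installed target assignment
     * @param target        partitions the target assignment gives this member
     * @param ownedByOthers partitions currently owned by any other member
     */
    static HeartbeatResult reconcile(MemberState member, int targetEpoch,
                                     Set<String> target, Set<String> ownedByOthers) {
        Set<String> toRevoke = new HashSet<>(member.owned());
        toRevoke.removeAll(target);
        if (!toRevoke.isEmpty()) {
            // Must revoke first: the member's epoch does not advance on this heartbeat.
            return new HeartbeatResult(member, toRevoke);
        }
        // Nothing to revoke: advance to the target epoch and take only the free target partitions.
        Set<String> granted = new HashSet<>(target);
        granted.removeAll(ownedByOthers);
        return new HeartbeatResult(new MemberState(targetEpoch, granted), Set.of());
    }
}
```

Replaying the addition of member C with this function gives the same sequence as the walk-through. A is first asked to revoke foo-1 and stays at epoch 2. C moves to epoch 3 with no partitions. Once A has revoked, A moves to epoch 3, and C then receives foo-1.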
Incremental Revocation & Assignment
Let's imagine a group with two members and six partitions.
- Group (epoch=21)
- A
- B
- Target Assignment (epoch=21)
- A - partitions=[foo-0, foo-1, foo-2]
- B - partitions=[foo-3, foo-4, foo-5]
- Member Assignment
- A - epoch=21, partitions=[foo-0, foo-1, foo-2]
- B - epoch=21, partitions=[foo-3, foo-4, foo-5]
C joins the group. The group coordinator adds him, bumps the group epoch, creates the member assignment, and computes the target assignment.
- Group (epoch=22)
- A
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- A - epoch=21, partitions=[foo-0, foo-1, foo-2]
- B - epoch=21, partitions=[foo-3, foo-4, foo-5]
- C - epoch=0, partitions=[]
When C heartbeats, the group coordinator transitions him to epoch 22 but does not give him any partitions yet because A and B have not revoked them.
- Group (epoch=22)
- A
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- A - epoch=21, partitions=[foo-0, foo-1, foo-2]
- B - epoch=21, partitions=[foo-3, foo-4, foo-5]
- C - epoch=22, partitions=[foo-2, foo-5]
When A heartbeats, the group coordinator instructs him to revoke foo-2.
When B heartbeats, the group coordinator instructs him to revoke foo-5.
When C heartbeats, nothing changes for him.
When A heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, persists the change, and replies.
- Group (epoch=22)
- A
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- A - epoch=22, partitions=[foo-0, foo-1]
- B - epoch=21, partitions=[foo-3, foo-4, foo-5]
- C - epoch=22, partitions=[foo-2, foo-5]
When C heartbeats, the group coordinator gives him foo-2, which is now free, but holds back foo-5.
When B heartbeats and acknowledges the revocation, the group coordinator transitions him to epoch 22, persists the change, and replies.
- Group (epoch=22)
- A
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- A - epoch=22, partitions=[foo-0, foo-1]
- B - epoch=22, partitions=[foo-3, foo-4]
- C - epoch=22, partitions=[foo-2, foo-5]
When C heartbeats, the group coordinator gives him foo-2 and foo-5.
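The incremental hand-off in this example can be summarized per member in three parts: the target partitions the member can receive now, the ones it is still waiting on because another member holds them, and the ones it must revoke. The helper below computes that view from the target and current ownership. It is a hypothetical sketch (`IncrementalHandoff` and `Handoff` are illustrative names) and does not model epochs or persistence.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical per-member summary of where each partition hand-off stands.
record Handoff(Set<String> assignable, Set<String> pending, Set<String> toRevoke) {}

final class IncrementalHandoff {
    /**
     * @param targets target partitions per member
     * @param owned   partitions each member currently owns (after acknowledged revocations)
     */
    static Map<String, Handoff> compute(Map<String, Set<String>> targets,
                                        Map<String, Set<String>> owned) {
        Map<String, Handoff> result = new TreeMap<>();
        for (var entry : targets.entrySet()) {
            String member = entry.getKey();
            Set<String> target = entry.getValue();
            Set<String> assignable = new TreeSet<>();
            Set<String> pending = new TreeSet<>();
            for (String p : target) {
                // A target partition is pending while any other member still owns it.
                boolean heldElsewhere = owned.entrySet().stream()
                    .anyMatch(o -> !o.getKey().equals(member) && o.getValue().contains(p));
                if (heldElsewhere) {
                    pending.add(p);
                } else {
                    assignable.add(p);
                }
            }
            // Partitions the member owns but that are not in its target must be revoked.
            Set<String> toRevoke = new TreeSet<>(owned.getOrDefault(member, Set.of()));
            toRevoke.removeAll(target);
            result.put(member, new Handoff(assignable, pending, toRevoke));
        }
        return result;
    }
}
```

Suppose A has acknowledged the revocation of foo-2 but B still owns foo-5. The helper then reports foo-2 as assignable to C, foo-5 as pending for C, and foo-5 as still to be revoked by B, matching the intermediate step above.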
Member Failure
Let's start with a group with three members and six partitions.
- Group (epoch=22)
- A
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- A - epoch=22, partitions=[foo-0, foo-1]
- B - epoch=22, partitions=[foo-3, foo-4]
- C - epoch=22, partitions=[foo-2, foo-5]
A fails to heartbeat. The group coordinator removes him after the session timeout expires and bumps the group epoch.
- Group (epoch=23)
- B
- C
- Target Assignment (epoch=22)
- A - partitions=[foo-0, foo-1]
- B - partitions=[foo-3, foo-4]
- C - partitions=[foo-2, foo-5]
- Member Assignment
- B - epoch=22, partitions=[foo-3, foo-4]
- C - epoch=22, partitions=[foo-2, foo-5]
The group coordinator computes the new target assignment.
- Group (epoch=23)
- B
- C
- Target Assignment (epoch=23)
- B - partitions=[foo-3, foo-4, foo-0]
- C - partitions=[foo-2, foo-5, foo-1]
- Member Assignment
- B - epoch=22, partitions=[foo-3, foo-4]
- C - epoch=22, partitions=[foo-2, foo-5]
B and C heartbeat and transition to epoch 23.
- Group (epoch=23)
- B
- C
- Target Assignment (epoch=23)
- B - partitions=[foo-3, foo-4, foo-0]
- C - partitions=[foo-2, foo-5, foo-1]
- Member Assignment
- B - epoch=23, partitions=[foo-3, foo-4, foo-0]
- C - epoch=23, partitions=[foo-2, foo-5, foo-1]
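The recomputed target assignment in this example keeps each surviving member's partitions and spreads the orphaned ones across the group. The sketch below is a minimal sticky assignor that happens to reproduce this result. It is an assumption for illustration, not the algorithm of the server-side uniform assignor. `StickyReassign` is a hypothetical name, ties between equally loaded members are broken by member id, and the sketch neither rebalances already-owned partitions nor drops deleted ones.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sticky assignor: keeps current ownership, then hands each unowned
// partition to the least-loaded member (ties broken by member id).
final class StickyReassign {
    static Map<String, List<String>> assign(Map<String, List<String>> current,
                                            List<String> allPartitions) {
        Map<String, List<String>> next = new TreeMap<>();
        current.forEach((member, parts) -> next.put(member, new ArrayList<>(parts)));
        List<String> owned = next.values().stream().flatMap(List::stream).toList();
        for (String p : allPartitions) {
            if (owned.contains(p)) {
                continue; // still owned by a live member: keep it where it is
            }
            String least = next.entrySet().stream()
                .min(Comparator.<Map.Entry<String, List<String>>>comparingInt(e -> e.getValue().size())
                    .thenComparing(Map.Entry::getKey))
                .orElseThrow() // assumes the group has at least one member
                .getKey();
            next.get(least).add(p);
        }
        return next;
    }
}
```

With B owning [foo-3, foo-4], C owning [foo-2, foo-5] and partitions foo-0 to foo-5, foo-0 goes to B on the tie and foo-1 goes to C, giving the target assignment shown above.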
Partition Added
Let's start with a group with two members and one partition.
- Group (epoch=22)
- A
- B
- Target Assignment (epoch=22)
- A - partitions=[foo-0]
- B - partitions=[]
- Member Assignment
- A - epoch=22, partitions=[foo-0]
- B - epoch=22, partitions=[]
A new partition foo-1 is created. The group coordinator detects it, updates the group, and bumps the group epoch.
- Group (epoch=23)
- A
- B
- Target Assignment (epoch=22)
- A - partitions=[foo-0]
- B - partitions=[]
- Member Assignment
- A - epoch=22, partitions=[foo-0]
- B - epoch=22, partitions=[]
The group coordinator computes a new target assignment.
- Group (epoch=23)
- A
- B
- Target Assignment (epoch=23)
- A - partitions=[foo-0]
- B - partitions=[foo-1]
- Member Assignment
- A - epoch=22, partitions=[foo-0]
- B - epoch=22, partitions=[]
A and B heartbeat and transition to epoch 23. A keeps foo-0, and B receives foo-1 right away because no other member owns it.
- Group (epoch=23)
- A
- B
- Target Assignment (epoch=23)
- A - partitions=[foo-0]
- B - partitions=[foo-1]
- Member Assignment
- A - epoch=23, partitions=[foo-0]
- B - epoch=23, partitions=[foo-1]
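In this example, a metadata change alone (no membership change) is enough to bump the group epoch and trigger a new target assignment. The sketch below illustrates that detection step under an assumption: the coordinator compares the partition counts of subscribed topics it last saw with the current metadata. How the coordinator actually observes metadata changes is not specified here, and `GroupMetadataWatcher` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker: bumps the group epoch when the partition count of a
// subscribed topic changes (e.g. foo goes from 1 to 2 partitions).
final class GroupMetadataWatcher {
    private int groupEpoch;
    private final Map<String, Integer> knownPartitionCounts = new HashMap<>();

    GroupMetadataWatcher(int groupEpoch, Map<String, Integer> partitionCounts) {
        this.groupEpoch = groupEpoch;
        this.knownPartitionCounts.putAll(partitionCounts);
    }

    /** Returns true if the epoch was bumped because subscribed-topic metadata changed. */
    boolean onMetadataUpdate(Map<String, Integer> currentPartitionCounts) {
        if (currentPartitionCounts.equals(knownPartitionCounts)) {
            return false; // nothing changed: keep the current epoch and target assignment
        }
        knownPartitionCounts.clear();
        knownPartitionCounts.putAll(currentPartitionCounts);
        groupEpoch++; // a new target assignment must now be computed for this epoch
        return true;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```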
Compatibility, Deprecation, and Migration Plan
...
- Deploy the new software
- Roll the members
- Enable the new protocol
- Roll the members
Regular expression.
- Kafka 4.0 opt-in, consumer support.
- Kafka 4.x opt-in, streams support.
- Kafka 5.0, new protocol by default.
...