Status

Current state: Accepted

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

When a client side assignor is used, the group coordinator requests the assignment from one group member by notifying it via the heartbeat protocol. The chosen member uses the ConsumerGroupPrepareAssignment API to get the current state of the group and the ConsumerGroupInstallAssignment API to install the computed assignment. Thanks to this, the input of the client side assignor is entirely driven by the group coordinator. The consumer is no longer responsible for maintaining any state besides its assigned partitions.

The new protocol's RPCs are specified in detail in the public interfaces section of this document, while the rebalance logic is described in the next chapter.

Group Coordinator

Implementing the new rebalance protocol in the current group coordinator is not appropriate in our opinion because it would require many changes to the current protocol anyway to make it interoperable. Therefore, this KIP proposes to rewrite the group coordinator from scratch in Java and to have it rely on an event loop. The rationale for using an event loop is that it 1) simplifies concurrency and 2) enables simulation testing. The group coordinator will have a replicated state machine per __consumer_offsets partition, where each replicated state machine is modelled as an event loop. Those replicated state machines will be executed in group.coordinator.threads threads.
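As a rough illustration of the event loop idea, the sketch below multiplexes one logical loop per partition onto a fixed number of single-threaded executors. The class and method names (PartitionEventLoops, enqueue) are hypothetical and are not part of this proposal; replication and state handling are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: one logical event loop per __consumer_offsets partition,
// multiplexed onto a fixed pool of single-threaded executors
// (the pool size plays the role of group.coordinator.threads).
final class PartitionEventLoops implements AutoCloseable {
    private final List<ExecutorService> threads = new ArrayList<>();

    PartitionEventLoops(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            threads.add(Executors.newSingleThreadExecutor());
        }
    }

    // Events of a given partition always land on the same thread, so they are
    // processed one at a time, in submission order, without extra locking.
    Future<?> enqueue(int partition, Runnable event) {
        int index = Math.floorMod(partition, threads.size());
        return threads.get(index).submit(event);
    }

    @Override
    public void close() {
        for (ExecutorService thread : threads) {
            thread.shutdown();
        }
    }
}
```

Because every event of a partition runs on a single thread, the state owned by that partition needs no locking, which is the concurrency simplification mentioned above.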

...

Consumer Group
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.
Member
Name | Type | Description
Member ID | string | The unique identifier of the member. It is generated by the coordinator upon the first heartbeat request and must be used during the lifetime of the member. The ID is similar to an incarnation ID.
Instance ID | string | The instance ID configured by the consumer.
Rack ID | string | The rack ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The client host configured by the consumer.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer.
Server Assignor | string | The server side assignor used by the group.
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority.
Assignor
Name | Type | Description
Name | string | The unique name of the assignor.
Reason | int8 | The reason why the metadata was updated.
Minimum Version | int16 | The minimum version of the metadata schema supported by this assignor.
Maximum Version | int16 | The maximum version of the metadata schema supported by this assignor.
Version | int16 | The version used to encode the metadata.
Metadata | bytes | The metadata provided by the consumer for this assignor.

...

The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side. The timer starts ticking when the heartbeat request is processed by the group coordinator.

...

The group coordinator has to determine which assignment strategy must be used for the group. The group's members may not have exactly the same assignors at any given point in time, e.g. they may migrate from one assignor to another. The group coordinator will choose the assignor as follows:

  • A client side assignor is used if any member specified one. The group coordinator selects an assignor that is supported by all the members in the group. If multiple are, it will respect the precedence defined by the members when they advertise their supported client side assignors.

...

  • A server side assignor is used otherwise. If multiple server side assignors are specified in the group, the group coordinator uses the most common one. If a member does not provide an assignor, the group coordinator will default to the first one in group.consumer.assignors.
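The selection rules above can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the precedence among commonly supported client side assignors is taken from the first member's list, and ties between server side assignors are broken in favour of the first one to reach the highest count. The text does not specify either tie-breaking rule.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the assignor selection rules; not the coordinator's code.
final class AssignorSelection {

    // Picks a client side assignor supported by every member. Precedence follows
    // the first member's list (an assumption; the text leaves this open).
    static Optional<String> selectClientAssignor(List<List<String>> clientAssignorsPerMember) {
        if (clientAssignorsPerMember.isEmpty()) {
            return Optional.empty();
        }
        for (String candidate : clientAssignorsPerMember.get(0)) {
            boolean supportedByAll = clientAssignorsPerMember.stream()
                .allMatch(assignors -> assignors.contains(candidate));
            if (supportedByAll) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    // Picks the most common server side assignor. A null entry means the member
    // did not provide one, so the first configured assignor is counted instead.
    static String selectServerAssignor(List<String> serverAssignorPerMember,
                                       List<String> configuredAssignors) {
        String defaultAssignor = configuredAssignors.get(0);
        Map<String, Integer> counts = new HashMap<>();
        String best = defaultAssignor;
        int bestCount = 0;
        for (String name : serverAssignorPerMember) {
            String effective = (name == null) ? defaultAssignor : name;
            int count = counts.merge(effective, 1, Integer::sum);
            if (count > bestCount) {
                best = effective;
                bestCount = count;
            }
        }
        return best;
    }
}
```

Here configuredAssignors stands in for the value of group.consumer.assignors.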

Server Side Mode

The server side assignor is pluggable and the client can choose the one that it wants to use by providing its name in the heartbeat request. If the selected assignor does not exist, the group coordinator will reject the heartbeat with an UNSUPPORTED_ASSIGNOR error. The list of supported assignors will be configured in the broker configuration.

We will support two assignors out of the box for Apache Kafka:

  • range - org.apache.kafka.server.group.consumer.RangeAssignor - An assignor which co-partitions topics.
  • uniform - org.apache.kafka.server.group.consumer.UniformAssignor - An assignor which uniformly assigns partitions amongst the members. This is somewhat similar to the existing "sticky" assignor.

...

  • The group coordinator selects a member to run the assignment logic. The selection is explained later in this chapter.
  • The group coordinator notifies the member to compute the new assignment by setting the ShouldComputeAssignment field in its next heartbeat response.
  • When the member receives this notification, it is expected to call the ConsumerGroupPrepareAssignment API to get the current group metadata and the current target assignment.
  • The member computes the new assignment with the relevant assignor.
  • The member calls the ConsumerGroupInstallAssignment API to install the new assignment. The group coordinator validates it and persists it.

...

  • All partitions are assigned.
  • A partition is assigned only once.
  • All member ids are valid. They must correspond to members in the group.

Note that this validation is made with regard to the metadata used to compute the assignment. The group may have already advanced to a newer group epoch, e.g. a member could have left during the assignment computation.
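The three validation rules can be sketched as a single pass over the submitted assignment. The names below are hypothetical, and partitions are represented as plain strings such as "t-0" purely for brevity; the actual protocol uses topic IDs and partition indexes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the assignment validation; not the coordinator's code.
final class AssignmentValidator {

    // assignment: member id -> partitions assigned to that member.
    // memberIds and allPartitions come from the metadata used to compute the assignment.
    static boolean isValid(Map<String, List<String>> assignment,
                           Set<String> memberIds,
                           Set<String> allPartitions) {
        Set<String> seen = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
            if (!memberIds.contains(entry.getKey())) {
                return false; // The member id does not correspond to a group member.
            }
            for (String partition : entry.getValue()) {
                if (!seen.add(partition)) {
                    return false; // The partition is assigned more than once.
                }
            }
        }
        // Every partition must be assigned, and nothing outside the metadata may be.
        return seen.equals(allPartitions);
    }
}
```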

...

The member uses the ConsumerGroupHeartbeat API to establish a session with the group coordinator. The member is expected to heartbeat every group.consumer.heartbeat.interval.ms in order to keep its session open. If it does not heartbeat at least once within group.consumer.session.timeout.ms, the group coordinator will kick the member out of the group. group.consumer.heartbeat.interval.ms is defined on the server side and the member is told about it in the heartbeat response. group.consumer.session.timeout.ms is also defined on the server side.

...

The member joins the group by sending a heartbeat with no Member ID and a member epoch equal to 0. It can rejoin the group with a member epoch equal to 0. It can leave the group by using a member epoch equal to -1. The member must commit its offsets before leaving.

Fencing

The group coordinator ensures that requests come from a known Member ID. Any request is rejected with the UNKNOWN_MEMBER_ID error otherwise. It also ensures that the Member Epoch matches the expected member epoch. If not, the request is rejected with the FENCED_MEMBER_EPOCH error. In this case, the member is expected to immediately give up all its partitions and rejoin the group with the same member ID and a member epoch equal to zero. Details for every API are given in the Public Interfaces section.

...

Static membership, introduced in KIP-345, is still supported by this new rebalance protocol. When a member wants to leave temporarily (e.g. while being bounced), it should send a heartbeat with a member epoch equal to -2. This signals to the group coordinator that the member left but will rejoin within the session timeout. When the member rejoins with the same instance ID, the group coordinator replaces the old member with the new member and gives back its current assignment.

If the leaving member does not rejoin within the session timeout, the group coordinator kicks it out of the group. If a new member joins with an existing instance ID before the previous member has left, the new member is rejected with an UNRELEASED_INSTANCE_ID error as long as the previous member is still present.

Consumer Group States

EMPTY

...

The group configurations are stored in the controller like all the other dynamic configurations in the cluster. This allows configurations to be installed independently of whether the group exists or not. Configurations are also preserved if the group is deleted. In ZK mode, the dynamic group configurations will be stored in the /config/groups znode. In KRaft mode, they are stored in the ConfigRecord.

...

The new group coordinator will rely on the metadata.version or IBP to enable the new protocol. The exact version will be determined when the feature is ready.

Fail Over

When the group coordinator fails over, the newly elected coordinator will load the state from the __consumer_offsets partition(s). When it is done with this, it has a few other duties for the newly loaded groups:

  • It has to set up the session timeouts for all the members (like today).
  • It has to check whether the topic-partition metadata has changed and potentially trigger a rebalance for the group if it has.
  • It has to check whether new topics match the regex subscriptions and trigger a rebalance for the group if new topics do.
  • It has to check whether a new assignment is required for the group (group epoch != assignment epoch). If it is the case, the group coordinator can directly compute it using the server side assignor or can trigger a client side assignment computation.

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

Consumer

The semantics of the consumer will remain unchanged after this proposal is implemented. The goal is to replace the implementation of the group membership/assignment protocol with the new one.

Feature Flag

A new configuration setting will be used to determine whether the new protocol should be used or not. The feature flag allows users to control when they start using or migrating to the new protocol for their application. This is also required by our migration path, as it requires the software to be on a specific version which is compatible with the new protocol.

...

The rebalance process in the consumer is basically the opposite of the process that was described earlier in this document. The consumer will know at any point in time its current epoch and the list of partitions that it owns. There are a few cases to consider:

Fenced

If the member is fenced by the group coordinator, it will immediately abandon all its partitions and call ConsumerRebalanceListener#onPartitionsLost. It will rejoin the group as a new member afterwards.

...

Revocation

When the member has got a new target assignment, the group coordinator will notify it that it has to revoke partitions if any partitions must be revoked. The member can determine the revoked partitions by computing the difference between its current local assignment and the assignment received from the group coordinator. The consumer stops fetching from those revoked partitions and, if auto commit is enabled, commits their offsets. Finally, it calls ConsumerRebalanceListener#onPartitionsRevoked and informs the group coordinator that the revocation process is complete in its next heartbeat.

Assignment

When the member transitions to its next epoch, it receives its epoch, its assigned partitions and its pending partitions from the group coordinator. PartitionAssignor#onAssignment is then called if a custom assignor is set up. PartitionAssignor#onAssignment receives the new metadata and the new target assignment for the member. The metadata is final for this epoch. The target assignment is also final for this epoch and contains partitions that are not revoked yet by other members. Therefore, the assignor does not know which of them are being fetched or not. It must rely on the ConsumerRebalanceListener for this. Finally, ConsumerRebalanceListener#onPartitionsAssigned is called with the assigned partitions and the consumer starts fetching those partitions.

Whenever new partitions are assigned to the member within the current epoch, the consumer calls ConsumerRebalanceListener#onPartitionsAssigned with those. This means that ConsumerRebalanceListener#onPartitionsAssigned can be called multiple times within an epoch. At most, it will be called N times where N is the number of assigned partitions in the current epoch.

...

  • If there are any revoked partitions, it will revoke them, commit their offsets and call ConsumerRebalanceListener#onPartitionsRevoked.
  • If there are any newly assigned partitions, it will start processing them and call ConsumerRebalanceListener#onPartitionsAssigned.

Note that the process depends on Consumer#poll being called, as with the current protocol. There is a parallel effort to redesign the threading model of the consumer. This will very likely change how and when those callbacks are called.

Client-Side Assignor

By default, the consumer will entirely rely on the group coordinator but it will allow specifying a custom assignor on the client-side as already explained in this document. For this purpose, we propose to introduce a new and optional assignor interface in the Consumer called PartitionAssignor. The interface is specified in the public interfaces section of this document. The current assignor interface is strongly tied to the current group membership/assignment protocol so reusing it is not appropriate for two reasons:

...

Consumer#enforceRebalance will be deprecated and will be a no-op if used when the new protocol is enabled. A warning will be logged in this case. Enforcing a rebalance with the new protocol does not make any sense. Instead, power users will have the ability to trigger a reassignment by either providing a non-zero reason or by updating the assignor metadata.

...

The idea is to manage each member individually while relying on the new engine to do the synchronization between them. Each member will use rebalance loops to update the group coordinator and collect its assignment. The group coordinator will ensure that a rebalance is triggered when a member needs to update its assignment. This process is detailed in the next sub-chapters:

...

  • The basic validations are applied. The request is rejected with the appropriate error if the validation fails.
    • The group must exist.
    • The member must exist or be created.
    • The embedded protocol must be on version >= 3.
    • The generation id must match the current member epoch. The generation id is provided in the embedded protocol.
  • The group state is updated if needed.
    • The group epoch is incremented if a new assignment is required. See "Group Epoch - Trigger a rebalance" chapter for details about the rebalance triggers.
    • The Topics provided in the embedded consumer protocol are used to update the SubscribedTopicNames.
    • The Protocols are converted to Assignors where the UserData becomes the metadata and the version is set to -1.
  • The current member assignment is updated if needed:
    • If the member has revoked all its partitions or the required partitions, the member can transition to its next epoch. The target assignment becomes the current assignment. The group coordinator replies with the new member epoch as the generation id.
    • If the member has to revoke partitions, the group coordinator replies with the current member epoch as the generation id.
  • The member transitions to CompletingRebalance state.

...

  • FENCED_MEMBER_EPOCH - The member epoch is fenced by the coordinator. The member must abandon all its partitions and rejoin.
  • STALE_MEMBER_EPOCH - The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.
  • UNRELEASED_INSTANCE_ID - The instance ID is still used by another member. The member must leave first.
  • UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version range is not supported by the group.

...

Code Block
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
      { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of the consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
      { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
      { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
      { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
          { "name": "Name", "type": "string", "versions": "0+",
            "about": "The name of the assignor." },
          { "name": "MinimumVersion", "type": "int16", "versions": "0+",
            "about": "The minimum supported version for the metadata." },
          { "name": "MaximumVersion", "type": "int16", "versions": "0+",
            "about": "The maximum supported version for the metadata." },
          { "name": "Reason", "type": "int8", "versions": "0+",
            "about": "The reason of the metadata update." }, 
          { "name": "MetadataVersion", "type": "int16", "versions": "0+",
            "about": "The version of the metadata." },
          { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
            "about": "The metadata." }
      ]},
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.",
      "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
  ]
}

Required ACL

  • Read Group

Request Validation

...

  • GroupId must be non-empty.
  • MemberId must be non-empty.
  • MemberEpoch must be >= -1.
  • InstanceId, if provided, must be non-empty.
  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.
  • SubscribedTopicNames and SubscribedTopicRegex cannot be used together.
  • SubscribedTopicNames or SubscribedTopicRegex must be set, at minimum, in the heartbeat request when the member epoch is 0.
  • SubscribedTopicRegex must be a valid regular expression.
  • ServerAssignor and ClientAssignors cannot be used together.
  • Assignor.Name must be non-empty.
  • Assignor.MinimumVersion must be >= -1.
  • Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion.
  • Assignor.Version must be >= Assignor.MinimumVersion and <= Assignor.MaximumVersion.
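A subset of these rules could be checked along the following lines. This is a sketch under assumptions rather than the coordinator's implementation: the class name and flat parameter list are hypothetical, "first heartbeat request" is approximated by a member epoch of 0, and java.util.regex is used only to illustrate the regular expression check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical sketch covering a subset of the request validation rules.
final class HeartbeatRequestValidator {

    // Returns the list of violated rules; an empty list means the request is valid.
    static List<String> validate(String groupId, String memberId, int memberEpoch,
                                 String instanceId, int rebalanceTimeoutMs,
                                 List<String> subscribedTopicNames, String subscribedTopicRegex,
                                 String serverAssignor, List<String> clientAssignors) {
        List<String> errors = new ArrayList<>();
        if (groupId == null || groupId.isEmpty()) errors.add("GroupId must be non-empty");
        if (memberId == null || memberId.isEmpty()) errors.add("MemberId must be non-empty");
        if (memberEpoch < -1) errors.add("MemberEpoch must be >= -1");
        if (instanceId != null && instanceId.isEmpty()) errors.add("InstanceId, if provided, must be non-empty");
        if (memberEpoch == 0) {
            // Assumption: the first heartbeat is the one sent with member epoch 0.
            if (rebalanceTimeoutMs <= 0) errors.add("RebalanceTimeoutMs must be larger than zero");
            if (subscribedTopicNames == null && subscribedTopicRegex == null) {
                errors.add("SubscribedTopicNames or SubscribedTopicRegex must be set");
            }
        }
        if (subscribedTopicRegex != null) {
            try {
                Pattern.compile(subscribedTopicRegex);
            } catch (PatternSyntaxException e) {
                errors.add("SubscribedTopicRegex must be a valid regular expression");
            }
        }
        if (serverAssignor != null && clientAssignors != null) {
            errors.add("ServerAssignor and ClientAssignors cannot be used together");
        }
        return errors;
    }
}
```

A request failing any of these checks would presumably be rejected with INVALID_REQUEST, which is listed among the supported errors of the response.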

...

  1. Looks up the group or creates it.
  2. Creates the member should the member epoch be zero or checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions owned by the member are a subset of the target partitions. If it is the case, it is safe to transition the member to its target epoch again.
  4. Updates the member's information if any. The group epoch is incremented if there is any change. See "Group Epoch - Trigger a rebalance" chapter for details about the rebalance triggers.
  5. Reconciles the member assignments as explained earlier in this document.
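The epoch check in step 3, together with the suggested relaxation for a lost heartbeat response, might look like the sketch below. The names are hypothetical, partitions are plain strings for brevity, and restricting the relaxation to requests carrying an older epoch is an assumption that the text does not state.

```java
import java.util.Set;

// Hypothetical sketch of the member epoch check with the lost-response relaxation.
final class MemberEpochCheck {
    enum Result { OK, FENCED_MEMBER_EPOCH }

    static Result check(int requestEpoch, int currentEpoch,
                        Set<String> ownedPartitions, Set<String> targetPartitions) {
        if (requestEpoch == currentEpoch) {
            return Result.OK;
        }
        // Relaxation: the member may have missed the response carrying its new epoch.
        // If it only owns partitions from its target assignment, it is safe to move it
        // to the target epoch again (assumed to apply to older epochs only).
        if (requestEpoch < currentEpoch && targetPartitions.containsAll(ownedPartitions)) {
            return Result.OK;
        }
        return Result.FENCED_MEMBER_EPOCH;
    }
}
```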

...

The group coordinator will set the Assignment field until the member acknowledges that it has converged to the desired assignment. This is done to ensure that the members converge to the target assignment.

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - UNRELEASED_INSTANCE_ID
  // - GROUP_MAX_SIZE_REACHED
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
      "about": "True if the member should compute the assignment for the group." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.",
      "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member that can be used immediately." },
        { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
        { "name": "MetadataVersion", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
    ]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

Response Handling

If the response contains no error, the member will reconcile its current assignment towards its new assignment. It does the following:

  1. It updates its member epoch.
  2. It computes the difference between the old and the new assignment to determine the revoked partitions and the newly assigned partitions. There should be either revoked partitions or newly assigned partitions. The protocol never does both together.
    1. It revokes the partitions, commits all the offsets, and calls ConsumerRebalanceListener#onPartitionsRevoked.
    2. It assigns the new partitions, calls PartitionAssignor#onAssignment if one is defined and calls ConsumerRebalanceListener#onPartitionsAssigned.
  3. After a revocation, it sends the next heartbeat immediately to acknowledge it.
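The difference computed in step 2 amounts to two set subtractions. The sketch below uses a hypothetical AssignmentDiff class and plain strings for partitions; the real consumer works with topic IDs and partition indexes and also invokes the listener callbacks, which are omitted here.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the reconciliation diff computed by the member.
final class AssignmentDiff {
    final Set<String> revoked;
    final Set<String> newlyAssigned;

    AssignmentDiff(Set<String> current, Set<String> received) {
        // Revoked: owned today but absent from the received assignment.
        revoked = new TreeSet<>(current);
        revoked.removeAll(received);
        // Newly assigned: present in the received assignment but not owned yet.
        newlyAssigned = new TreeSet<>(received);
        newlyAssigned.removeAll(current);
    }

    // A revocation is acknowledged by sending the next heartbeat immediately.
    boolean shouldHeartbeatImmediately() {
        return !revoked.isEmpty();
    }
}
```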

If the response has the ShouldComputeAssignment field set to true, the consumer starts the assignment process.

Upon receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the consumer abandons all its partitions and rejoins with the same member id and the epoch 0.

Upon receiving the UNRELEASED_INSTANCE_ID error, the consumer should fail.

ConsumerGroupPrepareAssignment API

...

When the group coordinator handles a ConsumerGroupPrepareAssignmentRequest, it does the following:

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks whether the member is the chosen one to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
  5. Returns the state of the group.
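The ordering of these checks can be sketched as follows. The class, its in-memory maps and the chosenMemberId field are hypothetical stand-ins for the coordinator's state; only the error names come from this document.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the checks applied before returning the group state.
final class PrepareAssignmentHandler {
    enum Error { NONE, GROUP_ID_NOT_FOUND, UNKNOWN_MEMBER_ID, STALE_MEMBER_EPOCH }

    static final class Group {
        final Map<String, Integer> memberEpochs = new HashMap<>(); // member id -> current epoch
        String chosenMemberId;                                      // member selected to compute the assignment
    }

    final Map<String, Group> groups = new HashMap<>();

    Error check(String groupId, String memberId, int memberEpoch) {
        Group group = groups.get(groupId);
        if (group == null) return Error.GROUP_ID_NOT_FOUND;
        Integer currentEpoch = group.memberEpochs.get(memberId);
        if (currentEpoch == null) return Error.UNKNOWN_MEMBER_ID;
        if (currentEpoch != memberEpoch) return Error.STALE_MEMBER_EPOCH;
        // A member that was not selected is reported as unknown, as in step 4.
        if (!memberId.equals(group.chosenMemberId)) return Error.UNKNOWN_MEMBER_ID;
        return Error.NONE; // The group state would be returned at this point.
    }
}
```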

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - STALE_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The member instance ID." },
      { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The member rack ID." },
      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "Assignor", "type": "Assignor", "versions": "0+",
        "about": "The information of the selected assignor",
        "fields": [
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." },
        { "name": "MetadataVersion", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The target topic-partitions of the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "TopicName", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}    
  ]
}
 

Response Handling

If the response contains no error, the member calls the client side assignor with the group state.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "int8", "versions": "0+",
      "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
        ]},
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

...

When the group coordinator handles a ConsumerGroupInstallAssignmentRequest:

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.
  6. Installs the new target assignment.
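
The ordering of the checks above can be sketched as follows. This is a minimal illustration, not the coordinator's implementation: the class `InstallAssignmentCheck`, its `Group` holder and the `Result` enum are hypothetical names, and the rule used for step 5 (the assignment may only reference known members) is an assumption, since the text does not spell out what the validation covers.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the checks above; these are not Kafka's actual classes.
final class InstallAssignmentCheck {

    enum Result { NONE, GROUP_ID_NOT_FOUND, UNKNOWN_MEMBER_ID, STALE_MEMBER_EPOCH, INVALID_ASSIGNMENT }

    // Minimal group state: current epoch per member, plus the member chosen to compute the assignment.
    static final class Group {
        final Map<String, Integer> memberEpochs;
        final String chosenMemberId;

        Group(Map<String, Integer> memberEpochs, String chosenMemberId) {
            this.memberEpochs = memberEpochs;
            this.chosenMemberId = chosenMemberId;
        }
    }

    static Result validate(Map<String, Group> groups, String groupId, String memberId,
                           int memberEpoch, Set<String> assignedMemberIds) {
        Group group = groups.get(groupId);
        if (group == null) return Result.GROUP_ID_NOT_FOUND;                          // step 1
        Integer currentEpoch = group.memberEpochs.get(memberId);
        if (currentEpoch == null) return Result.UNKNOWN_MEMBER_ID;                    // step 2
        if (currentEpoch != memberEpoch) return Result.STALE_MEMBER_EPOCH;            // step 3
        if (!memberId.equals(group.chosenMemberId)) return Result.UNKNOWN_MEMBER_ID;  // step 4
        // Step 5, assumed rule: the assignment may only reference known members.
        if (!group.memberEpochs.keySet().containsAll(assignedMemberIds)) {
            return Result.INVALID_ASSIGNMENT;
        }
        return Result.NONE; // step 6: the caller would install the target assignment here
    }
}
```

Because the checks run in order, a request from an unknown group never reaches the member or epoch checks, and only the first failing check is reported.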

...

If the response contains no error, the member is done with the assignment process.

Upon receiving the STALE_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

...

When the group coordinator handles a ConsumerGroupDescribeRequest:

  • Checks whether the group IDs exist. If a group ID does not, GROUP_ID_NOT_FOUND is returned.
  • Looks up the groups and returns the response.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 71,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The top-level error message, or null if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
          { "name": "MemberId", "type": "uuid", "versions": "0+",
            "about": "The member ID." },
          { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
            "about": "The member instance ID." },
          { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
            "about": "The member rack ID." },
          { "name": "MemberEpoch", "type": "int32", "versions": "0+",
            "about": "The current member epoch." },
          { "name": "ClientId", "type": "string", "versions": "0+",
            "about": "The client ID." },
          { "name": "ClientHost", "type": "string", "versions": "0+",
            "about": "The client host." },
          { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+",
            "about": "The subscribed topic names." },
          { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
            "about": "The subscribed topic regex, or null if not provided." },
          { "name": "Assignment", "type": "Assignment", "versions": "0+",
            "about": "The current assignment." },
          { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
            "about": "The target assignment." }
        ]},
        { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
          "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ],
  "commonStructs": [
    { "name": "Assignment", "versions": "0+", "fields": [
      { "name": "Partitions", "type": "[]TopicPartitions", "versions": "0+",
        "about": "The assigned topic-partitions to the member.", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "TopicName", "type": "string", "versions": "0+",
          "about": "The topic name." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]},
      { "name": "Error", "type": "int8", "versions": "0+",
        "about": "The assigned error." },
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The assignor metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata bytes." }
    ]}
  ]
}

Response Handling

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty all groups are returned with their state." },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty all groups are returned" }
  ]
}
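
The filter semantics described in the schema ("if empty all groups are returned") can be illustrated with a small sketch. The class `ListGroupsFilterSketch` and its `Group` holder are hypothetical. Combining the two filters with a logical AND and matching names case-sensitively are both assumptions, as the schema does not state how the filters interact.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of the StatesFilter / TypesFilter semantics; not Kafka code.
final class ListGroupsFilterSketch {

    static final class Group {
        final String id;
        final String state;
        final String type;

        Group(String id, String state, String type) {
            this.id = id;
            this.state = state;
            this.type = type;
        }
    }

    // An empty filter matches every group; the two filters are combined with AND (assumption).
    static List<String> list(List<Group> groups, Set<String> statesFilter, Set<String> typesFilter) {
        List<String> ids = new ArrayList<>();
        for (Group g : groups) {
            boolean stateOk = statesFilter.isEmpty() || statesFilter.contains(g.state);
            boolean typeOk = typesFilter.isEmpty() || typesFilter.contains(g.type);
            if (stateOk && typeOk) {
                ids.add(g.id);
            }
        }
        return ids;
    }
}
```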

Required ACL

  • Describe Cluster

Request Validation

No particular changes.

...

Code Block
linenumberstrue
{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out
  // responses before throttling.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the GroupState field (KIP-518).
  //
  // Version 5 adds the GroupType field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
      "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
      "about": "Each group in the response.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+",
        "entityType": "groupId",
        "about": "The group ID." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type." },
      { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true,
        "about": "The group state name." },
      { "name": "GroupType", "type": "string", "versions": "5+", "ignorable": true,
        "about": "The group type name." }
    ]}
  ]
}

Response Handling

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups at a time
  //
  // Version 9 adds GenerationIdOrMemberEpoch, MemberId and TopicId fields (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
      "about": "The group to fetch offsets for." },
    // New fields.
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The generation of the group." },
    { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
        "about": "The member ID assigned by the group coordinator." },
    // End of new fields.
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name."},
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
        "about": "The partition indexes we would like to fetch offsets for." }
    ]},
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."},
      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        // Updated field.
        { "name": "Name", "type": "string", "versions": "8+", "nullableVersions": "9+", "default": "null", "entityType": "topicName",
          "about": "The topic name."},
        // New field.
        { "name": "TopicId", "type": "uuid", "versions": "9+", "about": "The unique topic ID" },
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    { "name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
      "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
  ]
}

...

The MemberId and the GenerationIdOrMemberEpoch are verified. STALE_MEMBER_EPOCH, UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION is returned accordingly.

The admin client is not expected to provide the MemberId nor the GenerationIdOrMemberEpoch.

Response Schema

Code Block
languagejs
linenumberstrue
{
  "apiKey": 9,
  "type": "response",
  "name": "OffsetFetchResponse",
  // Version 1 is the same as version 0.
  //
  // Version 2 adds a top-level error code.
  //
  // Version 3 adds the throttle time.
  //
  // Starting in version 4, on quota violation, brokers send out responses before throttling.
  //
  // Version 5 adds the leader epoch to the committed offset.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 adds pending offset commit as new error response on partition level.
  //
  // Version 8 is adding support for fetching offsets for multiple groups
  //
  // Version 9 adds TopicId field and can return STALE_MEMBER_EPOCH, UNKNOWN_MEMBER_ID,
  // ILLEGAL_GENERATION, and UNKNOWN_TOPIC_ID errors.
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0-7",
      "about": "The responses per topic.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name." },
      { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0-7",
        "about": "The responses per partition", "fields": [
        { "name": "PartitionIndex", "type": "int32", "versions": "0-7",
          "about": "The partition index." },
        { "name": "CommittedOffset", "type": "int64", "versions": "0-7",
          "about": "The committed message offset." },
        { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "5-7", "default": "-1",
          "ignorable": true, "about": "The leader epoch." },
        { "name": "Metadata", "type": "string", "versions": "0-7", "nullableVersions": "0-7",
          "about": "The partition metadata." },
        { "name": "ErrorCode", "type": "int16", "versions": "0-7",
          "about": "The error code, or 0 if there was no error." }
      ]}
    ]},
    { "name": "ErrorCode", "type": "int16", "versions": "2-7", "default": "0", "ignorable": true,
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]OffsetFetchResponseGroup", "versions": "8+",
      "about": "The responses per group id.", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID." },
      { "name": "Topics", "type": "[]OffsetFetchResponseTopics", "versions": "8+",
        "about": "The responses per topic.", "fields": [
        { "name": "Name", "type": "string", "versions": "8+", "nullableVersions": "9+", "default": "null", "entityType": "topicName",
          "about": "The topic name."},
        { "name": "TopicId", "type": "uuid", "versions": "9+", "about": "The unique topic ID" },
        { "name": "Partitions", "type": "[]OffsetFetchResponsePartitions", "versions": "8+",
          "about": "The responses per partition", "fields": [
          { "name": "PartitionIndex", "type": "int32", "versions": "8+",
            "about": "The partition index." },
          { "name": "CommittedOffset", "type": "int64", "versions": "8+",
            "about": "The committed message offset." },
          { "name": "CommittedLeaderEpoch", "type": "int32", "versions": "8+", "default": "-1",
            "ignorable": true, "about": "The leader epoch." },
          { "name": "Metadata", "type": "string", "versions": "8+", "nullableVersions": "8+",
            "about": "The partition metadata." },
          { "name": "ErrorCode", "type": "int16", "versions": "8+",
            "about": "The partition-level error code, or 0 if there was no error." }
        ]}
      ]},
      { "name": "ErrorCode", "type": "int16", "versions": "8+", "default": "0",
        "about": "The group-level error code, or 0 if there was no error." }
    ]}
  ]
}

...

When a member is deleted, a tombstone for it is written to the partition.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ]
}

Note that the Epoch is always the latest epoch of the group.

ConsumerGroupPartitionMetadataKey

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "Topics", "versions": "0+",
          "type": "[]TopicMetadata", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
          ]}
    ]
}

Note that the Epoch is always the latest epoch of the group.

ConsumerGroupMemberMetadataKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}

ConsumerGroupMemberMetadataValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
        { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ]
}

Note that the GroupEpoch is always the latest epoch of the group.

Target Assignment

The target assignment is stored in N + 1 records, where N is the number of members in the group. The records for the members are written first, followed by the assignment metadata. When a new assignment is computed, the group coordinator compares it with the current assignment and writes only the difference between the two to the __consumer_offsets partition. The assignment must be atomic, so the group coordinator ensures that all the records are written in a single batch. This limits the size of the batch to 1MB (the default value). Given the incremental nature of the protocol, 1MB should be sufficient in most cases.
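
A minimal sketch of the "write only the difference" idea, under simplifying assumptions: partitions are plain integers of a single topic, records are rendered as strings, and members that left the group are ignored. The class `TargetAssignmentRecords` and the record labels are hypothetical, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: computes which records one assignment update would write.
final class TargetAssignmentRecords {

    // Returns the records in write order: changed member records first,
    // then the single assignment metadata record carrying the epoch.
    static List<String> recordsToWrite(Map<String, Set<Integer>> current,
                                       Map<String, Set<Integer>> updated,
                                       int assignmentEpoch) {
        List<String> records = new ArrayList<>();
        // Sort by member id only to make the output deterministic.
        for (Map.Entry<String, Set<Integer>> entry : new TreeMap<>(updated).entrySet()) {
            Set<Integer> previous = current.get(entry.getKey());
            if (!entry.getValue().equals(previous)) {
                records.add("member:" + entry.getKey() + "=" + new TreeSet<>(entry.getValue()));
            }
        }
        records.add("metadata:epoch=" + assignmentEpoch);
        return records;
    }
}
```

In this sketch a member whose partitions did not change contributes no record, so the batch grows with the size of the change rather than with the size of the group.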

ConsumerGroupTargetAssignmentMetadataKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMetadataKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "6" }
    ]
}

ConsumerGroupTargetAssignmentMetadataValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" }
    ]
}

...

The AssignmentEpoch corresponds to the group epoch used to compute the assignment. It is not necessarily the most recent group epoch because the assignment is computed asynchronously when a client-side assignor is used. While the assignment is computed for the group at epoch X, the group may have already advanced to epoch X+1 due to another event (e.g. a new member joined). In this case, we have chosen to install the assignment computed for epoch X and to trigger a new assignment computation right away.
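
The behaviour described above can be sketched as a tiny state holder. The class `AssignmentEpochSketch` and its method names are hypothetical; the sketch only models the two epochs and the decision to recompute.

```java
// Hypothetical sketch of installing an assignment computed for an older group epoch.
final class AssignmentEpochSketch {
    private int groupEpoch;
    private int assignmentEpoch;

    AssignmentEpochSketch(int groupEpoch) {
        this.groupEpoch = groupEpoch;
    }

    // Called when another event (e.g. a new member joining) bumps the group epoch.
    void bumpGroupEpoch() {
        groupEpoch++;
    }

    // Installs the assignment computed for the given epoch and reports whether
    // a new computation must be triggered because the group has moved on.
    boolean install(int computedForEpoch) {
        assignmentEpoch = computedForEpoch;
        return assignmentEpoch < groupEpoch;
    }

    int assignmentEpoch() {
        return assignmentEpoch;
    }
}
```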

ConsumerGroupTargetAssignmentMemberKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMemberKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "7" },
        { "name": "MemberId", "type": "string", "versions": "7" }
    ]
}

ConsumerGroupTargetAssignmentMemberValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMemberValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "MetadataVersion", "versions": "0+", "type": "int16" },
        { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}

Current Member Assignment

...

When a member is deleted from the group, a tombstone for it is written to the partition.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "8" },
        { "name": "MemberId", "type": "string", "versions": "8" }
    ]
}

ConsumerGroupCurrentMemberAssignmentValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "MetadataVersion", "versions": "0+", "type": "int16" },
        { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}

Offsets

OffsetCommitValue

Code Block
languagejs
linenumberstrue
{
  "type": "data",
  "name": "OffsetCommitValue",
  "validVersions": "0-4",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "offset", "type": "int64", "versions": "0+" },
    { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true },
    { "name": "metadata", "type": "string", "versions": "0+" },
    { "name": "commitTimestamp", "type": "int64", "versions": "0+" },
    { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true },
    // Adds TopicId field.
    { "name": "topicId", "type": "uuid", "versions": "4", "ignorable": true }
  ]
}

...

The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).

Code Block
languagejava
linenumberstrue
package org.apache.kafka.common.errors;

public class PartitionAssignorException extends ApiException {

    public PartitionAssignorException(String message) {
        super(message);
    }

    public PartitionAssignorException(String message, Throwable cause) {
        super(message, cause);
    }
}


Code Block
languagejava
linenumberstrue
package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class Group    class AssignmentSpec {
        /**
         * The members         * The members keyed by member id.
         */
        List<GroupMember>        Map<String, AssignmentMemberSpec> members;

        /**
                  * The topics' metadata. keyed by topic id
         */
        List<TopicMetadata>        Map<Uuid, AssignmentTopicMetadata> topics;
        }

    class GroupMember    class AssignmentMemberSpec {
        /**
         * The memberinstance ID if provided.
         */
        String memberId        Optional<String> instanceId;

        /**
                  * The instancerack ID if provided.
         */
        Optional<String> instanceId        Optional<String> rackId;

                /**
         * The set of topic IDs topics that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds        Collection<String> subscribedTopics;

        /**
         * The current target partitions of the member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    class AssignmentTopicMetadata {
        /**
         * The topic name.
         */
        String topicName;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class GroupAssignment {
        /**
         * The member assignments keyed by member id.
         */
        Map<String, MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The target partitions assigned to this member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param assignmentSpec The assignment spec.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
}
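
To illustrate what an implementation of the assign method might compute, here is a minimal sketch of a round-robin assignor. It does not implement the interface above: RoundRobinAssignorSketch and its simplified inputs (member id to subscribed topic names, topic name to partition count, partitions encoded as strings such as foo-0) are assumptions made only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for an assignor; NOT the KIP interface.
// Input: member id -> subscribed topics, topic name -> number of partitions.
// Output: member id -> target partitions encoded as "topic-partition" strings.
final class RoundRobinAssignorSketch {

    static String name() {
        return "round-robin-sketch";
    }

    static Map<String, List<String>> assign(Map<String, List<String>> subscriptions,
                                            Map<String, Integer> numPartitions) {
        // Sorted maps keep the result deterministic across invocations.
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String memberId : subscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(numPartitions).entrySet()) {
            // Only members subscribed to the topic are candidates for its partitions.
            List<String> candidates = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : new TreeMap<>(subscriptions).entrySet()) {
                if (e.getValue().contains(topic.getKey())) {
                    candidates.add(e.getKey());
                }
            }
            if (candidates.isEmpty()) {
                continue;
            }
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = candidates.get(p % candidates.size());
                assignment.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        return assignment;
    }
}
```

With two members A and B subscribed to a topic foo that has 3 partitions, this sketch gives foo-0 and foo-2 to A and foo-1 to B. A real assignor would typically also try to keep existing target partitions in place to limit revocations.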

Broker Metrics

Existing generic group metrics have been migrated with the same metric names, except for NumGroups, which reported the number of generic groups. This metric changed to:

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|generic}

  • number of groups based on type where type is the rebalance protocol

kafka.server:type=group-coordinator-metrics,name=partition-count,state={loading|active|failed}

  • number of __consumer_offsets partitions based on state

kafka.server:type=group-coordinator-metrics,name=event-queue-size

  • event accumulator queue size

kafka.server:type=group-coordinator-metrics,name=consumer-group-count,state={empty|assigning|reconciling|stable|dead}

  • number of consumer groups based on state

consumer group rebalances sensor

  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-rate
  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-count

partition load sensor: __consumer_offsets partition load time

  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg

thread idle ratio sensor: thread busy - idle ratio

  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg

Broker Configurations

New properties in the broker configuration.

Name | Type | Default | Doc
group.coordinator.threads | int | 1 | The number of threads used to run the state machines.
group.consumer.session.timeout.ms | int | 45s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout.
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval.
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval.
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate.
group.consumer.assignors | list | org.apache.kafka.server.group.consumer.UniformAssignor, org.apache.kafka.server.group.consumer.RangeAssignor | The server side assignors as a list of full class names. The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor.

Group Configurations

New dynamic group properties.

Name | Type | Default | Doc
group.consumer.session.timeout.ms | int | 45s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
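
One plausible reading of the broker-level minimum and maximum settings is that they bound the values a group may set dynamically. The sketch below shows such a check. This is an assumption for illustration only: the KIP text here lists the bounds but does not spell out the validation, and GroupConfigBoundsSketch is a hypothetical helper.

```java
// Hypothetical helper, not part of the KIP: it assumes that a group-level override
// must fall within the broker-level [min, max] bounds.
final class GroupConfigBoundsSketch {

    /** Returns true when valueMs lies within the inclusive range [minMs, maxMs]. */
    static boolean isWithinBounds(int valueMs, int minMs, int maxMs) {
        return valueMs >= minMs && valueMs <= maxMs;
    }

    /** Throws if the override violates the bounds; the message names the offending config. */
    static void validate(String configName, int valueMs, int minMs, int maxMs) {
        if (!isWithinBounds(valueMs, minMs, maxMs)) {
            throw new IllegalArgumentException(
                configName + "=" + valueMs + " must be in [" + minMs + ", " + maxMs + "]");
        }
    }
}
```

Under this assumption and the defaults listed for the broker, a group-level session timeout of 45000 ms would be accepted, while 30000 ms would be rejected because it is below the 45s minimum.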

...

package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

public interface PartitionAssignor {

    class Metadata {
        /**
         * The metadata version.
         */
        int version;

        /**
         * The metadata bytes.
         */
        ByteBuffer bytes;
    }

    class AssignmentSpec {
        /**
         * The members keyed by member id.
         */
        Map<String, AssignmentMemberSpec> members;

        /**
         * The topics' metadata keyed by topic id.
         */
        Map<Uuid, AssignmentTopicMetadata> topics;
    }

    class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String> rackId;

        /**
         * The topics that the member is subscribed to.
         */
        Collection<String> subscribedTopics;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The metadata reported by the member.
         */
        Metadata metadata;

        /**
         * The current target partitions of the member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    class AssignmentTopicMetadata {
        /**
         * The topic name.
         */
        String topicName;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class GroupAssignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignments keyed by member id.
         */
        Map<String, MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The target partitions assigned to this member.
         */
        Collection<TopicPartition> targetPartitions;

        /**
         * The metadata.
         */
        Metadata metadata;
    }

    class AssignorMetadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The metadata reported by the assignor.
         */
        Metadata metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return assignor metadata that will be sent to the assignor.
     */
    AssignorMetadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param assignmentSpec The assignment spec.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(AssignmentSpec assignmentSpec);

    /**
     * Callback which is invoked when the member receives a new assignment
     * from the assignor/group coordinator. This is called once per epoch
     * and contains the target partitions for this member. This means that
     * partitions may not be assigned to the member yet. The rebalance
     * listener must be used to know this.
     *
     * @param error The error reported by the assignor.
     * @param assignment The assignment computed by the assignor.
     * @param consumerGroupMetadata The group metadata.
     */
    void onAssignment(byte error, MemberAssignment assignment, ConsumerGroupMetadata consumerGroupMetadata);
}

New SubscriptionPattern class

...

  • Consumer#enforceRebalance
  • Consumer#enforceRebalance(String)
  • Consumer#subscribe(Pattern)
  • Consumer#subscribe(Pattern, ConsumerRebalanceListener)

Deprecate ConsumerPartitionAssignor interface

...

The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.

Deprecate Consumer configurations

The following configurations will be deprecated:

  • partition.assignment.strategy
  • session.timeout.ms
  • heartbeat.interval.ms

Consumer Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | A flag which indicates if the new protocol should be used or not. It could be: generic or consumer.
group.remote.assignor | string | null | The server side assignor to use. It cannot be used in conjunction with group.local.assignor. null means that the choice of the assignor is left to the group coordinator.
group.local.assignors | list | empty | The list of client side (local) assignors as a list of full class names. It cannot be used in conjunction with group.remote.assignor.
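
As a minimal sketch of how a client might opt into the new protocol with these settings, the example below builds a consumer configuration and rejects the combination of a remote and a local assignor. ConsumerProtocolConfigSketch is a hypothetical helper, the broker address and the assignor name uniform are example values, and the exclusivity check is only an illustration of the rule stated in the table, not the actual client validation.

```java
import java.util.Properties;

// Hypothetical helper for illustration; the property names follow the table above.
final class ConsumerProtocolConfigSketch {

    /** Builds a configuration that enables the new protocol with an optional server side assignor. */
    static Properties newProtocolConfig(String groupId, String remoteAssignor) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // example address
        props.setProperty("group.id", groupId);
        props.setProperty("group.protocol", "consumer");
        if (remoteAssignor != null) {
            // Leaving this unset lets the group coordinator choose the assignor.
            props.setProperty("group.remote.assignor", remoteAssignor);
        }
        return props;
    }

    /** Rejects configurations that set both a remote assignor and local assignors. */
    static void validate(Properties props) {
        boolean hasRemote = props.getProperty("group.remote.assignor") != null;
        boolean hasLocal = !props.getProperty("group.local.assignors", "").isEmpty();
        if (hasRemote && hasLocal) {
            throw new IllegalArgumentException(
                "group.remote.assignor cannot be used in conjunction with group.local.assignors");
        }
    }
}
```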

...

public class ConsumerGroupDescription {
    public String type() {
      return type;
    }
}

public class MemberDescription {
    /**
     * The current assignment of the member. Provided for both generic group and consumer group.
     */
    public MemberAssignment assignment() {}

    /**
     * The target assignment of the member. Provided only for consumer group.
     */
    public Optional<MemberAssignment> targetAssignment() {}
}

public class Metadata {
    /**
     * The metadata version.
     */
    int version;

    /**
     * The metadata bytes.
     */
    ByteBuffer bytes;
}

public class MemberAssignment {
    /**
     * The partitions.
     */
    Set<TopicPartition> topicPartitions;

    /**
     * The error reported by the assignor. Provided only if the group is a ConsumerGroup type.
     */
    byte error;

    /**
     * The metadata. Provided only if the group is a ConsumerGroup type.
     */
    Optional<Metadata> metadata;
}

Admin#incrementalAlterConfigs and Admin#describeConfigs

...

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --validate-regex <pattern>

Case Studies

All the examples shown in this chapter are based on the in-memory representation of the group coordinator.

Basic

Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.

...

  • Group (epoch=0)
    • Empty
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • Empty

...

Member A joins the group. The coordinator bumps the group epoch to 1, adds A to the group, and creates an empty member assignment.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=0)
    • Empty
  • Member Assignment
    • A - epoch=0, partitions=[], pending-partitions=[]

The coordinator computes and installs the new target assignment. All the partitions are assigned to A.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=0, partitions=[], pending-partitions=[]

When A heartbeats, the group coordinator transitions it to its target epoch/assignment because it does not have any partitions to revoke. The group coordinator updates the member assignment and replies with the new epoch 1 and all the partitions.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]

Member B joins the group. The coordinator adds the member to the group and bumps the group epoch to 2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]

The coordinator computes and installs the new target assignment.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]

At this point, B can transition to epoch 2 but cannot get foo-2 until A revokes it. Note that the persisted target assignment of B already includes foo-2, but B does not get it yet.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=2, partitions=[], pending-partitions=[foo-2]

When A heartbeats, the group coordinator instructs it to revoke foo-2.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions it to epoch 2 and releases foo-2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[], pending-partitions=[foo-2]

When B heartbeats, it now gets foo-2.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]

Member C joins the group. The coordinator adds the member to the group and bumps the group epoch to 3.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

The coordinator computes and installs the new target assignment.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=2, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

When B heartbeats, the group coordinator transitions it to epoch 3 because B has no partitions to revoke. It persists the change and replies.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

When C heartbeats, it transitions to epoch 3 but cannot get foo-1 yet.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=3, partitions=[], pending-partitions=[foo-1]

When A heartbeats, the group coordinator instructs it to revoke foo-1.

When A heartbeats again and acknowledges the revocation, the group coordinator transitions it to epoch 3 and releases foo-1. When C heartbeats next, it gets foo-1.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=3, partitions=[foo-0], pending-partitions=[]
    • B - epoch=3, partitions=[foo-2], pending-partitions=[]
    • C - epoch=3, partitions=[foo-1], pending-partitions=[]

All the members have eventually advanced to the group epoch (3).
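
The walk-through above can be approximated with a small in-memory model. This is a simplified sketch, not the coordinator's implementation: ReconciliationSketch, its fields, and the heartbeat signature are assumptions for illustration, partitions are encoded as strings, and persistence and error handling are omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified, hypothetical model of the reconciliation walk-through; not coordinator code.
final class ReconciliationSketch {

    static final class Member {
        int epoch = 0;
        final Set<String> partitions = new TreeSet<>();
        final Set<String> pending = new TreeSet<>();
    }

    int targetEpoch = 0;
    final Map<String, Set<String>> target = new HashMap<>();
    final Map<String, Member> members = new HashMap<>();

    /** Installs a new target assignment at the given epoch. */
    void install(int epoch, Map<String, Set<String>> newTarget) {
        targetEpoch = epoch;
        target.clear();
        target.putAll(newTarget);
    }

    /**
     * Processes one heartbeat and returns the partitions the member is asked to
     * revoke (empty when there is nothing to revoke).
     */
    Set<String> heartbeat(String memberId, boolean revocationAcked) {
        Member m = members.computeIfAbsent(memberId, id -> new Member());
        Set<String> wanted = target.getOrDefault(memberId, Set.of());
        if (m.epoch != targetEpoch) {
            Set<String> toRevoke = new TreeSet<>(m.partitions);
            toRevoke.removeAll(wanted);
            if (!toRevoke.isEmpty() && !revocationAcked) {
                return toRevoke; // the member must revoke first; its state is unchanged
            }
            // Revocation done (or not needed): move the member to the target epoch.
            m.partitions.retainAll(wanted);
            m.epoch = targetEpoch;
            m.pending.clear();
            m.pending.addAll(wanted);
            m.pending.removeAll(m.partitions);
        }
        // Hand over pending partitions that no other member still owns.
        for (String p : new TreeSet<>(m.pending)) {
            boolean free = members.values().stream()
                .noneMatch(other -> other != m && other.partitions.contains(p));
            if (free) {
                m.pending.remove(p);
                m.partitions.add(p);
            }
        }
        return Set.of();
    }
}
```

Replaying the A and B steps with this model (install epoch 2, B heartbeats, A heartbeats, A acknowledges, B heartbeats) leaves B at epoch 2 with foo-2 pending until A has acknowledged the revocation, which mirrors the states listed above.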

...

  • Group (epoch=21)
    • A
    • B
  • Target Assignment (epoch=21)
    • A - partitions=[foo-0, foo-1, foo-2]
    • B - partitions=[foo-3, foo-4, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]

C joins the group. The group coordinator adds it, bumps the group epoch, creates the member assignment, and computes the target assignment.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=0, partitions=[], pending-partitions=[]

C heartbeats. The group coordinator transitions it to epoch 22 but does not yet give it any partitions because they are not revoked yet.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=21, partitions=[foo-0, foo-1, foo-2], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=22, partitions=[], pending-partitions=[foo-2, foo-5]

A heartbeats. The group coordinator instructs it to revoke foo-2.

B heartbeats. The group coordinator instructs it to revoke foo-5.

C heartbeats. There are no changes for it.

A heartbeats and acknowledges the revocation. The group coordinator transitions it to epoch 22, releases foo-2, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=21, partitions=[foo-3, foo-4, foo-5], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2], pending-partitions=[foo-5]

C heartbeats. The group coordinator gives it foo-2, which is now free, but still holds back foo-5.

B heartbeats and acknowledges the revocation. The group coordinator transitions it to epoch 22, releases foo-5, persists the change, and replies.

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

C heartbeats. The group coordinator gives it foo-2 and foo-5.

Member Failure

...

  • Group (epoch=22)
    • A
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

A fails to heartbeat. The group coordinator removes it after the session timeout expires and bumps the group epoch.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

The group coordinator computes the new target assignment.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

B and C heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • B
    • C
  • Target Assignment (epoch=23)
    • B - partitions=[foo-3, foo-4, foo-0]
    • C - partitions=[foo-2, foo-5, foo-1]
  • Member Assignment
    • B - epoch=23, partitions=[foo-3, foo-4, foo-0], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-1], pending-partitions=[]
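
The target assignment computed after the failure can be reproduced with a very small sketch that keeps the survivors' partitions in place and hands the failed member's partitions round-robin to the remaining members. The KIP does not say which assignor produced the assignment above, so MemberFailureSketch and its strategy are assumptions chosen only because they yield the same result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration; the assignor actually used in the example is not specified.
final class MemberFailureSketch {

    /** Removes the failed member and distributes its partitions round-robin to the survivors. */
    static Map<String, List<String>> reassign(Map<String, List<String>> target, String failedMember) {
        // A sorted map gives a deterministic survivor order (B before C).
        Map<String, List<String>> next = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : target.entrySet()) {
            if (!e.getKey().equals(failedMember)) {
                next.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        List<String> orphaned = target.getOrDefault(failedMember, List.of());
        List<String> survivors = new ArrayList<>(next.keySet());
        for (int i = 0; i < orphaned.size() && !survivors.isEmpty(); i++) {
            next.get(survivors.get(i % survivors.size())).add(orphaned.get(i));
        }
        return next;
    }
}
```

Because B and C keep everything they already own, neither has anything to revoke, which is why both can move to epoch 23 on their next heartbeat.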

Partition Added

Let's start with a group with two members and one partition.

  • Group (epoch=22)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

A new partition foo-1 is created. The group coordinator detects it, updates the group, and bumps the group epoch.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0]
    • B - partitions=[]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

The group coordinator computes a new target assignment.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0], pending-partitions=[]
    • B - epoch=22, partitions=[], pending-partitions=[]

A and B heartbeat and transition to epoch 23.

  • Group (epoch=23)
    • A
    • B
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0]
    • B - partitions=[foo-1]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0], pending-partitions=[]
    • B - epoch=23, partitions=[foo-1], pending-partitions=[]
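
The trigger in this case study is a change in topic metadata rather than a membership change. A minimal sketch of that detection step follows. TopicMetadataWatcherSketch and its method names are hypothetical, and how the coordinator actually observes metadata changes is not described in this section.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: bump the group epoch when a subscribed topic's partition count changes.
final class TopicMetadataWatcherSketch {
    private final Map<String, Integer> knownPartitionCounts = new HashMap<>();
    int groupEpoch;

    TopicMetadataWatcherSketch(int groupEpoch, Map<String, Integer> initialCounts) {
        this.groupEpoch = groupEpoch;
        this.knownPartitionCounts.putAll(initialCounts);
    }

    /** Returns true and bumps the group epoch if the partition count differs from the known one. */
    boolean onTopicMetadata(String topic, int numPartitions) {
        Integer previous = knownPartitionCounts.put(topic, numPartitions);
        if (previous != null && previous == numPartitions) {
            return false; // nothing changed, so no new assignment is required
        }
        groupEpoch++;
        return true;
    }
}
```

Starting at epoch 22 with foo known to have 1 partition, reporting 2 partitions bumps the epoch to 23, and reporting 2 partitions again leaves it unchanged.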

Online Migration

We start with a generic group.

  • Generic Group (generation=22)
    • A
    • B
    • C
  • Assignment
    • A - partitions=[foo-0, foo-1], pending-partitions=[]
    • B - partitions=[foo-3, foo-4], pending-partitions=[]
    • C - partitions=[foo-2, foo-5], pending-partitions=[]

A leaves and rejoins with the new protocol enabled. The group is converted. The current generation becomes the group epoch. The target assignment and the member assignments are created based on the current assignment.

  • Group (epoch=22)
    • A (upgraded)
    • B
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=22, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

A uses the new protocol. B and C still use the old protocol.

B leaves the group. The group coordinator removes it and bumps the group epoch.

  • Group (epoch=23)
    • A (upgraded)
    • C
  • Target Assignment (epoch=22)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

The group coordinator computes a new target assignment and installs it. It also triggers a rebalance for C. 

  • Group (epoch=23)
    • A (upgraded)
    • C (PreparingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=22, partitions=[foo-2, foo-5], pending-partitions=[]

C heartbeats and is notified that a rebalance is required. C revokes all its partitions (assuming Eager protocol is used here) and sends a JoinGroup request.

The group coordinator sees that C does not own any partitions any more, so it can transition to epoch 23 and transition to CompletingRebalance. The transition to epoch 23 is important here because the new epoch must be given to the member in the JoinGroup response. This is the new generation of the group for it.

  • Group (epoch=23)
    • A (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=22, partitions=[foo-0, foo-1], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

In the meantime, A heartbeats and transitions to epoch 23 as well.

  • Group (epoch=23)
    • A (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

C sends the SyncGroup request and collects its new assignment. All partitions are given because they are all free. C transitions to Stable.

  • Group (epoch=23)
    • A (upgraded)
    • C (Stable)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

B rejoins the group with the new protocol. The group coordinator adds it and bumps the group epoch.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=23)
    • A - partitions=[foo-0, foo-1, foo-3]
    • C - partitions=[foo-2, foo-5, foo-4]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

The group coordinator computes a new target assignment. A rebalance is triggered for C to revoke foo-4.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (PreparingRebalance)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=0, partitions=[], pending-partitions=[foo-3, foo-4]
    • C - epoch=23, partitions=[foo-2, foo-5, foo-4], pending-partitions=[]

A heartbeats and is told to revoke foo-3.

...

The group coordinator sees that C does not own any partitions any more, so it can transition to epoch 24 and transition to CompletingRebalance. foo-4 is released.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (CompletingRebalance)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=24, partitions=[foo-4], pending-partitions=[foo-3]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

C sends the SyncGroup request to collect its assignment. It transitions to Stable.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=23, partitions=[foo-0, foo-1, foo-3], pending-partitions=[]
    • B - epoch=24, partitions=[foo-4], pending-partitions=[foo-3]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

A heartbeats. It confirms the revocation of foo-3 and transitions to epoch 24. foo-3 is released.

  • Group (epoch=24)
    • A (upgraded)
    • B (upgraded)
    • C (Stable)
  • Target Assignment (epoch=24)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-3, foo-4]
    • C - partitions=[foo-2, foo-5]
  • Member Assignment
    • A - epoch=24, partitions=[foo-0, foo-1], pending-partitions=[]
    • B - epoch=24, partitions=[foo-3, foo-4], pending-partitions=[]
    • C - epoch=24, partitions=[foo-2, foo-5], pending-partitions=[]

B heartbeats and gets its assignment.

Compatibility, Deprecation, and Migration Plan

...

We considered storing the dynamic group configurations in the group coordinator in order to have the ability to tie their lifecycles to their group. We discarded this approach for two reasons: 1) This pattern does not fit very well in the IncrementalAlterConfigs API as it would require sending updates about groups to the coordinator whereas all the other updates go to the controller; and 2) It seems preferable to decouple the life cycle of the dynamic configurations from the life cycle of the groups. Users may want to create configurations before the group is created and users may want to keep their configurations if the group is recreated.

Client side generated Member ID

We considered letting the client generate its member ID (or UUID) instead of relying on the coordinator to generate one when the member joins the group. We ultimately rejected this because 1) it introduces extra dependencies on the client. For instance, in C++, UUIDs are not natively supported so an extra library must be used; and 2) the client might not generate it correctly.

Future Work

Eventually, we aim to deprecate the current membership/rebalance API. In order to get to this point, we would first need to move all the use cases away from it.

...

The group membership protocol is also used outside of Apache Kafka. For instance, the Confluent Schema Registry uses it for leader election. It is not clear whether we really want to support such cases in the future. If we do, we could also define a new set of APIs for it. That would be much cleaner in the long run.

Metadata Transactions

The KIP proposes to rely on the atomicity of the batch to write the assignment to the __consumer_offsets partitions. This means that the assignment or, to be precise, the delta between two assignments cannot be larger than 1MB, where 1MB is the default batch size. In the future, we could imagine doing something similar to KIP-868 Metadata Transactions in the group coordinator. The solution outlined in KIP-868 does not work in our context because the __consumer_offsets topic is compacted. However, we could imagine a similar approach. We will tackle this in the future if needed.

Upgrade / Downgrade

The KIP proposes to rely on the IBP/MetadataVersion to decide whether a record or an API can be used or not. We have discussed the idea of using a dedicated feature flag instead of relying on metadata.version. That would allow decoupling the group coordinator from the quorum controller during upgrades. We also need to flesh out how to handle downgrades. We will do this in a future KIP.