Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents
maxLevel4

Status

Current state: Under Discussion Accepted

Discussion thread: Thread 1 and Thread 2here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Consumer Group
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epochint32The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members[]MemberThe set of members in the group.
Partitions Metadata[]PartitionMetadataThe metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.
Member
NameTypeDescription
Member IDstringThe unique identifier of the member. It is generated by the client upon startup coordinator upon the first heartbeat request and must be used during the lifetime of the member. The ID is similar to an incarnation ID.
Instance IDstringThe instance ID configured by the consumer.
Rack IDstringThe rack ID configured by the consumer.
Client IDstringThe client ID configured by the consumer.
Client HoststringThe client host configured by the consumer.
Subscribed Topic Names[]stringThe current set of subscribed topic names configured by the consumer.
Subscribed Topic RegexstringThe current subscription regular expression configured by the consumer.
Server AssignorstringThe server side assignor used by the group.
Client Assignors[]AssignorThe list of client-side assignors supported by the member. The order of this list defined the priority.
Assignor
NameTypeDescription
NamestringThe unique name of the assignor.
Reasonint8The reason why the metadata was updated.
Minimum Versionint16The minimum version of the metadata schema supported by this assignor.
Maximum Versionint16The maximum version of the metadata schema supported by this assignor.
Versionint16The version used to encode the metadata.
MetadatabytesThe metadata provided by the consumer for this assignor.

...

Every member is uniquely identified by a UUID. This is is called the Member ID. This UUID is generated on the client sideserver side and given to the member when it joins the group. It is used in all the communication with the group coordinator and must be kept during the entirely life span of the member (e.g. the consumer). In that sense, it is similar to an incarnation ID.

...

The group configurations are stored in the controller like all the other dynamic configurations in the cluster. This allows configurations to be installed independently from wether whether the group exists or not. Configurations are also preserved if the group is deleted. In ZK mode, the dynamic group configurations will be store in the /config/groups znode. In KRaft mode, they are stored in the ConfigRecord.

...

  • It has to setup the session timeouts for all the members (like today).
  • It has to check wether whether the topic-partition metadata has changed and potentially trigger a rebalance for the group if it has.
  • It has to check wether whether new topics match the regex subscriptions and trigger a rebalance for the group if new topic do.
  • It has to check wether whether a new assignment is required for the group (group epoch != assignment epoch). If it is the case, the group coordinator can directly compute it using the server side assignor or can trigger a client side assignment computation.

...

Code Block
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
        "about": "The group identifier." },
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member id generated by the clientcoordinator. The member id must be kept during the entire lifetime of the member." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
      { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
      { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
        "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
      { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
        "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
      { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
      { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
      { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
      { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [
          { "name": "Name", "type": "string", "versions": "0+",
            "about": "The name of the assignor." },
          { "name": "MinimumVersion", "type": "int16", "versions": "0+",
            "about": "The minimum supported version for the metadata." },
          { "name": "MaximumVersion", "type": "int16", "versions": "0+",
            "about": "The maximum supported version for the metadata." },
          { "name": "Reason", "type": "int8", "versions": "0+",
            "about": "The reason of the metadata update." }, 
          { "name": "MetadataVersion", "type": "int16", "versions": "0+",
            "about": "The version of the metadata." },
          { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
            "about": "The metadata." }
      ]},
      { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
  ]
}

...

  1. Lookups the group or creates it.
  2. Creates the member should the member epoch be zero or checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that he knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions owned by the members are a subset of the target partitions. If it is the case, it is safe to transition the member to its target epoch again.
  4. Updates the members informations if any. The group epoch is incremented if there is any change. See "Group Epoch - Trigger a rebalance" chapter for details about the rebalance triggers.
  5. Reconcile the member assignments as explained earlier in this document. 

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - UNRELEASED_INSTANCE_ID
  // - GROUP_MAX_SIZE_REACHED
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." }, 
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
      "about": "True if the member should compute the assignment for the group." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
	  "about": "null if not provided; the assignment otherwise.", "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member that can be used immediately." },
        { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
          "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
        { "name": "MetadataVersion", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
	]}
  ],
  "commonStructs": [
    { "name": "TopicPartitions", "versions": "0+", "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
    ]}
  ]
}

...

When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest request:

  1. Checks wether whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks wether whether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.
  5. Returns the group state of the group.

...

When the group coordinator handle a ConsumerGroupInstallAssignmentRequest request:

  1. Checks wether whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks wether whether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.
  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.
  6. Installs the new target assignment.

...

When the group coordinator handle a ConsumerGroupDescribeRequest request:

  • Checks wether whether the group ids exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  • Looks up the groups and returns the response.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitionMetadataTopics", "versions": "0+",
          "type": "[]TopicPartitionTopicMetadata", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
          ]}
    ], 
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
        { "name": "RackId", "versions": "0+", "typenullableVersions": "string0+" }, "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
			{ "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}

...

The AssignmentEpoch corresponds to the group epoch used to compute the assignment. It is not necessarily the last onenecessarily the most recent group epoch because the assignment is computed asynchronously when a client-side assignor is used. When a client side assignor is used, the assignment is computed asynchronously. While it is computed for the group at epoch X, the group may have already advanced to epoch X+1 due to another event (e.g. new member joined). In this case, we have chosen to install the assignment computed for epoch X and to trigger a new assignment computation right away.

ConsumerGroupTargetAssignmentMemberKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMemberKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "7" },
        { "name": "MemberId", "type": "string", "versions": "7" }
     ]
}

ConsumerGroupTargetAssignmentMemberValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentMemberValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
	    { "name": "MemberId", "versions": "0+", "type": "string" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
       	  "type": "[]TopicPartition", "fields": [
      	  { "name": "TopicId", "versions": "0+", "type": "uuid" },
          { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "VersionMetadataVersion", "versions": "0+", "type": "int16" },
        { "name": "MetadataMetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "78" },
      	{ "name": "MemberId", "type": "string", "versions": "78" },
    ]
}

ConsumerGroupCurrentMemberAssignmentValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
		{ "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "VersionMetadataVersion", "versions": "0+", "type": "int16" },
        { "name": "MetadataMetadataBytes", "versions": "0+", "type": "bytes" }
    ], 
}

...

Code Block
languagejava
linenumberstrue
package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class AssignmentSpec {
        /**
         * The members keyed by member id.
         */
        Map<String, AssignmentMemberSpec> members;

        /**
         * The topics' metadata keyed by topic id
         */
        Map<Uuid, AssignmentTopicMetadata> topics;
    }

    class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String> rackId;

        /**
         * The topics that the member is subscribed to.
         */
        Collection<String> subscribedTopics;

        /**
         * The current target partitions of the member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    class AssignmentTopicMetadata {
        /**
         * The topic name.
         */
        String topicName;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    }

    class GroupAssignment {
        /**
         * The member assignments keyed by member id.
         */
        Map<String, MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The target partitions assigned to this member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param assignmentSpec The assignment spec.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
}

Broker Metrics

The set of new metrics is not clear at the moment. We plan to amend the KIP later on when progress on the implementation would have been made.

Existing generic group metrics have been migrated, with the same metric names except for

NumGroups which reported the number of generic groups. This metric changed to

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|generic}

  • number of groups based on type where type is the rebalance protocol

kafka.server:type=group-coordinator-metrics,name=partition-count,state={loading|active|failed}

  • number of __consumer_offsets partitions based on state

kafka.server:type=group-coordinator-metrics,name=event-queue-size

  • event accumulator queue size

kafka.server:type=group-coordinator-metrics,name=consumer-group-count,state={empty|assigning|reconciling|stable|dead}

  • number of consumer groups based on state

consumer group rebalances sensor

  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-rate
  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-count

partition load sensor: __consumer_offsets partition load time

  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg

thread idle ratio sensor: thread busy - idle ratio

  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
  • Group count by type
  • Group count by state
  • Rebalance Rate
  • Thread utilisation in percent

Broker Configurations

New properties in the broker configuration.

...

We considered storing the dynamic group configurations in the group coordinator in order to have the ability to tight their lifecycles to their group. We discarded this approach for two reasons: 1) This pattern does not fit very well in the IncrementalAlterConfig API as it would require to send updates about groups to the coordinator whereas all the other updates go to the controller; and 2) It seems preferable to decouple the life cycle of the dynamic configurations from the life cycle of the groups. Users may want to create configurations before the group is created and users may want to keep their configurations if the group is recreated.

Client side generated Member ID

We considered letting the client generate its member ID (or UUID) instead of relying on the coordinator to generate one when the member joins the group. We ultimately rejected this because 1) it introduces extra dependencies on the client. For instance in C++, UUID are not natively supported so an extra library must be used; and 2) the client could not generate it correctly.

Future Work

Eventually, we aim at deprecating the current membership/rebalance API. In order to get to this point, we would need to first move all the use cases away from it.

...

The KIP proposes to rely on the IBP/MetadataVersion to decide wether whether a record or an API could be used or not. We have discussed the idea to use a dedicate feature flag instead of relying on metadata.version. That would allow decoupling the group coordinator from the quorum controller during upgrades. We also need to flush out how to handle downgrades. We will do this in a future KIP.

...