Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The group configurations are stored in the controller like all the other dynamic configurations in the cluster. This allows configurations to be installed independently from wether whether the group exists or not. Configurations are also preserved if the group is deleted. In ZK mode, the dynamic group configurations will be store in the /config/groups znode. In KRaft mode, they are stored in the ConfigRecord.

...

  • It has to setup the session timeouts for all the members (like today).
  • It has to check wether whether the topic-partition metadata has changed and potentially trigger a rebalance for the group if it has.
  • It has to check wether whether new topics match the regex subscriptions and trigger a rebalance for the group if new topic do.
  • It has to check wether whether a new assignment is required for the group (group epoch != assignment epoch). If it is the case, the group coordinator can directly compute it using the server side assignor or can trigger a client side assignment computation.

...

  1. Lookups the group or creates it.
  2. Creates the member should the member epoch be zero or checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that he knows about and its request will be rejected with a FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions owned by the members are a subset of the target partitions. If it is the case, it is safe to transition the member to its target epoch again.
  4. Updates the members informations if any. The group epoch is incremented if there is any change. See "Group Epoch - Trigger a rebalance" chapter for details about the rebalance triggers.
  5. Reconcile the member assignments as explained earlier in this document. 

...

When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest request:

  1. Checks wether whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks wether whether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.
  5. Returns the group state of the group.

...

When the group coordinator handle a ConsumerGroupInstallAssignmentRequest request:

  1. Checks wether whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether whether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks wether whether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.
  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.
  6. Installs the new target assignment.

...

When the group coordinator handle a ConsumerGroupDescribeRequest request:

  • Checks wether whether the group ids exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  • Looks up the groups and returns the response.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitionMetadataTopics", "versions": "0+",
          "type": "[]TopicPartitionTopicMetadata", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
          ]}
    ], 
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
        { "name": "RackId", "versions": "0+", "nullableVersions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
			{ "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}

...

Code Block
languagejava
linenumberstrue
package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class AssignmentSpec {
        /**
         * The members keyed by member id.
         */
        Map<String, AssignmentMemberSpec> members;

        /**
         * The topics' metadata keyed by topic id
         */
        Map<Uuid, AssignmentTopicMetadata> topics;
    }

    class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String> rackId;

        /**
         * The topics that the member is subscribed to.
         */
        Collection<String> subscribedTopics;

        /**
         * The current target partitions of the member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    class AssignmentTopicMetadata {
        /**
         * The topic name.
         */
        String topicName;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    }

    class GroupAssignment {
        /**
         * The member assignments keyed by member id.
         */
        Map<String, MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The target partitions assigned to this member.
         */
        Collection<TopicPartition> targetPartitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param assignmentSpec The assignment spec.
     * @return The new assignment for the group.
     */
    GroupAssignment assign(AssignmentSpec assignmentSpec) throws PartitionAssignorException;
}

Broker Metrics

The set of new metrics is not clear at the moment. We plan to amend the KIP later on when progress on the implementation would have been made.

Existing generic group metrics have been migrated, with the same metric names except for

NumGroups which reported the number of generic groups. This metric changed to

kafka.server:type=group-coordinator-metrics,name=group-count,protocol={consumer|generic}

  • number of groups based on type where type is the rebalance protocol

kafka.server:type=group-coordinator-metrics,name=partition-count,state={loading|active|failed}

  • number of __consumer_offsets partitions based on state

kafka.server:type=group-coordinator-metrics,name=event-queue-size

  • event accumulator queue size

kafka.server:type=group-coordinator-metrics,name=consumer-group-count,state={empty|assigning|reconciling|stable|dead}

  • number of consumer groups based on state

consumer group rebalances sensor

  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-rate
  • kafka.server:type=group-coordinator-metrics,name=consumer-group-rebalance-count

partition load sensor: __consumer_offsets partition load time

  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-max
  • kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg

thread idle ratio sensor: thread busy - idle ratio

  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-min
  • kafka.server:type=group-coordinator-metrics,name=thread-idle-ratio-avg
  • Group count by type
  • Group count by state
  • Rebalance Rate
  • Thread utilisation in percent

Broker Configurations

New properties in the broker configuration.

...

The KIP proposes to rely on the IBP/MetadataVersion to decide wether whether a record or an API could be used or not. We have discussed the idea to use a dedicate feature flag instead of relying on metadata.version. That would allow decoupling the group coordinator from the quorum controller during upgrades. We also need to flush out how to handle downgrades. We will do this in a future KIP.

...