Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Consumer Group
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epochint32The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members[]MemberThe set of members in the group.
Partitions Metadata[]PartitionMetadataThe metadata of the partitions that the group is subscribed to. This is used to trigger a rebalance when the partition metadata changes.
Member
NameTypeDescription
Member IDstringThe unique identifier of the member. The ID is similar to an incarnation ID. It is generated by the client once and must be used during its lifetime.
Instance IDstringThe instance ID configured by the consumer.
Client IDstringThe client ID configured by the consumer.
Client HoststringThe client ID configured by the consumer.
Subscribed Topic Names[]stringThe current set of subscribed topic names configured by the consumer.
Subscribed Topic RegexstringThe current subscription regular expression configured by the consumer.
Server AssignorstringThe server side assignor used by the group.
Client Assignors[]AssignorThe list of client-side assignors supported by the member. The order of this list defined the priority.
Assignor
NameTypeDescription
NamestringThe unique name of the assignor.
Reasonint8The reason why the metadata was updated.
Minimum Versionint32int16The minimum version of the metadata schema supported by this assignor.
Maximum Versionint32int16The maximum version of the metadata schema supported by this assignor.
ReasonVersionint8int16The reason why version used to encode the metadata was updated.
MetadatabytesThe metadata provided by the consumer for this assignor.

...

Current Assignment
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Member IDstringThe member ID of this member.
Member Epochint32

The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. This epoch is the one used to fence the member (e.g. offsets commit).

Error int8The error reported by the assignor.
PartitionsPartitionsThe current partitions used by the member.
Versionint16The version used to encode the metadata.
MetadatabytesThe current metadata used by the member.

...

  • FENCED_MEMBER_EPOCH - The member epoch does not correspond to the member epoch expected by the coordinator. The member must abandon its partitions and rejoin the group.
  • COMPUTE_ASSIGNMENT - The member has been selected by the coordinator to compute the new target assignment of the group.
  • UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version range are not supported by the group.

...

Code Block
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null it not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int32int16", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int32int16", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "VersionReason", "type": "int32int8", "versions": "0+",
          "about": "The versionreason of the metadata update." }, 
        { "name": "ReasonVersion", "type": "int8int16", "versions": "0+",
          "about": "The reasonversion of the metadata update." }, 
           { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the topicspartitions owned by the member.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
}

Required ACL

  • Read Group

Request Validation

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
	  "about": "null if not provided; the assignment otherwise."
      "fields": [
	     	{ "name": "TopicPartitionsError", "type": "[]TopicPartitionint8", "versions": "0+", "nullableVersions
          "about": "The assigned error." },
        { "name": "TopicPartitions", "type": "0+[]TopicPartition", "defaultversions": "null0+",
          "about": "The assigned topic-partitions to the member otherwise.",
          "fields": [
        	{ "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
	        { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
      	]},
       	 { "name": "ErrorVersion", "type": "int8int16", "versions": "0+", "nullableVersions": "0+", "default": "null",
  
          "about": "The version of assignedthe errormetadata." },
 
	       { "name": "Metadata", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned metadata." }
	]
  ]
}

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "ClientAssignorNameAssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersion", "type": "int32int16", "versions": "0+",
        "about": "The version of the metadata." },
      { "name": "AssignorReason", "type": "int8", "versions": "0+",
        "about": "The reason of the metadata update." }, 
      { "name": "AssignorMetadata", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata." },
      { "name": "TopicPartitions", "type": "[]TopicPartition",
        "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "PartitionMetadataTopics", "type": "[]MetadataTopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}    
  ]
}

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupInstallAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH  
  // - INVALID_ASSIGNMENT  
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

...

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.

Records

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "3" }
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "4" }
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int32int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int32int16" },
            { "name": "Version", "versions": "0+", "type": "int32int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "5" }
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
        	{ "name": "MemberId", "versions": "0+", "type": "string" },
        	{ "name": "TopicPartitions", "versions": "0+",
          	  "type": "[]TopicPartition", "fields": [
            	{ "name": "TopicId", "versions": "0+", "type": "uuid" },
            	{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
        	]},
        	{ "name": "MetadataError", "versions": "0+", "type": "int8" },
        	{ "name": "MetadataVersion", "versions": "0+", "type": "int32int16" },
        	{ "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
        ]
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "7" },
      	{ "name": "MemberId", "type": "string", "versions": "7" },
    ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        		{ "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
           { "name": "MetadataErrorVersion", "versions": "0+", "type": "int8int16" },
                { "name": "MetadataVersionMetadata", "versions": "0+", "type": "int32bytes" },
        { "name": "MetadataBytes
    ], 
}

GroupConfigurationKey

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "GroupConfigurationKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
     	{ "name": "GroupId", "type": "string", "versions": "8" }
    ]
}

GroupConfigurationValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "GroupConfigurationValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Configurations", "versions": "0+", "type": "[]Configuration",
          "fields": [
		     { "name": "Name", "type": "string", "versions": "0+",
      		   "about": "The name of the configuration key." },
    		 { "name": "Value", "type": "bytes"  "string", "versions": "0+",
      		   "about": "The value of the configuration." }
		]}
    ], 
}

Broker API

The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).

Code Block
languagejava
linenumberstrue
package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The mapping from topic ID to number of partitions
         * as provided by the group coordinator
         */
        Map<Uuid, Integer> topicMetadatatopics' metadata.
         */
        List<TopicMetadata> topics;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

     class TopicMetadata {
      	/**

    class		 * The topic ID.
		 */
		Uuid topicId;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    } 

    class Assignment {
        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);
}

...

Code Block
languagejava
linenumberstrue
package org.apache.kafka.clients.consumer;

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The mapping from topic ID to number of partitions*
         * asThe provided by the group coordinatortopics' metadata.
         */
        Map<Uuid, Integer> topicMetadata        List<TopicMetadata> topics;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

   		/**
		 * The reason reported by the member.
		 */
		byte reason;  

		/**
		 * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class TopicMetadata {
      	/**
		 * The topic ID.
		 */
		Uuid topicId;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    }

    class Assignment {
        /**
         * The assignment error.
         */
		byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

  		/**
		 * The error reported by the assignor.
		 */
		byte error; 

 		/**
		 * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

	class Metadata {
   		/**
		 * The reason reported by the assignor.
		 */
		byte reason; 

 		/**
		 * The version of the metadata encoded in {{@link Metadata#metadata()}}.
		 */
		int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata; 
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);
}

...