Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether the member epoch matches the current member epoch. If it does not, STALE_MEMBER_EPOCH is returned.
  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.
  5. Returns the group state of the group.

Response Schema

The response always contains both the topic id and the topic name. This is to ensure that the client-side assignor gets all the information that it needs to compute the assignment and does not have any local dependencies (e.g. metadata cache). We want the client-side assignor to be fast and independent. Moreover, topic names are required for Streams' assignor in order to map them to the topology which is based on topic names.

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - STALE_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The member instance ID." },
      { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", 
        "about": "The member instance ID." },
      { "name": "SubscribedTopicsSubscribedTopicIds", "type": "[]SubscribedTopicuuid", "versions": "0+",
        "about": "The subscribed topic IDs.", "fields": [ },
          { "name": "TopicIdAssignor", "type": "uuidAssignor", "versions": "0+",
            "about": "The topic ID." } information of the selected assignor",
        "fields": [ 
        { "name": "TopicNameReason", "type": "stringint8", "versions": "0+",
            "about": "The topic name reason of the metadata update." }, 
      ]},
      { "name": "AssignorMetadataVersion", "type": "Assignorint16", "versions": "0+",
          "about": "The informationversion of the selected assignor"metadata." },
        "fields": [ 
        { "name": "ReasonMetadataBytes", "type": "int8bytes", "versions": "0+",
          "about": "The reason of the metadata update." }, 
 assignor metadata." }
      ]},
       { "name": "MetadataVersionTopicPartitions", "type": "int16[]TopicPartition", "versions": "0+",
          "about": "The target versiontopic-partitions of the metadata." },member.",
        "fields": [
          { "name": "MetadataBytesTopicId", "type": "bytesuuid", "versions": "0+",
            "about": "The assignortopic metadataID." },
      ]},
      { "name": "TopicPartitionsPartitions", "type": "[]TopicPartitionint32", "versions": "0+",
            "about": "The target topic-partitions of the member.", }
        "fields": []}
      ]},
    { "name": "TopicIdTopics", "type": "uuid[]TopicMetadata", "versions": "0+",
            "about": "The topic-partition IDmetadata." },
      "fields": [
        { "name": "TopicNameTopicId", "type": "stringuuid", "versions": "0+",
            "about": "The topic nameID." },
          { "name": "PartitionsTopicName", "type": "[]int32string", "versions": "0+",
            "about": "The topic partitionsname." }
      ]},
    ]},
    { "name": "TopicsNumPartitions", "type": "[]TopicMetadataint32", "versions": "0+",
          "about": "The number topic-partitionof metadatapartitions.", }
    ]}  "fields":  [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        ]
} 

Response Handling

If the response contains no error, the member calls the client side assignor with the group state.

Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.

Upon receiving the STALE_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

ConsumerGroupInstallAssignment API

The ConsumerGroupInstallAssignment API will be used by the consumer to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "TopicNameGroupId", "type": "string", "versions": "0+",
 "entityType": "groupId",
        "about": "The topicgroup nameidentifier." },
        { "name": "NumPartitionsMemberId", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}    
  ]
} 

Response Handling

If the response contains no error, the member calls the client side assignor with the group state.

Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.

Upon receiving the STALE_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

ConsumerGroupInstallAssignment API

The ConsumerGroupInstallAssignment API will be used by the consumer to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "int8", "versions": "0+",
      "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
        ]},
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The metadata version." }
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 71,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
          { "name": "MemberId", "type": "uuid", "versions": "0+",
            "about": "The member ID." },
          { "name": "InstanceId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null", 
            "about": "The member instance ID." },
          { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null", 
            "about": "The member rack ID." },
          { "name": "MemberEpoch", "type": "int32", "versions": "0+",
            "about": "The current member epoch." },
          { "name": "ClientId", "type": "string", "versions": "0+",
            "about": "The client ID." },
          { "name": "ClientHost", "type": "string", "versions": "0+",
            "about": "The client host." },
          { "name": "SubscriptionsSubscribedTopicIds", "type": "[]uuid", "versions": "0+",
            "about": "The subscribed topic IDs." },
          { "name": "Assignment", "type": "Assignment", "versions": "0+",
            "about": "The current assignment.",
            "fields": [
            { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "Version", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "Metadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
          { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
            "about": "The target assignment.",
            "fields": [
            { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "Version", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "Metadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ]
}

...

Code Block
languagejava
linenumberstrue
package org.apache.kafka.common.errors;

public class PartitionAssignorException extends ApiException {

    public PartitionAssignorException(String message) {
        super(message);
    }

    public PartitionAssignorException(String message, Throwable cause) {
        super(message, cause);
    }
}


Code Block
languagejava
linenumberstrue
package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class Group    class AssignmentSpec {
        /**
                  * The members keyed by member id.
         */
        List<GroupMember>        Map<String, AssignmentMemberSpec> members;

        /**
                  * The topics' metadata. keyed by topic id
         */
        List<TopicMetadata>        Map<Uuid, AssignmentTopicMetadata> topics;
        }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String>        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String>       Optional<String> rackId;

         /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid>        List<Uuid> subscribedTopicIds;

        /**
                  * The current target partitions owned byof the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
            List<TopicIdPartition> targetPartitions;
    }

     class TopicMetadataAssignmentTopicMetadata {
      	  /**
		         * The topic IDname.
		         */
		Uuid topicId        String topicName;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    } 

      classclass AssignmentGroupAssignment {
        /**
                  * The member assignmentassignments keyed by member id.
         */
        List<MemberAssignment>        Map<String, MemberAssignment> members;
      }

    class    class MemberAssignment {
        /**
         * The target partitions assigned to this member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;
            List<TopicIdPartition> targetPartitions;
    }

    /**
     * Unique name for this assignor.
     */
    String    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param groupassignmentSpec The groupassignment statespec.
     * @return The new assignment for the group.
     */
    Assignment    Assignment assign(GroupAssignmentSpec groupassignmentSpec) throws PartitionAssignorException;
}

Broker Metrics

The set of new metrics is not clear at the moment. We plan to amend the KIP later on when progress on the implementation would have been made.

...

NameTypeDefaultDoc
group.consumer.session.timeout.msint45sThe timeout to detect client failures when using the consumer group protocol.
group.consumer.heartbeat.interval.msint5sThe heartbeat interval given to the members.

Consumer API

New PartitionAssignor interface

The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follow:

New PartitionAssignor interface

The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follow:

Code Block
languagejava
linenumberstrue
package org.apache.kafka.clients.consumer;
  
public interface PartitionAssignor {

    class Metadata {
        /**
         * The metadata version.
         */
        int version;

        /**
         * The metadata bytes.
         */
        ByteBuffer bytes;
    }

    class AssignmentSpec
Code Block
languagejava
linenumberstrue
package org.apache.kafka.clients.consumer;

public interface PartitionAssignor {

    class Group {
        /**
                  * The members keyed by member id.
         */
        List<GroupMember>        Map<String, AssignmentMemberSpec> members;

        /**
                  * The topics' metadata. keyed by topic id
         */
        List<TopicMetadata>        Map<Uuid, AssignmentTopicMetadata> topics;
        }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

class AssignmentMemberSpec {
        /**
         * The instance ID if provided.
         */
        Optional<String>        Optional<String> instanceId;

        /**
         * The rack ID if provided.
         */
        Optional<String>
        Optional<String> rackId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid>        List<Uuid> subscribedTopicIds;

   		/**
		 * The reason reported by the member.
		 */
		byte reason;  

		/**
		 * The version of the metadata encodedreported inby {{@link GroupMember#metadata()}}the member.
		 */
		int version        Metadata metadata;

        /**
                  * The customcurrent metadatatarget providedpartitions byof the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
            List<TopicIdPartition> targetPartitions;
    }

    class TopicMetadataAssignmentTopicMetadata {
        	/**
		         * The topic IDname.
		         */
		Uuid topicId        String topicName;

        /**
		 * The number of partitions.
		 */
		int numPartitions; 
    }

    class Assignment    class GroupAssignment {
        /**
         * The assignment error.
         */
		byte error;

        /**
                  * The member assignment assignments keyed by member id.
         */
        List<MemberAssignment>        Map<String, MemberAssignment> members;
      }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

    class MemberAssignment {
        /**
         * The target partitions assigned to this partitionsmember.
         */
        List<TopicIdPartition> partitions;

  		/**
		 * The error reported by the assignor.
		 */
		byte error; List<TopicIdPartition> targetPartitions;

 		/**
		 * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
		 */
		intMetadata versionmetadata;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

	class MetadataAssignorMetadata {
   		/**
		 * The reason reported by the assignor.
		 */
		byte reason; 

 		/**
		 * The version of the metadata encodedreported inby {{@link Metadata#metadata()}}the assignor.
		 */
		int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata; Metadata metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String    String name();

    /**
     * The minimum version.
     */
    int    int minimumVersion();

    /**
     * The maximum version.
     */
    int    int maximumVersion();

    /**
     * Return serializedassignor datametadata that will be sent to the assignor.
     */
    Metadata    AssignorMetadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param groupassignmentSpec The groupassignment statespec.
     * @return The new assignment for the group.
     */
    Assignment    Assignment assign(GroupAssignmentSpec groupassignmentSpec);

    /**
     * Callback which is invoked when the member received a new assignment 
     * assignment from the assignor/group coordinator. This is called once per epoch
     * and contains the target partitions for this members. This means that
     * partitions may not be assigned to the member yet. The rebalance
     * listener must be used to know this.
     * 
     * @param byte The error reported by the assignor.
     * @param assignment The assignment computed by the assignor.
     * @param consumerGroupMetadata The group metadata.
     */
    void    void onAssignment(byte error, MemberAssignment assignment, ConsumerGroupMetadata consumerGroupMetadata);
}

New SubscriptionPattern class

...