...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise." },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int32", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int32", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "Version", "type": "int32", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "byte", "versions": "0+",
          "about": "The reason of the metadata update." }, 
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the topics owned by the member otherwise.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
}
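
The schema above describes a delta-style heartbeat: most fields are sent as null (or -1 for RebalanceTimeoutMs) when they did not change since the last heartbeat. The following is a minimal sketch of how a client might build such a request. The function name `build_heartbeat` and the use of plain dictionaries are assumptions made for illustration only; they are not part of the proposal.

```python
from typing import Any, Dict, Optional

# Nullable fields that the schema describes as "null if it didn't change".
DELTA_FIELDS = (
    "InstanceId",
    "SubscribedTopicNames",
    "SubscribedTopicRegex",
    "ServerAssignor",
    "ClientAssignors",
    "TopicPartitions",
)


def build_heartbeat(group_id: str, member_id: str, member_epoch: int,
                    current: Dict[str, Any],
                    last_sent: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Build a heartbeat request (hypothetical helper), sending only changed fields.

    `current` is the member's present state; `last_sent` is the state that was
    sent in the previous heartbeat, or None for the first heartbeat.
    """
    request: Dict[str, Any] = {
        "GroupId": group_id,
        "MemberId": member_id,
        "MemberEpoch": member_epoch,
    }
    for name in DELTA_FIELDS:
        value = current.get(name)
        unchanged = last_sent is not None and last_sent.get(name) == value
        request[name] = None if unchanged else value
    # RebalanceTimeoutMs uses -1 rather than null as its "unchanged" marker.
    timeout = current.get("RebalanceTimeoutMs", -1)
    unchanged = last_sent is not None and last_sent.get("RebalanceTimeoutMs") == timeout
    request["RebalanceTimeoutMs"] = -1 if unchanged else timeout
    return request
```

With this sketch, the first heartbeat carries the full state, and a later heartbeat with identical state carries only the group, member and epoch fields plus the "unchanged" markers.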

...

  • GroupId must be non-empty.
  • MemberId must be non-empty.
  • MemberEpoch must be >= -1.
  • InstanceId, if provided, must be non-empty.
  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.
  • SubscribedTopicNames and SubscribedTopicRegex cannot be used together.
  • SubscribedTopicNames or SubscribedTopicRegex must be in the first heartbeat request.
  • SubscribedTopicRegex must be a valid regular expression.
  • ServerAssignor and ClientAssignors cannot be used together.
  • Assignor.Name must be non-empty.
  • Assignor.MinimumVersion must be >= -1.
  • Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion.
  • Assignor.Version must be >= Assignor.MinimumVersion and <= Assignor.MaximumVersion.
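
The invariants above can be sketched as a single validation routine. This is an illustration under stated assumptions, not the coordinator's implementation: `validate_heartbeat` is a hypothetical name, the request is modelled as a dictionary, and Python's `re` module stands in for whichever regular expression engine the broker would actually use. A non-None return value would map to INVALID_REQUEST.

```python
import re
from typing import Any, Dict, Optional


def validate_heartbeat(req: Dict[str, Any], first: bool) -> Optional[str]:
    """Return the first violated invariant as text, or None if the request is valid."""
    if not req.get("GroupId"):
        return "GroupId must be non-empty"
    if not req.get("MemberId"):
        return "MemberId must be non-empty"
    if req.get("MemberEpoch", -1) < -1:
        return "MemberEpoch must be >= -1"
    if req.get("InstanceId") == "":
        return "InstanceId, if provided, must be non-empty"
    if first and req.get("RebalanceTimeoutMs", -1) <= 0:
        return "RebalanceTimeoutMs must be larger than zero in the first heartbeat request"

    names = req.get("SubscribedTopicNames")
    regex = req.get("SubscribedTopicRegex")
    if names is not None and regex is not None:
        return "SubscribedTopicNames and SubscribedTopicRegex cannot be used together"
    if first and names is None and regex is None:
        return "SubscribedTopicNames or SubscribedTopicRegex must be in the first heartbeat request"
    if regex is not None:
        try:
            re.compile(regex)  # assumption: Python's regex syntax as a stand-in
        except re.error:
            return "SubscribedTopicRegex must be a valid regular expression"

    assignors = req.get("ClientAssignors")
    if req.get("ServerAssignor") is not None and assignors is not None:
        return "ServerAssignor and ClientAssignors cannot be used together"
    for assignor in assignors or []:
        if not assignor.get("Name"):
            return "Assignor.Name must be non-empty"
        lo = assignor["MinimumVersion"]
        hi = assignor["MaximumVersion"]
        if lo < -1:
            return "Assignor.MinimumVersion must be >= -1"
        if hi < 0 or hi < lo:
            return "Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion"
        if not lo <= assignor["Version"] <= hi:
            return "Assignor.Version must be within [MinimumVersion, MaximumVersion]"
    return None
```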

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupPrepareAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." }
  ]
}

...

  • GroupId must be non-empty.
  • MemberId must be non-empty.
  • MemberEpoch must be >= 0.

...

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.
  4. Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
  5. Returns the group state of the group.
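
The handling steps above can be sketched as a short chain of checks that stops at the first failure. The `Group` class, its attributes, and the use of error names instead of numeric error codes are assumptions made for readability; the proposal does not prescribe any of them.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class Group:
    """Hypothetical in-memory view of a group held by the coordinator."""
    epoch: int
    assignor_member_id: str  # the member chosen to compute the assignment
    member_epochs: Dict[str, int] = field(default_factory=dict)


def handle_prepare_assignment(groups: Dict[str, Group], group_id: str,
                              member_id: str, member_epoch: int
                              ) -> Tuple[str, Optional[Group]]:
    """Return (error name, group state); the state is None on error."""
    group = groups.get(group_id)
    if group is None:
        return "GROUP_ID_NOT_FOUND", None
    if member_id not in group.member_epochs:
        return "UNKNOWN_MEMBER_ID", None
    if group.member_epochs[member_id] != member_epoch:
        return "FENCED_MEMBER_EPOCH", None
    if group.assignor_member_id != member_id:
        return "UNKNOWN_MEMBER_ID", None
    return "NONE", group
```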

Response Schema

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "ClientAssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersion", "type": "int32", "versions": "0+",
        "about": "The version of the metadata." },
      { "name": "AssignorReason", "type": "byte", "versions": "0+",
        "about": "The reason of the metadata update." }, 
      { "name": "AssignorMetadata", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata." },
      { "name": "TopicPartitions", "type": "[]TopicPartition",
        "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "PartitionMetadata", "type": "[]Metadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}    
  ]
}

Response Handling

Possible errors:

...

If the response contains no error, the member calls the client-side assignor with the group state.

Upon receiving the UNKNOWN_MEMBER_ID error, the consumer

...

abandon the process.

Upon receiving the FENCED_MEMBER_EPOCH

...

error, the consumer retries when receiving its next heartbeat response with its member epoch.

...

ConsumerGroupInstallAssignment API

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "byte", "versions": "0+",
      "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
        ]},
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupId must be non-empty.
  • MemberId must be non-empty.
  • MemberEpoch must be >= 0.

Request Handling

When the group coordinator handles a ConsumerGroupInstallAssignmentRequest:

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.
  4. Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.
  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.
  6. Installs the new target assignment.
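
The text does not spell out what the assignment validation step checks. The sketch below shows one plausible set of checks, all of which are assumptions: every assigned member must be known to the group, every topic must appear in the partition metadata that was handed to the assignor, every partition index must be within range, and no partition may be assigned twice. The function name and the dictionary shapes are hypothetical.

```python
from typing import Dict, List, Set, Tuple


def validate_assignment(members: Dict[str, Dict[str, List[int]]],
                        known_members: Set[str],
                        num_partitions: Dict[str, int]) -> str:
    """Return "NONE" if the assignment passes the assumed checks.

    `members` maps member id -> topic id -> assigned partitions.
    `num_partitions` maps topic id -> number of partitions, mirroring the
    PartitionMetadata returned by the prepare-assignment response.
    """
    seen: Set[Tuple[str, int]] = set()
    for member_id, topics in members.items():
        if member_id not in known_members:
            return "INVALID_ASSIGNMENT"
        for topic_id, partitions in topics.items():
            count = num_partitions.get(topic_id)
            if count is None:
                return "INVALID_ASSIGNMENT"
            for partition in partitions:
                out_of_range = not 0 <= partition < count
                if out_of_range or (topic_id, partition) in seen:
                    return "INVALID_ASSIGNMENT"
                seen.add((topic_id, partition))
    return "NONE"
```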

Response Schema

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupInstallAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH  
  // - INVALID_ASSIGNMENT  
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

Response Handling

Possible errors:

...

If the response contains no error, the member is done.

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

Upon receiving any other error, the consumer

...

abandon the process.
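
The response handling described above amounts to a three-way decision on the client. A minimal sketch follows; the function name and the returned action strings are hypothetical and only label the three outcomes named in the text.

```python
def next_action(error: str) -> str:
    """Map an install-assignment error name to the member's next step."""
    if error == "NONE":
        return "done"
    if error == "FENCED_MEMBER_EPOCH":
        # Retry once the next heartbeat response delivers the member epoch.
        return "retry_after_next_heartbeat"
    return "abandon"
```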

ConsumerGroupDescribe API

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupDescribeRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
      "about": "The names of the groups to describe" },
    { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
      "about": "Whether to include authorized operations." }
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupIds must be non-empty.

Request Handling

When the group coordinator handles a ConsumerGroupDescribe request:

  • Checks whether each group ID exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  • Looks up the groups and returns the response.
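
Because the response schema carries an ErrorCode per described group, the lookup can be sketched as a per-group loop. This is an illustration only: `handle_describe` is a hypothetical name, group state is modelled as a dictionary, and error names are used in place of numeric error codes.

```python
from typing import Any, Dict, List


def handle_describe(groups: Dict[str, Dict[str, Any]],
                    group_ids: List[str]) -> List[Dict[str, Any]]:
    """Return one described-group entry per requested group id."""
    described = []
    for group_id in group_ids:
        state = groups.get(group_id)
        if state is None:
            described.append({"GroupId": group_id, "ErrorCode": "GROUP_ID_NOT_FOUND"})
        else:
            described.append({"GroupId": group_id, "ErrorCode": "NONE", **state})
    return described
```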

Response Schema

Code Block
{
  "apiKey": 71,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members.",
          "fields": [
          { "name": "MemberId", "type": "uuid", "versions": "0+",
            "about": "The member ID." },
          { "name": "InstanceId", "type": "string", "versions": "0+",
            "about": "The member instance ID." },
          { "name": "MemberEpoch", "type": "int32", "versions": "0+",
            "about": "The current member epoch." },
          { "name": "ClientId", "type": "string", "versions": "0+",
            "about": "The client ID." },
          { "name": "ClientHost", "type": "string", "versions": "0+",
            "about": "The client host." },
          { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
            "about": "The subscribed topic IDs." },
          { "name": "Assignment", "type": "Assignment", "versions": "0+",
            "about": "The current assignment.",
            "fields": [
            { "name": "AssignmentPartitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "AssignmentMetadataVersion", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." },
            { "name": "AssignmentMetadataBytes", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
          { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
            "about": "The target assignment.",
            "fields": [
            { "name": "AssignmentPartitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
              { "name": "TopicId", "type": "uuid", "versions": "0+",
                "about": "The topic ID." },
              { "name": "Partitions", "type": "[]int32", "versions": "0+",
                "about": "The partitions." }
            ]},
            { "name": "AssignmentMetadataVersion", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." },
            { "name": "AssignmentMetadataBytes", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]}
      ]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "0+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ]
}
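
The AuthorizedOperations field is a 32-bit bitfield whose default is -2147483648. A minimal sketch of decoding it is shown below. It assumes, as in the existing DescribeGroups API, that the default value means the operations were not requested; the meaning of each bit position is not defined in this section, so the sketch returns raw bit positions only.

```python
from typing import List, Optional

NOT_REQUESTED = -2147483648  # schema default for AuthorizedOperations


def decode_authorized_operations(bitfield: int) -> Optional[List[int]]:
    """Return the set bit positions, or None if the field holds its default."""
    if bitfield == NOT_REQUESTED:
        return None
    return [bit for bit in range(32) if bitfield & (1 << bit)]
```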

Response Handling

Possible errors:

  • GROUP_AUTHORIZATION_FAILED - The group operation is not authorized.
  • NOT_COORDINATOR - The coordinator which has received the request is not the correct coordinator.
  • COORDINATOR_NOT_AVAILABLE - The coordinator is not available.
  • COORDINATOR_LOAD_IN_PROGRESS - The coordinator is loading its state.
  • INVALID_REQUEST - The request is invalid.
  • INVALID_GROUP_ID - The group ID is invalid.
  • GROUP_ID_NOT_FOUND - The group ID does not exist.

Nothing particular.

ListGroups API

The existing ListGroups API will be extended to support the notion of group types and to support the new group states.

...

Code Block
{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty, all groups are returned with their state."
    },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty, all groups are returned."
    }
  ]
}
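
Both filters follow the same rule: an empty filter matches every group. The sketch below illustrates that rule. The function name, the `GroupState` and `GroupType` keys, and the example state and type values used with it are assumptions, since the response schema is not shown in this section.

```python
from typing import Any, Dict, List, Sequence


def list_groups(groups: List[Dict[str, Any]],
                states_filter: Sequence[str] = (),
                types_filter: Sequence[str] = ()) -> List[Dict[str, Any]]:
    """Return the groups matching both filters; an empty filter matches all."""
    return [
        group for group in groups
        if (not states_filter or group["GroupState"] in states_filter)
        and (not types_filter or group["GroupType"] in types_filter)
    ]
```

Applying the two filters conjunctively is a design choice of this sketch: a group is listed only if it passes every non-empty filter.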


Required ACL:

  • Read Group

Response

...