Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Using a feature flag will enable operators to enable the new consumer group protocol without having to roll the cluster twice. In other words, once the new software is deployed on all the nodes, the operator can update the feature flag to enable the feature. Until the feature is enabled, the new APIs won’t be advertised by the ApiVersions API.

MetadataVersion / IBP

TODO

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
	  "about": "null if not provided; the assignment otherwise."
      "fields": [
	    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned topic-partitions to the member otherwise.",
          "fields": [
        	{ "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
	        { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
      	]},
    	{ "name": "Error", "type": "byte", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned error." } 
	    { "name": "Metadata", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned metadata." }
	]
  ]
}

...

  • GroupId must be non-empty.
  • MemberId must be non-zero.
  • MemberEpoch must be >= 0.

Request Handling

When the group coordinator handle a ConsumerGroupPrepareAssignmentRequest request:

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.
  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.
  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.
  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

Response Schema

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "ClientAssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
    { "name": "MemberId", "type": "uuid", "versions": "0+",
      "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersion", "type": "int32", "versions": "0+",
        "about": "The version of the metadata." },
       { "name": "AssignorReason", "type": "byte", "versions": "0+",
        "about": "The reason of the metadata update." }, 
      { "name": "AssignorMetadata", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata." },
      { "name": "TopicPartitions", "type": "[]TopicPartition",
        "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "PartitionMetadata", "type": "[]Metadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}    
  ]
}

...