Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Consumer Group
NameTypeDescription
Group IDstringThe group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epochint32The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members[]MemberThe set of members in the group.
Partitions Metadata[]PartitionMetadataThe metadata of the partitions that the group is subscribed to. This is used to detect partition metadata changes.
Member
NameTypeDescription
Member IDstringThe unique identifier of the member. It is generated by the client once and must be used during its lifetime. The ID is similar to an incarnation ID.
Instance IDstringThe instance ID configured by the consumer.
Rack IDstringThe rack ID configured by the consumer.
Client IDstringThe client ID configured by the consumer.
Client HoststringThe client host configured by the consumer.
Subscribed Topic Ids[]uuidThe current set of subscribed topic ids configured by the consumer.
Subscribed Topic RegexstringThe current subscription regular expression configured by the consumer.
Server AssignorstringThe server side assignor used by the group.
Client Assignors[]AssignorThe list of client-side assignors supported by the member. The order of this list defined the priority.
Assignor
NameTypeDescription
NamestringThe unique name of the assignor.
Reasonint8The reason why the metadata was updated.
Minimum Versionint16The minimum version of the metadata schema supported by this assignor.
Maximum Versionint16The maximum version of the metadata schema supported by this assignor.
Versionint16The version used to encode the metadata.
MetadatabytesThe metadata provided by the consumer for this assignor.

...

  • All partitions are assigned.
  • A partition is assigned only once.
  • All members existsmember ids are valid. They must correspond to members in the group.

Note that this validation is made with regarding to the metadata used to compute the assignment. The group may have already advanced to a newer group epoch - e.g. a member could have left during the assignment computation.

...

The group coordinator ensures that requests comes from a known Member ID. Any request is rejected with the UNKNOWN_MEMBER_ID error otherwise. It also ensures that the Member Epoch matches the expected member epoch. If not, the request is rejected with the FENCED_MEMBER_EPOCH error. In this case, the member is expected to immediately gives up all its partitions and rejoin the group with the same member ID and a member epoch equals to zero. Details for every API are given in the Public Interfaces section.

...

The new group coordinator will rely on the metadata.version or IBP to enable the new protocol. The exact version will be determined when the feature is ready.

Fail Over

When the Group Coordinator fails over, the newly elected coordinator will load the state from the __consumer_offsets partition(s). When it is done with this, it has a few other duties for the newly loaded groups:

  • It has to setup the session timeouts for all the members (like today).
  • It has to check wether the topic-partition metadata has changed and potentially trigger a rebalance for the group if it has.
  • It has to check wether new topics match the regex subscriptions and trigger a rebalance for the group if new topic do.
  • It has to check wether a new assignment is required for the group (group epoch != assignment epoch). If it is the case, the group coordinator can directly compute it using the server side assignor or can trigger a client side assignment computation.

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

...

Code Block
linenumberstrue
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
      "about": "null itif not provided or if it didn't change since the last heartbeat; the instance Idrack ID of consumer otherwise." },
     { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
      "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." }, 
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
}

...

The group coordinator will only set the Assignment field when until the member epoch is smaller than the target assignment epochacknowledges that it has converged to the desired assignment. This is done to ensure that the members converge to the target assignment.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - STALE_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
 "nullableVersions": "0+", "default": "null",
        "about": "The member instance ID." },
      { "name": "RackId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", 
        "about": "The member instance ID." },
      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "Assignor", "type": "Assignor", "versions": "0+",
        "about": "The information of the selected assignor",
        "fields": [ 
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." }, 
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The target topic-partitions of the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}    
  ]
}

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": 71,
  "type": "response",
  "name": "ConsumerGroupDescribeResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "Groups", "type": "[]DescribedGroup", "versions": "0+",
      "about": "Each described group.",
      "fields": [
        { "name": "ErrorCode", "type": "int16", "versions": "0+",
          "about": "The describe error, or 0 if there was no error." },
        { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
          "about": "The group ID string." },
        { "name": "GroupState", "type": "string", "versions": "0+",
          "about": "The group state string, or the empty string." },
        { "name": "GroupEpoch", "type": "int32", "versions": "0+",
          "about": "The group epoch." },
        { "name": "AssignmentEpoch", "type": "int32", "versions": "0+",
          "about": "The assignment epoch." },
        { "name": "AssignorName", "type": "string", "versions": "0+",
          "about": "The selected assignor." },
        { "name": "Members", "type": "[]Member", "versions": "0+",
          "about": "The members. members.",
          "fields": [
          { "name": "MemberId", "type": "uuid", "versions": "0+",
            "fieldsabout": [ "The member ID." },
          { "name": "MemberIdInstanceId", "type": "uuidstring", "versions": "0+",
  "nullableVersions": "0+", "default": "null", 
            "about": "The member instance ID." },
          { "name": "InstanceIdRackId", "type": "string", "versions": "0+",
  "nullableVersions": "0+", "default": "null", 
            "about": "The member instancerack ID." },
     	     { "name": "MemberEpoch", "type": "int32", "versions": "0+",
            "about": "The current member epoch." },
          { "name": "ClientId", "type": "string", "versions": "0+",
            "about": "The client ID." },
          { "name": "ClientHost", "type": "string", "versions": "0+",
            "about": "The client host." },
          { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
            "about": "The subscribed topic IDs." },
          { "name": "Assignment", "type": "Assignment", "versions": "0+",
            "about": "The current assignment.",
            "fields": [
            { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "Version", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "Metadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
          { "name": "TargetAssignment", "type": "Assignment", "versions": "0+",
            "about": "The target assignment.",
            "fields": [
            { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
              "about": "The assigned topic-partitions to the member.",
              "fields": [
                { "name": "TopicId", "type": "uuid", "versions": "0+",
                  "about": "The topic ID." },
                { "name": "Partitions", "type": "[]int32", "versions": "0+",
                  "about": "The partitions." }
              ]},
            { "name": "Version", "type": "int32", "versions": "0+",
              "about": "The assignor metadata version." }
            { "name": "Metadata", "type": "bytes", "versions": "0+",
              "about": "The assignor metadata bytes." }
          ]},
      { "name": "AuthorizedOperations", "type": "int32", "versions": "3+", "default": "-2147483648",
        "about": "32-bit bitfield to represent authorized operations for this group." }
    ]}
  ]
}

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ], 
}

Note that the Epoch is always the latest epoch of the group.

ConsumerGroupPartitionMetadataKey

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitionMetadata", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
          ]}
    ], 
}

Note that the Epoch is always the latest epoch of the group.

ConsumerGroupMemberMetadataKey

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "RackId", "versions": "0+", "type": "string" },
                { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
			{ "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}

Note that the GroupEpoch is always the latest epoch of the group.

Target Assignment

The target assignment is stored in a single record.

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
      	{ "name": "GroupId", "type": "string", "versions": "56" }
    ]
}

ConsumerGroupTargetAssignmentValue

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
        	{ "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "Error", "versions": "0+", "type": "int8" },
            { "name": "TopicPartitions", "versions": "0+",
          	  "type": "[]TopicPartition", "fields": [
            	{ "name": "TopicId", "versions": "0+", "type": "uuid" },
            	{ "name": "Partitions", "versions": "0+", "type": "[]int32" }
        	]},
        	{ "name": "Version", "versions": "0+", "type": "int16" },
        	{ "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]
    ]
}

The AssignmentEpoch corresponds to the group epoch used to compute the assignment. It is not necessarily the last one.

Current Member Assignment

...

Code Block
languagejs
linenumberstrue
{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
		{ "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "Version", "versions": "0+", "type": "int16" },
        { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ], 
}

the group coordinator will ensureOffsetCommitValue

Code Block
languagejs
linenumberstrue
{
  "type": "data",
  "name": "OffsetCommitValue",
  "validVersions": "0-4",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "offset", "type": "int64", "versions": "0+" },
    { "name": "leaderEpoch", "type": "int32", "versions": "3+", "default": -1, "ignorable": true },
    { "name": "metadata", "type": "string", "versions": "0+" },
    { "name": "commitTimestamp", "type": "int64", "versions": "0+" },
    { "name": "expireTimestamp", "type": "int64", "versions": "1", "default": -1, "ignorable": true },
    // Adds TopicId field.
    { "name": "topicId", "type": "uuid", "versions": "4", "ignorable": true }
  ]
}

...