
...

  • Eager Mode - In the eager mode, the consumer revokes all its partitions before rejoining the group during a rebalance.
  • Cooperative Mode - In the cooperative mode, the consumer only revokes the partitions that it no longer owns before rejoining the group. Two rebalances are therefore required to move a partition from member A to member B: one to revoke the partition from A and another to assign it to B.
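
The difference between the two modes can be sketched as a small helper that computes which owned partitions a member gives up. This is a hypothetical illustration, not part of the proposal: `RevocationSketch`, `partitionsToRevoke`, and the `"topic-partition"` string labels are assumptions made for brevity.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; partitions are modeled as "topic-partition" strings for brevity.
final class RevocationSketch {
    enum RebalanceMode { EAGER, COOPERATIVE }

    /**
     * Returns the partitions the member revokes.
     *
     * @param owned    partitions currently owned by the member
     * @param assigned partitions the member keeps according to the new assignment
     *                 (ignored in eager mode, where everything is revoked)
     */
    static Set<String> partitionsToRevoke(RebalanceMode mode, Set<String> owned, Set<String> assigned) {
        if (mode == RebalanceMode.EAGER) {
            // Eager: all partitions are revoked before rejoining the group.
            return new HashSet<>(owned);
        }
        // Cooperative: only the partitions that are no longer owned are revoked.
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }
}
```

For example, if member A owns foo-0 and foo-1 and the new assignment keeps only foo-0, the cooperative variant returns {foo-1}. foo-1 can only be handed to member B in the following rebalance, which is why two rebalances are needed.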

The two modes must be treated differently. At the moment, the group coordinator does not know which mode is used by the group. This information will be added to the protocol.

Note that the upgrade path will only work from the consumer protocol version 3 (as described in KIP-792).

...

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise." },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int32", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int32", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "Version", "type": "int32", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason for the metadata update." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the topic-partitions owned by the member otherwise.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
}

...

{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.",
      "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The topic-partitions assigned to the member.",
          "fields": [
            { "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
            { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
          ]},
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "Metadata", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned metadata." }
      ]}
  ]
}

...

Upon receiving the COMPUTE_ASSIGNMENT error, the consumer starts the assignment process.

Upon receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the consumer abandons all its partitions and rejoins the group with the same member id and epoch 0.
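
The member-side reaction to these heartbeat errors can be sketched as follows. Numeric codes for these errors are not assigned in this proposal, so the sketch uses an enum; `HeartbeatErrorSketch` and its fields are hypothetical stand-ins for the consumer's internal state.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical member-side state; error names follow the ConsumerGroupHeartbeat schema above.
final class HeartbeatErrorSketch {
    enum HeartbeatError { NONE, COMPUTE_ASSIGNMENT, UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH }

    final String memberId;
    int memberEpoch;
    final Set<String> ownedPartitions = new HashSet<>();
    boolean assignmentRequested = false;

    HeartbeatErrorSketch(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }

    void onHeartbeatError(HeartbeatError error) {
        switch (error) {
            case COMPUTE_ASSIGNMENT:
                // The coordinator asks this member to start the client-side assignment process.
                assignmentRequested = true;
                break;
            case UNKNOWN_MEMBER_ID:
            case FENCED_MEMBER_EPOCH:
                // Abandon all partitions; the next heartbeat rejoins with the same member id and epoch 0.
                ownedPartitions.clear();
                memberEpoch = 0;
                break;
            default:
                // NONE: nothing to do in this sketch.
                break;
        }
    }
}
```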

...

{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors: 
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error." },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "ClientAssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersion", "type": "int32", "versions": "0+",
        "about": "The version of the metadata." },
      { "name": "AssignorReason", "type": "int8", "versions": "0+",
        "about": "The reason for the metadata update." },
      { "name": "AssignorMetadata", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata." },
      { "name": "TopicPartitions", "type": "[]TopicPartition",
        "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "PartitionMetadata", "type": "[]Metadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}
  ]
}

...

If the response contains no error, the member calls the client side assignor with the group state.

Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandons the process.

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries once its next heartbeat response provides its current member epoch.

...

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "int8", "versions": "0+",
      "about": "The assignment error, or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
        ]},
      { "name": "MetadataVersion", "type": "int32", "versions": "0+",
        "about": "The metadata version." },
      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

...

If the response contains no error, the member is done.

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries once its next heartbeat response provides its current member epoch.

Upon receiving any other error, the consumer abandons the process.
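
The client-side assignment flow described for ConsumerGroupPrepareAssignment and ConsumerGroupInstallAssignment can be summarized as a decision on each response. This is a minimal sketch: `AssignmentFlowSketch`, `ApiError`, and `NextStep` are hypothetical names, and treating unlisted PrepareAssignment errors as "abandon" is an assumption, since the text only specifies UNKNOWN_MEMBER_ID and FENCED_MEMBER_EPOCH for that step.

```java
// Hypothetical sketch of the client-side assignment flow driven by the two APIs above.
final class AssignmentFlowSketch {
    enum ApiError { NONE, UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH, OTHER }

    enum NextStep { RUN_ASSIGNOR, DONE, RETRY_AFTER_NEXT_HEARTBEAT, ABANDON }

    /** Reaction to a ConsumerGroupPrepareAssignment response. */
    static NextStep onPrepareAssignmentResponse(ApiError error) {
        switch (error) {
            case NONE:
                // Call the client-side assignor with the returned group state.
                return NextStep.RUN_ASSIGNOR;
            case FENCED_MEMBER_EPOCH:
                // Wait for the next heartbeat response to learn the current member epoch.
                return NextStep.RETRY_AFTER_NEXT_HEARTBEAT;
            case UNKNOWN_MEMBER_ID:
                return NextStep.ABANDON;
            default:
                // Assumption: other errors are not specified for this step; treated as abandon.
                return NextStep.ABANDON;
        }
    }

    /** Reaction to a ConsumerGroupInstallAssignment response. */
    static NextStep onInstallAssignmentResponse(ApiError error) {
        switch (error) {
            case NONE:
                return NextStep.DONE;
            case FENCED_MEMBER_EPOCH:
                return NextStep.RETRY_AFTER_NEXT_HEARTBEAT;
            default:
                // Any other error abandons the process.
                return NextStep.ABANDON;
        }
    }
}
```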

...

{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out
  // responses before throttling.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the GroupState field (KIP-518).
  //
  // Version 5 adds the GroupType field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
      "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
      "about": "Each group in the response.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+",
        "entityType": "groupId",
        "about": "The group ID." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type." },
      { "name": "GroupState", "type": "string", "versions": "4+", "ignorable": true,
        "about": "The group state name." },
      { "name": "GroupType", "type": "string", "versions": "5+", "ignorable": true,
        "about": "The group type name." }
    ]}
  ]
}

...

OffsetCommit API

The version of the API is bumped to 9.

Request Schema

...


...

We propose to rename GenerationId to GenerationIdOrMemberEpoch.

Request Handling

When the group id corresponds to a consumer group using the new rebalance protocol, the provided member epoch must match the expected member epoch.

Response Schema

The response can return FENCED_MEMBER_EPOCH.

OffsetFetch API

The version of the API is bumped to 9.

Request Schema



...

{
  "apiKey": 9,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "OffsetFetchRequest",
  // In version 0, the request read offsets from ZK.
  //
  // Starting in version 1, the broker supports fetching offsets from the internal __consumer_offsets topic.
  //
  // Starting in version 2, the request can contain a null topics array to indicate that offsets
  // for all topics should be fetched. It also returns a top level error code
  // for group or coordinator level errors.
  //
  // Version 3, 4, and 5 are the same as version 2.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is adding the require stable flag.
  //
  // Version 8 is adding support for fetching offsets for multiple groups at a time
  //
  // Version 9 adds GenerationIdOrMemberEpoch and MemberId fields (KIP-848).
  "validVersions": "0-9",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0-7", "entityType": "groupId",
      "about": "The group to fetch offsets for." },
    // New fields.
    { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "9+", "default": "-1", "ignorable": true,
      "about": "The generation of the group or the member epoch." },
    { "name": "MemberId", "type": "string", "versions": "9+", "ignorable": true,
      "about": "The member ID assigned by the group coordinator." },
    // End of new fields.
    { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0-7", "nullableVersions": "2-7",
      "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
      { "name": "Name", "type": "string", "versions": "0-7", "entityType": "topicName",
        "about": "The topic name."},
      { "name": "PartitionIndexes", "type": "[]int32", "versions": "0-7",
        "about": "The partition indexes we would like to fetch offsets for." }
    ]},
    { "name": "Groups", "type": "[]OffsetFetchRequestGroup", "versions": "8+",
      "about": "Each group we would like to fetch offsets for", "fields": [
      { "name": "groupId", "type": "string", "versions": "8+", "entityType": "groupId",
        "about": "The group ID."},
      { "name": "Topics", "type": "[]OffsetFetchRequestTopics", "versions": "8+", "nullableVersions": "8+",
        "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
        { "name": "Name", "type": "string", "versions": "8+", "entityType": "topicName",
          "about": "The topic name."},
        { "name": "PartitionIndexes", "type": "[]int32", "versions": "8+",
          "about": "The partition indexes we would like to fetch offsets for." }
      ]}
    ]},
    {"name": "RequireStable", "type": "bool", "versions": "7+", "default": "false",
      "about": "Whether broker should hold on returning unstable offsets but set a retriable error code for the partitions."}
  ]
}

Request Handling

The MemberId and the GenerationIdOrMemberEpoch are verified. If the member is unknown, UNKNOWN_MEMBER_ID is returned; if the epoch does not match, FENCED_MEMBER_EPOCH is returned.
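
The coordinator-side verification can be sketched as a lookup followed by an exact epoch comparison, the same rule stated for OffsetCommit. This is a hypothetical sketch: `MemberEpochValidationSketch` and its in-memory map stand in for the group coordinator's member state, and the error enum stands in for the real error codes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side check; the map stands in for the group's member state.
final class MemberEpochValidationSketch {
    enum ValidationError { NONE, UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH }

    // member id -> current member epoch as known by the group coordinator
    private final Map<String, Integer> memberEpochs = new HashMap<>();

    void putMember(String memberId, int memberEpoch) {
        memberEpochs.put(memberId, memberEpoch);
    }

    ValidationError validate(String memberId, int generationIdOrMemberEpoch) {
        Integer currentEpoch = memberEpochs.get(memberId);
        if (currentEpoch == null) {
            return ValidationError.UNKNOWN_MEMBER_ID;
        }
        if (currentEpoch.intValue() != generationIdOrMemberEpoch) {
            return ValidationError.FENCED_MEMBER_EPOCH;
        }
        return ValidationError.NONE;
    }
}
```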

Response Schema

The response schema is unchanged. The only difference is that the new FENCED_MEMBER_EPOCH and UNKNOWN_MEMBER_ID errors can be returned.

Response Handling

Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries once its next heartbeat response provides its current member epoch.

DescribeConfigs API

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id. 

IncrementalAlterConfigs API

The API is the same but supports a new resource type: GROUP (16). When GROUP is used, the resource name corresponds to the group id.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

As explained earlier, they will be persisted in the compacted __consumer_offsets topic. Compacted-topic storage requires a dedicated key type per record type for compaction to work. The current protocol already uses key versions 0 to 2 (inclusive).

Group Metadata

Groups can be rather large, so we propose to store a group across several records in order not to be limited by the maximum batch size (1MB by default). Therefore we propose to store group metadata with three record types: ConsumerGroupMetadata, ConsumerGroupPartitionMetadata, and ConsumerGroupMemberMetadata.

A group with X members will be stored with X+2 records: one ConsumerGroupMemberMetadata per member, one ConsumerGroupPartitionMetadata, and one ConsumerGroupMetadata for the group at the end. Atomicity is not a concern here; all the records can be applied independently.

Moreover, the whole group does not necessarily have to be written for every epoch. Members who have not changed could be omitted as the compacted topic will retain their previous state anyway.

When a member is deleted, a tombstone for it is written to the partition.
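
The write pattern above (X+2 records for a full write, unchanged members omitted, tombstones for deleted members, group record last) can be sketched as follows. `GroupRecordsSketch` and `Rec` are hypothetical; records are reduced to their key version and identifiers, and rewriting the partition metadata record on every update is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which records are appended for one group update.
final class GroupRecordsSketch {
    // Key versions used by the record schemas in this section.
    static final short GROUP_METADATA_KEY = 3;
    static final short PARTITION_METADATA_KEY = 4;
    static final short MEMBER_METADATA_KEY = 5;

    static final class Rec {
        final short keyVersion;
        final String groupId;
        final String memberId;   // null for group-level records
        final boolean tombstone; // true when the value is null (member deleted)

        Rec(short keyVersion, String groupId, String memberId, boolean tombstone) {
            this.keyVersion = keyVersion;
            this.groupId = groupId;
            this.memberId = memberId;
            this.tombstone = tombstone;
        }
    }

    /**
     * @param changedMembers members whose metadata changed; unchanged ones can be omitted
     *                       because the compacted topic keeps their previous record
     * @param deletedMembers members removed from the group; each gets a tombstone
     */
    static List<Rec> recordsFor(String groupId, List<String> changedMembers, List<String> deletedMembers) {
        List<Rec> records = new ArrayList<>();
        for (String memberId : changedMembers) {
            records.add(new Rec(MEMBER_METADATA_KEY, groupId, memberId, false));
        }
        for (String memberId : deletedMembers) {
            records.add(new Rec(MEMBER_METADATA_KEY, groupId, memberId, true));
        }
        records.add(new Rec(PARTITION_METADATA_KEY, groupId, null, false));
        // The group-level record is written last.
        records.add(new Rec(GROUP_METADATA_KEY, groupId, null, false));
        return records;
    }
}
```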

ConsumerGroupMetadataKey


{
    "type": "data",
    "name": "ConsumerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
      { "name": "GroupId", "type": "string", "versions": "3" }
    ]
}


ConsumerGroupMetadataValue


{
    "type": "data",
    "name": "ConsumerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ]
}


ConsumerGroupPartitionMetadataKey

{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
      { "name": "GroupId", "type": "string", "versions": "4" }
    ]
}

ConsumerGroupPartitionMetadataValue

{
    "type": "data",
    "name": "ConsumerGroupPartitionMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitionMetadata", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "NumPartitions", "versions": "0+", "type": "int32" }
          ]}
    ]
}

ConsumerGroupMemberMetadataKey


{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}


ConsumerGroupMemberMetadataValue


{
    "type": "data",
    "name": "ConsumerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int32" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int32" },
            { "name": "Version", "versions": "0+", "type": "int32" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ]
}


Target Assignment

The target assignment is stored in a single record.

ConsumerGroupTargetAssignmentKey


{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
      { "name": "GroupId", "type": "string", "versions": "6" }
    ]
}


ConsumerGroupTargetAssignmentValue


{
    "type": "data",
    "name": "ConsumerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "AssignorName", "versions": "0+", "type": "string" },
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
            { "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "TopicPartitions", "versions": "0+",
              "type": "[]TopicPartition", "fields": [
                { "name": "TopicId", "versions": "0+", "type": "uuid" },
                { "name": "Partitions", "versions": "0+", "type": "[]int32" }
            ]},
            { "name": "MetadataError", "versions": "0+", "type": "int8" },
            { "name": "MetadataVersion", "versions": "0+", "type": "int32" },
            { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
        ]}
    ]
}


ConsumerGroupTargetMemberAssignmentKey


{
    "type": "data",
    "name": "ConsumerGroupTargetMemberAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
      { "name": "GroupId", "type": "string", "versions": "6" },
      { "name": "MemberId", "type": "string", "versions": "6" }
    ]
}


ConsumerGroupTargetMemberAssignmentValue


{
    "type": "data",
    "name": "ConsumerGroupTargetMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "MemberId", "versions": "0+", "type": "string" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "MetadataError", "versions": "0+", "type": "int8" },
        { "name": "MetadataVersion", "versions": "0+", "type": "int32" },
        { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}


Current Member Assignment

The current member assignment represents, as the name suggests, the current assignment of a given member.

When a member is deleted from the group, a tombstone for it is written to the partition.

ConsumerGroupCurrentMemberAssignmentKey


{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
      { "name": "GroupId", "type": "string", "versions": "7" },
      { "name": "MemberId", "type": "string", "versions": "7" }
    ]
}


ConsumerGroupCurrentMemberAssignmentValue


{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "MetadataError", "versions": "0+", "type": "int8" },
        { "name": "MetadataVersion", "versions": "0+", "type": "int32" },
        { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}


Broker API

Broker-side assignor API: basically the same as the consumer-side one, but without metadata.

TODO

Broker Metrics

  • Group count by type
  • Group count by state
  • Rebalance Rate

Broker Configurations

New properties in the broker configuration.

Name | Type | Default | Doc
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout.
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval.
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval.
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate.
group.consumer.assignors | List | range, uniform | The server side assignors.

Group Configurations

New dynamic group properties.

Name | Type | Default | Doc
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.

Consumer API

New PartitionAssignor interface

The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. It is defined as follows:


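package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;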

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The mapping from topic ID to number of partitions
         * as provided by the group coordinator
         */
        Map<Uuid, Integer> topicMetadata;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link GroupMember#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {@link PartitionAssignor#metadata()}.
         */
        ByteBuffer metadata;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class Assignment {
        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {@link MemberAssignment#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;

    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link Metadata#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);
}
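
To make the contract of assign(Group) concrete, here is a minimal per-topic round-robin sketch over simplified stand-ins for Group and GroupMember. It is not one of the proposed assignors (range, uniform): `RoundRobinAssignSketch` is hypothetical, topic IDs are plain strings instead of Uuid, and owned partitions and custom metadata are ignored, so it makes no attempt at stickiness.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of what assign(Group) computes, using simplified stand-in types.
final class RoundRobinAssignSketch {
    /**
     * @param subscriptions member id -> subscribed topic ids (stand-in for GroupMember#subscribedTopicIds)
     * @param topicMetadata topic id -> number of partitions (stand-in for Group#topicMetadata)
     * @return member id -> assigned "topic-partition" labels
     */
    static Map<String, List<String>> assign(Map<String, List<String>> subscriptions,
                                            Map<String, Integer> topicMetadata) {
        // TreeMaps give a deterministic, sorted iteration order.
        Map<String, List<String>> sortedSubscriptions = new TreeMap<>(subscriptions);
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String memberId : sortedSubscriptions.keySet()) {
            assignment.put(memberId, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : new TreeMap<>(topicMetadata).entrySet()) {
            // Members subscribed to this topic, in member id order.
            List<String> candidates = new ArrayList<>();
            for (Map.Entry<String, List<String>> member : sortedSubscriptions.entrySet()) {
                if (member.getValue().contains(topic.getKey())) {
                    candidates.add(member.getKey());
                }
            }
            if (candidates.isEmpty()) {
                continue; // nobody subscribes to this topic
            }
            // Deal the partitions of this topic out one by one.
            for (int partition = 0; partition < topic.getValue(); partition++) {
                String memberId = candidates.get(partition % candidates.size());
                assignment.get(memberId).add(topic.getKey() + "-" + partition);
            }
        }
        return assignment;
    }
}
```

With members A and B both subscribed to foo (3 partitions), this sketch gives A foo-0 and foo-2, and B foo-1.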


Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)

The enforceRebalance methods are no longer necessary and will be deprecated in a future release.

Deprecate ConsumerPartitionAssignor interface.

The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.

Consumer Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | A flag which indicates whether the new protocol should be used. It can be either generic or consumer.
group.remote.assignor | string | uniform | The server side assignor to use. It cannot be used in conjunction with group.local.assignors.
group.local.assignors | list | empty | The list of client side (local) assignors. It cannot be used in conjunction with group.remote.assignor.
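
The constraints in this table can be checked when the consumer is configured. The following is a hypothetical sketch using java.util.Properties, not the consumer's actual validation; because group.remote.assignor has a default, it assumes only an explicitly set value conflicts with group.local.assignors.

```java
import java.util.Properties;

// Hypothetical client-side check of the proposed consumer configurations.
final class ConsumerGroupConfigSketch {
    static void validate(Properties props) {
        String protocol = props.getProperty("group.protocol", "generic");
        if (!protocol.equals("generic") && !protocol.equals("consumer")) {
            throw new IllegalArgumentException(
                "group.protocol must be 'generic' or 'consumer' but was '" + protocol + "'");
        }
        // group.remote.assignor has a default, so only an explicitly set value conflicts here.
        boolean remoteAssignorSet = props.containsKey("group.remote.assignor");
        boolean localAssignorsSet = !props.getProperty("group.local.assignors", "").trim().isEmpty();
        if (remoteAssignorSet && localAssignorsSet) {
            throw new IllegalArgumentException(
                "group.remote.assignor cannot be used in conjunction with group.local.assignors");
        }
    }
}
```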

Streams Metadata

TODO

Streams Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | A flag which indicates whether the new protocol should be used. It can be either generic or consumer.

Admin API

Admin#listConsumerGroups

The Admin#listConsumerGroups will be extended to support querying group types and retrieving/querying the new group states.


public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {

    private Set<String> types = Collections.emptySet();

    /**
     * If types is set, only groups with these types will be returned.
     */
    public ListConsumerGroupsOptions withTypes(Set<String> types) {
        this.types = types;
        return this;
    }

    /**
     * Returns the set of types that are requested, or an empty set if no types
     * have been specified.
     */
    public Set<String> types() {
        return types;
    }
}

public class ConsumerGroupListing {

    /**
     * Consumer Group type, generic by default.
     */
    public String type() {
        return type;
    }
}

public enum ConsumerGroupState {
    UNKNOWN("Unknown"),
    PREPARING_REBALANCE("PreparingRebalance"),
    COMPLETING_REBALANCE("CompletingRebalance"),
    STABLE("Stable"),
    DEAD("Dead"),
    EMPTY("Empty"),
    ASSIGNING("Assigning"),
    RECONCILING("Reconciling");
}


Admin#describeConsumerGroups

The Admin#describeConsumerGroups will be extended to expose the new information related to the new protocol.


public class ConsumerGroupDescription {
    public String type() {
      return type;
    }
}

public class MemberDescription {
    // Current Assignment
    public MemberAssignment assignment() {
        return assignment;
    }

    // Target Assignment
    public MemberAssignment targetAssignment() {
        return targetAssignment;
    }
}

public class MemberAssignment {
    /**
     * The error reported by the assignor.
     */
    byte error;

    /**
     * The version of the metadata encoded in {@link #metadata}.
     */
    int version;

    /**
     * The custom metadata provided by the assignor.
     */
    ByteBuffer metadata;

    /**
     * The partitions owned by the member at the current epoch.
     */
    List<TopicIdPartition> ownedPartitions;
}


Admin#incrementalAlterConfigs and Admin#describeConfigs

The GROUP resource type is added.

public final class ConfigResource {
    /**
     * Type of resource.
     */
    public enum Type {
         GROUP((byte) 16), BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0);
    }
}

kafka-consumer-groups

The kafka-consumer-groups command-line tool will be extended to support the --type filter, which allows listing or describing only the groups of the specified types.


kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --type <comma separated list of types>

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --type <comma separated list of types>


...

Case Studies

Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.

...