Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • The group coordinator selects a member to run the assignment logic. The selection is explained later in this chapter.
  • The group coordinator notifies the member to compute the new assignment by returning the COMPUTE_ASSIGNMENT error setting the ShouldComputeAssignment field in its next heartbeat response.
  • When the member receives this error, it is expected to call the ConsumerGroupPrepareAssignment API to get the current group metadata and the current target assignment.
  • The member computes the new assignment with the relevant assignor.
  • The member calls the ConsumerGroupInstallAssignment API to install the new assignment. The group coordinator validates it and persists it.

...

Consumer#enforceRebalance will be deprecated and will throw an IllegalStateException be a no-op if used when the new protocol is enabledenable. A warning will be logged in this case. Enforcing a rebalance with the new protocol does not make any sense. Instead, power users will have the ability to trigger a reassignment by either providing a non-zero reason or by updating the assignor metadata.

...

  • FENCED_MEMBER_EPOCH - The member epoch is fenced by the coordinator. The member must abandon all its partitions and rejoins.
  • STALE_MEMBER_EPOCH - The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.COMPUTE_ASSIGNMENT - The member has been selected by the coordinator to compute the new target assignment of the group.
  • UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version range are not supported by the group.

...

Code Block
languagejs
linenumberstrue
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
      "about": "True if the member should compute the assignment for the group." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
	  "about": "null if not provided; the assignment otherwise."
      "fields": [
     	{ "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",
          "about": "The assigned topic-partitions to the member.",
          "fields": [
        	{ "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
	        { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
      	]},
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
	]
  ]
}

...

  1. It updates its member epoch.
  2. It computes the difference between the old and the new assignment to determine the revoked partitions and the newly assignment partitions. There should be either revoked partitions or newly assignment partitions. The protocol never does both together.
    1. It revokes the partitions, commit all the offsets, and calls ConsumerRebalanceListener#onPartitionsRevoked.
    2. It assigns the new partitions, calls PartitionAssignor#onAssignment if one is defined and calls ConsumerRebalanceListener#onPartitionsAssigned.
  3. After a revocation, It sends the next heartbeat immediately to acknowledge it. 

Upon receiving the COMPUTE_ASSIGNMENT errorIf the response has ShouldComputeAssignment field set to true, the consumer starts the assignment process.

...