...

While Kafka Streams and Connect are similar in how they can be onboarded onto the new rebalance protocol, the protocol would need to be updated to understand Kafka Connect semantics. This is because the new protocol is tailored around topics/partitions, which Kafka Connect doesn’t deal with. Here are some of the updates:

...

The new protocol introduces a concept of types within the group coordinator. This allows different kinds of groups to be supported in the future and also differentiates between old and new consumer groups. With the former reason in mind, we would introduce a new type called connect.

Data Model

The KIP introduces a logical data model to capture group membership and assignments, which the group coordinator would use for bookkeeping purposes. Separate public interfaces are defined to capture the actual requests/responses between brokers <=> group-coordinator. This model is again consumer-group centric; we would try to leverage and expand it for Kafka Connect.

...

The new protocol introduces a new config called group.protocol. The older protocol is called generic while the new one is called consumer. To maintain clear separation, we would introduce a new possible value for group.protocol called connect.

Worker Group and Worker

Worker Group

Name | Type | Description
Group ID | string | The group ID as configured on the worker. The ID uniquely identifies the worker cluster.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Workers | []Worker | The set of workers in the group.

...

  1. A new worker joins.

  2. A worker updates its assignors (do we need this for Connect?).

  3. A worker updates its assignors' reason or metadata.

  4. A worker is fenced or removed from the group by the group coordinator.

  5. Connectors are added or removed.

  6. Tasks are added to or removed from connectors (I couldn’t find this in the KIP, but I think it would be applicable).

...

  1. The group coordinator revokes the connectors/tasks which are no longer in the target assignment of the worker. It does so by providing the intersection of the Current connectors/tasks and the Target connectors/tasks in the heartbeat response until the worker acknowledges the revocation in its heartbeat request. We can repurpose the rebalance.timeout.ms config to put a cap on the rebalance process; a worker that does not complete the revocation within that time would be kicked out of the group.

  2. When the group coordinator receives the acknowledgement of the revocation, it updates the worker's current assignment to its target assignment (and target epoch) and durably persists it.

  3. The group coordinator assigns the new connectors/tasks to the worker. It does so by providing the Target connectors/tasks to the worker while ensuring that connectors/tasks which are not revoked by other workers yet are removed from this set. In other words, new connectors/tasks are incrementally assigned to the worker when they are revoked by the other workers.
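The three steps above can be sketched with plain set operations. This is only an illustration under stated assumptions: the class and method names are hypothetical, connectors and tasks are represented as plain strings, and the real group coordinator would also track epochs and persist state.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class ReconciliationSketch {

    /** Step 1: while a revocation is pending, the worker may keep only current ∩ target. */
    static Set<String> keptWhileRevoking(Set<String> current, Set<String> target) {
        Set<String> kept = new LinkedHashSet<>(current);
        kept.retainAll(target);
        return kept;
    }

    /** A revocation is needed while the worker owns anything outside its target. */
    static boolean needsRevocation(Set<String> current, Set<String> target) {
        return !target.containsAll(current);
    }

    /** Step 3: hand out the target minus whatever other workers have not revoked yet. */
    static Set<String> assignableNow(Set<String> target, Set<String> ownedByOthers) {
        Set<String> assignable = new LinkedHashSet<>(target);
        assignable.removeAll(ownedByOthers);
        return assignable;
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("conn-a", "conn-a-0", "conn-b-0");
        Set<String> target = Set.of("conn-a", "conn-c-0");
        System.out.println(needsRevocation(current, target));
        System.out.println(keptWhileRevoking(current, target));
        // conn-c-0 is still owned by another worker, so it is withheld for now.
        System.out.println(assignableNow(target, Set.of("conn-c-0")));
    }
}
```

Calling assignableNow again once the other worker has acknowledged its revocation yields the full target, which is the incremental behaviour described in step 3.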

...

  • The group coordinator selects a worker to run the assignment logic. We will get to it later.

  • The group coordinator notifies the worker to compute the new assignment by returning the COMPUTE_ASSIGNMENT error in its next heartbeat response.

  • When the worker receives this error, it is expected to call the ConnectGroupPrepareAssignment API to get the current group metadata and the current target assignment.

  • The worker computes the new assignment with the relevant assignor.

  • The worker calls the ConnectGroupInstallAssignment API to install the new assignment. The group coordinator validates it and persists it.

The worker should finish the assignment within rebalance.timeout.ms.

Worker Selection

The group coordinator can generally pick any worker to run the assignment. However, when the workers support different version ranges, the group coordinator must select a worker which is able to handle all the supported versions. For instance, if we have three workers, A [1-5], B [3-4] and C [2-4], worker A must be selected because it supports all the other versions in the group.
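The selection rule can be sketched as follows. This is a minimal illustration, not the coordinator's actual logic: VersionRange and selectWorker are hypothetical names, and the sketch assumes that "able to handle all the supported versions" means the selected worker's range contains the range of every worker in the group.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class WorkerSelectionSketch {

    /** Hypothetical inclusive version range advertised by a worker's assignor. */
    static final class VersionRange {
        final int min;
        final int max;

        VersionRange(int min, int max) {
            this.min = min;
            this.max = max;
        }

        /** True if every version in {@code other} is also in this range. */
        boolean covers(VersionRange other) {
            return min <= other.min && other.max <= max;
        }
    }

    /** Returns the first worker whose range covers all ranges, or empty if none does. */
    static Optional<String> selectWorker(Map<String, VersionRange> workers) {
        for (Map.Entry<String, VersionRange> candidate : workers.entrySet()) {
            boolean coversAll = workers.values().stream()
                .allMatch(range -> candidate.getValue().covers(range));
            if (coversAll) {
                return Optional.of(candidate.getKey());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, VersionRange> workers = new LinkedHashMap<>();
        workers.put("A", new VersionRange(1, 5));
        workers.put("B", new VersionRange(3, 4));
        workers.put("C", new VersionRange(2, 4));
        System.out.println(selectWorker(workers)); // prints Optional[A]
    }
}
```

What the coordinator should do when no worker covers every range is not specified in the text above, so the sketch simply returns an empty result in that case.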

...

The KIP talks about introducing a new interface for client-side assignors called PartitionAssignor and deprecating the current ConsumerPartitionAssignor. However, Connect doesn’t use ConsumerPartitionAssignor and instead has its own assignor called ConnectAssignor. To make it work with the new protocol, we will be enhancing the ConnectAssignor. I think this is a good opportunity to introduce a new Assignor interface and have it extended by two interfaces, PartitionAssignor and ConnectAssignor. Reasons:

  1. So far the two have remained totally isolated, and that seems to have worked fine, but given that they both assign units of work, IMO they could share a common lineage.

  2. This bodes well for the new protocol, as the two will have a lot of fields in common.

  3. This would also take away the notion of assignors being only a topic-partition construct, since Connect does the same thing for different units of work. We could then add assignors for newer systems if it comes to that in the future.

I will be adding details of the new Assignor interface and its two child interfaces, PartitionAssignor and ConnectAssignor. Along similar lines, we can rename ConsumerGroupHeartbeat and other related APIs to something like GroupHeartbeat to make them more inclusive of both consumer groups and Connect.

Triggering Rebalances

The IncrementalCooperativeAssignor has a provision for triggering rebalances after a fixed scheduled.rebalance.max.delay.ms interval, to allow any departed worker to come back so that it gets the same or similar assignments. This was possible because the older rebalance protocol was driven entirely by clients, whereas the new protocol offloads the rebalancing duties to the Group Coordinator. Having said that, client-side assignors can still trigger rebalances by setting the reason field, which the Group Coordinator will need to understand. Here again, we will need to enhance the Group Coordinator logic based on the group.type field for connect.

Migration To the New Protocol

Upgrading to the new protocol or downgrading from it is possible by rolling the workers with the correct group.protocol, assuming that the new protocol is enabled on the server side. When the first worker that supports the new protocol joins the group, the group is converted from generic to consumer. Similarly, when the last worker supporting the new protocol leaves, the group switches back to the old protocol. Note that the group epoch starts at the current group generation.

...

  • FENCED_MEMBER_EPOCH - The member epoch does not correspond to the member epoch expected by the coordinator.

  • COMPUTE_ASSIGNMENT - The member has been selected by the coordinator to compute the new target assignment of the group.

  • UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version range is not supported by the group.

...


ConnectGroupHeartbeat API

The ConnectGroupHeartbeat API is the new core API used by workers to form a group. The API allows workers to advertise their state, assignors and their owned connectors/tasks. The group coordinator uses it to assign/revoke connectors/tasks to/from workers. This API is also used as a liveness check.

...

Code Block
languagejs
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConnectGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id generated by the server. The member id must be kept during the entire lifetime of the member. For connect, this corresponds to workerIds." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int16", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int16", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." },
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTasks", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the connectors/tasks owned by the worker. This will be set only when group.type is equal to connect.",
      "fields": [
        { "name": "connectors", "type": "[]String", "versions": "0+",
          "about": "The Connectors assigned to this worker." },
        { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+",
          "about": "The tasks assigned to this worker." }
      ]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupId must be non-empty.

  • MemberId must be non-empty.

  • MemberEpoch must be >= -1.

  • InstanceId, if provided, must be non-empty.

  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.

  • ServerAssignor and ClientAssignors cannot be used together.

  • Assignor.Name must be non-empty.

  • Assignor.MinimumVersion must be >= -1.

  • Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion.

  • Assignor.Version must be >= Assignor.MinimumVersion and <= Assignor.MaximumVersion.

UNSUPPORTED_ASSIGNOR is returned if the request does not obey the following invariants:

  • ServerAssignor must be supported by the server.

  • ClientAssignors' version range must overlap with the other members in the group.
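The INVALID_REQUEST invariants listed above could be checked along these lines. This is a hedged sketch only: the Request and Assignor classes are simplified, hypothetical stand-ins for the generated request types, and it assumes that a MemberEpoch of 0 identifies the first heartbeat of a member. The UNSUPPORTED_ASSIGNOR checks are left out because they depend on group state.

```java
import java.util.ArrayList;
import java.util.List;

final class HeartbeatValidationSketch {

    /** Hypothetical, simplified client-side assignor entry. */
    static final class Assignor {
        final String name;
        final int minimumVersion;
        final int maximumVersion;
        final int version;

        Assignor(String name, int minimumVersion, int maximumVersion, int version) {
            this.name = name;
            this.minimumVersion = minimumVersion;
            this.maximumVersion = maximumVersion;
            this.version = version;
        }
    }

    /** Hypothetical, simplified heartbeat request; null means "not provided". */
    static final class Request {
        final String groupId;
        final String memberId;
        final int memberEpoch;
        final String instanceId;
        final int rebalanceTimeoutMs;
        final String serverAssignor;
        final List<Assignor> clientAssignors;

        Request(String groupId, String memberId, int memberEpoch, String instanceId,
                int rebalanceTimeoutMs, String serverAssignor, List<Assignor> clientAssignors) {
            this.groupId = groupId;
            this.memberId = memberId;
            this.memberEpoch = memberEpoch;
            this.instanceId = instanceId;
            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
            this.serverAssignor = serverAssignor;
            this.clientAssignors = clientAssignors;
        }
    }

    /** Returns the violated invariants; a non-empty list would map to INVALID_REQUEST. */
    static List<String> violations(Request r) {
        List<String> out = new ArrayList<>();
        if (r.groupId == null || r.groupId.isEmpty()) out.add("GroupId must be non-empty");
        if (r.memberId == null || r.memberId.isEmpty()) out.add("MemberId must be non-empty");
        if (r.memberEpoch < -1) out.add("MemberEpoch must be >= -1");
        if (r.instanceId != null && r.instanceId.isEmpty()) out.add("InstanceId must be non-empty");
        // Assumption: MemberEpoch == 0 marks the first heartbeat (the join).
        if (r.memberEpoch == 0 && r.rebalanceTimeoutMs <= 0) out.add("RebalanceTimeoutMs must be > 0");
        if (r.serverAssignor != null && r.clientAssignors != null) {
            out.add("ServerAssignor and ClientAssignors cannot be used together");
        }
        if (r.clientAssignors != null) {
            for (Assignor a : r.clientAssignors) {
                if (a.name == null || a.name.isEmpty()) out.add("Assignor.Name must be non-empty");
                if (a.minimumVersion < -1) out.add("Assignor.MinimumVersion must be >= -1");
                if (a.maximumVersion < 0 || a.maximumVersion < a.minimumVersion) {
                    out.add("Assignor.MaximumVersion must be >= 0 and >= MinimumVersion");
                }
                if (a.version < a.minimumVersion || a.version > a.maximumVersion) {
                    out.add("Assignor.Version must be within [MinimumVersion, MaximumVersion]");
                }
            }
        }
        return out;
    }
}
```

Collecting every violation instead of failing on the first one is a choice made for the sketch; it makes the resulting error message more useful but is not something the text above requires.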

Request Handling

When the group coordinator handles a ConnectGroupHeartbeat request:

  1. Looks up the group or creates it.

  2. Creates the member should the member epoch be zero, or checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.

  3. Checks whether the member epoch matches the member epoch in its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.

    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions or connectors/tasks owned by the member are a subset of the target assignment. Which of the two is compared could be decided based on the group type. If this is the case, it is safe to transition the member to its target epoch again.

  4. Updates the member's information, if any. The group epoch is incremented if there is any change.

  5. Reconciles the member assignments as explained earlier in this document.
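The epoch check in step 3, including the lost-response edge case, might look like the sketch below. The names are hypothetical and the logic follows the relaxation described above literally: on an epoch mismatch the request is still accepted if everything the member reports as owned is part of its target assignment.

```java
import java.util.Set;

final class EpochCheckSketch {

    enum Outcome { ACCEPT, ACCEPT_AND_RETRANSITION, FENCED_MEMBER_EPOCH }

    /**
     * @param requestEpoch     member epoch sent in the heartbeat request
     * @param expectedEpoch    member epoch the coordinator has on record
     * @param ownedByMember    connectors/tasks the member reports as owned
     * @param targetAssignment connectors/tasks in the member's target assignment
     */
    static Outcome checkMemberEpoch(int requestEpoch, int expectedEpoch,
                                    Set<String> ownedByMember, Set<String> targetAssignment) {
        if (requestEpoch == expectedEpoch) {
            return Outcome.ACCEPT;
        }
        // Edge case: the response carrying the new epoch may have been lost.
        // If the member owns nothing outside its target, re-sending the target epoch is safe.
        if (targetAssignment.containsAll(ownedByMember)) {
            return Outcome.ACCEPT_AND_RETRANSITION;
        }
        return Outcome.FENCED_MEMBER_EPOCH;
    }
}
```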

Response Schema 

The group coordinator will only set the Assignment field when the member epoch is smaller than the target assignment epoch. This is done to ensure that the members converge to the target assignment.

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConnectGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.",
      "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
          "about": "The assigned connectors/tasks to the member.",
          "fields": [
            { "name": "connectors", "type": "[]String", "versions": "0+",
              "about": "The Connectors assigned to this worker." },
            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+",
              "about": "The tasks assigned to this worker." }
        ]},
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
    ]}
  ]
}

Response Handling (Only for Connect)

If the response contains no error, the member will reconcile its current assignment towards its new assignment. It does the following:

  1. It updates its member epoch.

  2. It computes the difference between the old and the new assignment to determine the revoked connectors/tasks and the newly assigned ones. There should be either revoked connectors/tasks or newly assigned connectors/tasks. The protocol never does both together.

    1. It revokes the connectors/tasks, releases all resources, and calls WorkerRebalanceListener#onRevoked.

    2. It assigns the new connectors/tasks, calls ConnectAssignor#onAssignment and calls WorkerRebalanceListener#onAssigned.

  3. After a revocation, it sends the next heartbeat immediately to acknowledge it.

Upon receiving the COMPUTE_ASSIGNMENT error, the worker starts the assignment process.

Upon receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the worker abandons all its resources and rejoins with the same member id and the epoch 0.
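On the worker side, the difference computation in step 2 of the no-error path comes down to two set differences. The sketch below is illustrative only: the class and method names are hypothetical, connectors/tasks are plain strings, and the listener callbacks are omitted.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class WorkerReconcileSketch {

    /** Elements of {@code left} that are not in {@code right}. */
    static Set<String> difference(Set<String> left, Set<String> right) {
        Set<String> result = new LinkedHashSet<>(left);
        result.removeAll(right);
        return result;
    }

    /** Connectors/tasks the worker must stop and release. */
    static Set<String> revoked(Set<String> oldAssignment, Set<String> newAssignment) {
        return difference(oldAssignment, newAssignment);
    }

    /** Connectors/tasks the worker must start. */
    static Set<String> newlyAssigned(Set<String> oldAssignment, Set<String> newAssignment) {
        return difference(newAssignment, oldAssignment);
    }

    /** Step 3: a revocation is acknowledged by heartbeating right away. */
    static boolean shouldHeartbeatImmediately(Set<String> revoked) {
        return !revoked.isEmpty();
    }

    public static void main(String[] args) {
        Set<String> oldAssignment = Set.of("conn-a", "conn-a-0");
        Set<String> newAssignment = Set.of("conn-a");
        Set<String> revoked = revoked(oldAssignment, newAssignment);
        System.out.println(revoked);                               // prints [conn-a-0]
        System.out.println(newlyAssigned(oldAssignment, newAssignment)); // prints []
        System.out.println(shouldHeartbeatImmediately(revoked));   // prints true
    }
}
```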

GroupPrepareAssignment API

The GroupPrepareAssignment API will be used by the member to get the information to feed its client-side assignor.

Request Schema

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "GroupPrepareAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." }
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupId must be non-empty.

  • MemberId must be non-empty.

  • MemberEpoch must be >= 0.

Request Handling

When the group coordinator handles a GroupPrepareAssignment request:

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks whether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.

  5. Returns the group state of the group.
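The order of these checks matters, since each one assumes the previous one passed. A minimal sketch, with hypothetical names and with the coordinator's state reduced to a few booleans and integers:

```java
final class PrepareAssignmentChecksSketch {

    enum Outcome { OK, GROUP_ID_NOT_FOUND, UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH }

    /** Applies checks 1 to 4 in order and returns the first failure, or OK. */
    static Outcome check(boolean groupExists, boolean memberExists,
                         int requestEpoch, int currentMemberEpoch,
                         boolean selectedToCompute) {
        if (!groupExists) return Outcome.GROUP_ID_NOT_FOUND;
        if (!memberExists) return Outcome.UNKNOWN_MEMBER_ID;
        if (requestEpoch != currentMemberEpoch) return Outcome.FENCED_MEMBER_EPOCH;
        // A member that was not selected is answered as if it were unknown.
        if (!selectedToCompute) return Outcome.UNKNOWN_MEMBER_ID;
        return Outcome.OK; // step 5: the group state would be returned here
    }
}
```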

Response Schema

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "GroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "Assignor", "type": "Assignor", "versions": "0+",
        "about": "The information of the selected assignor",
        "fields": [
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
        "about": "The assigned connectors/tasks to the member.",
        "fields": [
        { "name": "connectors", "type": "[]String", "versions": "0+",
          "about": "The Connectors assigned to this worker." },
        { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+",
          "about": "The tasks assigned to this worker." }
      ]}
    ]},
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}
  ]
}

...

As discussed above, we would add a new interface called Assignor. It would have two extensions: PartitionAssignor and ConnectAssignor.

Assignor


package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public interface Assignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link GroupMember#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {@link Assignor#metadata()}.
         */
        ByteBuffer metadata;

    }

    class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;
        
        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {@link MemberAssignment#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link Metadata#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current group state
     * (the members, plus any metadata a sub-interface adds to the group).
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member receives a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);

}
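
To make the role of Metadata more concrete, here is a minimal sketch of how a member might serialize custom data for metadata() and how an assignor might read it back. It is an illustration under stated assumptions, not part of the proposal: the class MetadataRoundTrip, the encode and decode helpers, and the choice of a single UTF-8 string (such as a rack ID) as the version 0 payload are all hypothetical. The nested Metadata class is a trimmed-down stand-in for Assignor.Metadata so that the example compiles on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MetadataRoundTrip {

    // Stand-in for Assignor.Metadata from the listing above (same three fields).
    static final class Metadata {
        final byte reason;
        final int version;
        final ByteBuffer metadata;

        Metadata(byte reason, int version, ByteBuffer metadata) {
            this.reason = reason;
            this.version = version;
            this.metadata = metadata;
        }
    }

    // Hypothetical encoding: version 0 carries one UTF-8 string, for example a rack ID.
    static Metadata encode(String rackId) {
        ByteBuffer payload = ByteBuffer.wrap(rackId.getBytes(StandardCharsets.UTF_8));
        return new Metadata((byte) 0, 0, payload);
    }

    // Assignor side: check the version before interpreting the bytes.
    static String decode(Metadata m) {
        if (m.version != 0) {
            throw new IllegalArgumentException("Unsupported metadata version: " + m.version);
        }
        // duplicate() leaves the position of the original buffer untouched,
        // so the same Metadata can be decoded more than once.
        return StandardCharsets.UTF_8.decode(m.metadata.duplicate()).toString();
    }

    public static void main(String[] args) {
        Metadata sent = encode("rack-a");
        System.out.println(decode(sent));
    }
}
```

Checking the version before touching the payload is the reason the version travels next to the bytes: an assignor that does not understand a given encoding can reject it instead of misreading it.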




PartitionAssignor

package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.util.List;

public interface PartitionAssignor extends Assignor {

    class Group extends Assignor.Group {
        /**
         * The topics' metadata.
         */
        List<TopicMetadata> topics;

    }

    class GroupMember extends Assignor.GroupMember {

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;

    }

    class TopicMetadata {
        /**
         * The topic ID.
         */
        Uuid topicId;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class MemberAssignment extends Assignor.MemberAssignment {

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

    }

}
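
As an illustration of what assign(Group) could compute for a PartitionAssignor, the following sketch deals partitions to members in round-robin order. It is a simplified example and not an assignor defined by this proposal. The class RoundRobinSketch is hypothetical, member IDs and topic IDs are plain strings instead of Uuid and TopicIdPartition so that the code has no Kafka dependency, and it assumes every member is subscribed to every topic and ignores ownedPartitions.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RoundRobinSketch {

    // Stand-in for PartitionAssignor.TopicMetadata; a String replaces Uuid here.
    static final class TopicMetadata {
        final String topicId;
        final int numPartitions;

        TopicMetadata(String topicId, int numPartitions) {
            this.topicId = topicId;
            this.numPartitions = numPartitions;
        }
    }

    /**
     * Deals every partition of every topic to the members in turn.
     * Returns a map of member ID to "topicId-partition" strings.
     */
    static Map<String, List<String>> assign(List<String> memberIds, List<TopicMetadata> topics) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            assignment.put(memberId, new ArrayList<>());
        }
        if (memberIds.isEmpty()) {
            return assignment; // nobody to assign to
        }
        int next = 0; // index of the member that receives the next partition
        for (TopicMetadata topic : topics) {
            for (int partition = 0; partition < topic.numPartitions; partition++) {
                String owner = memberIds.get(next % memberIds.size());
                assignment.get(owner).add(topic.topicId + "-" + partition);
                next++;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<TopicMetadata> topics =
            List.of(new TopicMetadata("orders", 3), new TopicMetadata("payments", 2));
        System.out.println(assign(List.of("member-1", "member-2"), topics));
    }
}
```

A real implementation would also need to honour each member's subscribedTopicIds and would likely use ownedPartitions to keep assignments stable across epochs; both are left out here to keep the core loop visible.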

...