...

Upgrading to the new protocol, or downgrading from it, is possible by rolling the workers, assuming that the new protocol is enabled on the server side with the correct group.protocol. When the first worker that supports the new protocol joins the group, the group is converted from generic to connect. Similarly, when the last worker supporting the new protocol leaves, the group switches back to the old protocol. Note that the group epoch starts at the current group generation.
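
The conversion rule above can be illustrated with a small model. This is only a sketch under stated assumptions: the class GroupTypeTracker, its methods, and the boolean "supports new protocol" flag are hypothetical names introduced for illustration and are not part of the proposal or of Kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model, not part of the proposal: tracks which protocol a group uses.
class GroupTypeTracker {
    enum GroupType { GENERIC, CONNECT }

    // memberId -> true if that worker supports the new protocol
    private final Map<String, Boolean> members = new HashMap<>();
    private final int generation;   // generation reached under the old protocol
    private GroupType type = GroupType.GENERIC;
    private int epoch;              // group epoch under the new protocol

    GroupTypeTracker(int currentGeneration) {
        this.generation = currentGeneration;
    }

    void join(String memberId, boolean supportsNewProtocol) {
        members.put(memberId, supportsNewProtocol);
        // The first worker that supports the new protocol converts the group.
        if (supportsNewProtocol && type == GroupType.GENERIC) {
            type = GroupType.CONNECT;
            epoch = generation; // the group epoch starts at the current generation
        }
    }

    void leave(String memberId) {
        members.remove(memberId);
        // When the last worker supporting the new protocol leaves, switch back.
        if (type == GroupType.CONNECT && !members.containsValue(true)) {
            type = GroupType.GENERIC;
        }
    }

    GroupType type() { return type; }

    int epoch() { return epoch; }
}
```

Rolling the workers one at a time corresponds to a sequence of leave and join calls in this model; the group type flips exactly once in each direction.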

One important thing to note is that the JoinGroup, SyncGroup and Heartbeat calls need to be translated into the new protocol. KIP-848 explains this in depth. In short:

  1. In the current protocol, when a new worker (re)joins the group, it issues a JoinGroup request containing its assignments.

  2. When all workers have joined, the group coordinator picks a leader and sends the JoinGroup response to all workers.

  3. The leader computes the assignments and the other workers collect their assignment with the SyncGroup request/response dance.

  4. In parallel, the workers keep sending heartbeats to the Group coordinator to maintain the session using Heartbeat API. The Group coordinator also uses the heartbeat response to notify the workers of any ongoing rebalance.

We will rely on the ConnectGroupHeartbeat API for the translation of the three APIs. Concretely, the API updates the group state, provides the connectors/tasks owned by the worker, gets back the assignment, and updates the session. The main difference here is that JoinGroup and SyncGroup do not run continuously. The group coordinator has to trigger them when needed by returning the REBALANCE_IN_PROGRESS error in the heartbeat response.

The idea is to manage each worker individually while relying on the new engine to do the synchronization between them. Each worker will use rebalance loops to update the group coordinator and collect its assignment. The group coordinator will ensure that a rebalance is triggered when a worker needs to update its assignment.

...

  1. Looks up the group or creates it.

  2. Creates the member if the member epoch is zero; otherwise checks whether it exists. If it does not exist, UNKNOWN_MEMBER_ID is returned.

  3. Checks whether the member epoch matches the member epoch in its current assignment. Otherwise, FENCED_MEMBER_EPOCH is returned and the member is removed from the group.

    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions or connectors/tasks owned by the member are a subset of the target assignment. This could be decided based on the types. If that is the case, it is safe to transition the member to its target epoch again.

  4. Updates the member's information, if any. The group epoch is incremented if there is any change.

  5. Reconciles the member assignments as explained earlier in this document.
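
Steps 2 to 4 above can be sketched as follows. This is a minimal illustration for a single group, not the coordinator implementation: the class HeartbeatHandlerSketch, its fields, and the use of clientHost as the only piece of member information are assumptions made for the example. Group lookup (step 1), the subset-based leniency for lost responses, and reconciliation (step 5) are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-group sketch of the heartbeat handling steps.
class HeartbeatHandlerSketch {
    enum Result { OK, UNKNOWN_MEMBER_ID, FENCED_MEMBER_EPOCH }

    static final class Member {
        int epoch;          // member epoch of the current assignment
        String clientHost;  // stands in for any member information
    }

    final Map<String, Member> members = new HashMap<>();
    int groupEpoch;

    Result handle(String memberId, int memberEpoch, String clientHost) {
        Member member = members.get(memberId);
        boolean changed = false;
        if (memberEpoch == 0) {
            // Step 2: a member epoch of zero (re)creates the member.
            member = new Member();
            members.put(memberId, member);
            changed = true;
        } else if (member == null) {
            return Result.UNKNOWN_MEMBER_ID;
        } else if (memberEpoch != member.epoch) {
            // Step 3: a stale epoch fences the member and removes it.
            members.remove(memberId);
            return Result.FENCED_MEMBER_EPOCH;
        }
        // Step 4: update member information; bump the group epoch on any change.
        if (!clientHost.equals(member.clientHost)) {
            member.clientHost = clientHost;
            changed = true;
        }
        if (changed) {
            groupEpoch++;
        }
        return Result.OK;
    }
}
```

Bumping the group epoch once per request, regardless of how many fields changed, keeps the epoch a simple counter of group state versions.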

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConnectGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." },
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.",
      "fields": [
        { "name": "Error", "type": "int8", "versions": "0+",
          "about": "The assigned error." },
        { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
          "about": "The assigned connectors/tasks to the member.",
          "fields": [
            { "name": "connectors", "type": "[]String", "versions": "0+", "about": "The Connectors assigned to this worker." },
            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+", "about": "The tasks assigned to this worker." }
        ]},
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assigned metadata." }
    ]}
  ]
}

Response

...

Handling 

If the response contains no error, the member will reconcile its current assignment towards its new assignment. It does the following:

...

Upon receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the worker abandons all its resources and rejoins with the same member id and the epoch 0.
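
The worker-side reaction to these two errors could look like the sketch below. The class WorkerMembershipSketch and its fields are hypothetical names used only for illustration; errors are passed as plain strings to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical worker-side state; not the actual Connect worker implementation.
class WorkerMembershipSketch {
    final String memberId;
    int memberEpoch;
    final Set<String> ownedConnectorsAndTasks = new HashSet<>();

    WorkerMembershipSketch(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }

    // Returns true if the worker must rejoin the group with its next heartbeat.
    boolean onHeartbeatError(String error) {
        if ("UNKNOWN_MEMBER_ID".equals(error) || "FENCED_MEMBER_EPOCH".equals(error)) {
            ownedConnectorsAndTasks.clear(); // abandon all resources
            memberEpoch = 0;                 // rejoin with the same member id and epoch 0
            return true;
        }
        return false;
    }
}
```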

...

ConnectGroupPrepareAssignment API

The ConnectGroupPrepareAssignment API will be used by the member to get the information to feed its client-side assignor.

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConnectGroupPrepareAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." }
  ]
}

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConnectGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "Assignor", "type": "Assignor", "versions": "0+",
        "about": "The information of the selected assignor",
        "fields": [
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
          "about": "The assigned connectors/tasks to the member.",
          "fields": [
            { "name": "connectors", "type": "[]String", "versions": "0+","about": "The Connectors assigned to this worker." },
            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+","about": "The tasks assigned to this worker." }
      ]}
    ]},
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
          "about": "The number of partitions." }
    ]}   
  ]
}

Response Handling

  • If the response contains no error, the member calls the client-side assignor with the group state.

  • If both TopicPartitions and ConnectorsAndTasks are present in the response, the worker throws an error and does not process the response further.

  • Topics would be empty for Connect.

  • Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandons the process.

  • Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.
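
The handling rules above map naturally onto a small decision function. The sketch below is illustrative only: the class name, the Action enum, and the string-based error codes (with "NONE" meaning no error) are assumptions, not part of the proposed API.

```java
// Hypothetical decision function for the prepare-assignment response.
class PrepareAssignmentResponseHandlerSketch {
    enum Action { RUN_ASSIGNOR, ABANDON, RETRY_AFTER_NEXT_HEARTBEAT }

    static Action handle(String error, boolean hasTopicPartitions, boolean hasConnectorsAndTasks) {
        switch (error) {
            case "NONE":
                // A response must not carry both kinds of assignment data.
                if (hasTopicPartitions && hasConnectorsAndTasks) {
                    throw new IllegalStateException(
                        "Response contains both topic-partitions and connectors/tasks");
                }
                return Action.RUN_ASSIGNOR;
            case "UNKNOWN_MEMBER_ID":
                return Action.ABANDON;
            case "FENCED_MEMBER_EPOCH":
                return Action.RETRY_AFTER_NEXT_HEARTBEAT;
            default:
                throw new IllegalArgumentException("Unhandled error in this sketch: " + error);
        }
    }
}
```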

...

ConsumerGroupInstallAssignment API

The ConsumerGroupInstallAssignment API will be used by the member to install a new assignment for the group. The new assignment is the result of the client-side assignor.

...

Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupInstallAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "int8", "versions": "0+",
      "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
        "about": "The assigned connectors/tasks to the member.",
        "fields": [
          { "name": "Connectors", "type": "string", "versions": "0+",
            "about": "The connectors assigned to this worker." },
          { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+",
            "about": "The tasks assigned to this worker." }
        ]},
      { "name": "Version", "type": "int32", "versions": "0+",
        "about": "The metadata version." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The metadata bytes." }
    ]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not obey the following invariants:

  • GroupId must be non-empty.

  • MemberId must be non-empty.

  • MemberEpoch must be >= 0.

  • Partitions and ConnectorsAndTasks must not both be set.

Request Handling

When the group coordinator handles a ConsumerGroupInstallAssignment request:

  1. Checks whether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks whether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks whether the member is the one chosen to compute the assignment. If it is not, UNKNOWN_MEMBER_ID is returned.

  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.

  6. Installs the new target assignment.
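
The validation rules and the six handling steps can be sketched as one ordered chain of checks. Everything named here (InstallAssignmentHandlerSketch, GroupState, ErrorCode) is hypothetical. In particular, the proposal does not spell out what "validates the assignment" means; the sketch assumes it means that the assignment was computed for the current group epoch and only references known members.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the install-assignment checks; not the coordinator code.
class InstallAssignmentHandlerSketch {
    enum ErrorCode {
        NONE, INVALID_REQUEST, GROUP_ID_NOT_FOUND, UNKNOWN_MEMBER_ID,
        FENCED_MEMBER_EPOCH, INVALID_ASSIGNMENT
    }

    static final class GroupState {
        final Map<String, Integer> memberEpochs = new HashMap<>();
        String chosenMemberId; // member selected to compute the assignment
        int groupEpoch;
        Map<String, List<String>> targetAssignment = new HashMap<>();
    }

    final Map<String, GroupState> groups = new HashMap<>();

    ErrorCode handle(String groupId, String memberId, int memberEpoch, int groupEpoch,
                     Map<String, List<String>> assignment) {
        // Request validation.
        if (groupId == null || groupId.isEmpty()
                || memberId == null || memberId.isEmpty() || memberEpoch < 0) {
            return ErrorCode.INVALID_REQUEST;
        }
        GroupState group = groups.get(groupId);
        if (group == null) return ErrorCode.GROUP_ID_NOT_FOUND;            // step 1
        Integer currentEpoch = group.memberEpochs.get(memberId);
        if (currentEpoch == null) return ErrorCode.UNKNOWN_MEMBER_ID;      // step 2
        if (currentEpoch != memberEpoch) return ErrorCode.FENCED_MEMBER_EPOCH; // step 3
        if (!memberId.equals(group.chosenMemberId)) return ErrorCode.UNKNOWN_MEMBER_ID; // step 4
        // Step 5 (assumed meaning): same group epoch, only known members.
        if (groupEpoch != group.groupEpoch
                || !group.memberEpochs.keySet().containsAll(assignment.keySet())) {
            return ErrorCode.INVALID_ASSIGNMENT;
        }
        group.targetAssignment = new HashMap<>(assignment);                // step 6
        return ErrorCode.NONE;
    }
}
```

The order of the checks matters: each later check relies on the state established by the earlier ones, which is why the steps are listed as a sequence rather than as independent invariants.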

Response Schema


Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupInstallAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - INVALID_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

Response Handling

If the response contains no error, the member is done.

...

When a member is deleted, a tombstone for it is written to the partition.

...

ConnectWorkerGroupMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "3" }
    ]
}


ConnectWorkerGroupMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ]
}


ConnectWorkerGroupConnectorsTasksMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "4" }
    ]
}


ConnectWorkerGroupConnectorsTasksMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "ConnectorsTasks", "versions": "0+",
          "type": "[]ConnectorsTasks", "fields": [
            { "name": "Connectors", "versions": "0+", "type": "[]String" },
            { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
          ]}
    ]
}

...


ConnectWorkerGroupMemberMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}

...


ConnectWorkerGroupMemberMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ]
}


Target Assignment

The target assignment is stored in a single record.

ConnectWorkerGroupTargetAssignmentKey


Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "6" }
    ]
}


ConnectWorkerGroupTargetAssignmentValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
            { "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "Error", "versions": "0+", "type": "int8" },
            { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": [
                { "name": "Connectors", "versions": "0+", "type": "[]String" },
                { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
            ]},
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]}
    ]
}


Current Member Assignment

The current member assignment represents, as the name suggests, the current assignment of a given member.

When a member is deleted from the group, a tombstone for it is written to the partition.

ConnectWorkerGroupCurrentMemberAssignmentKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "7" },
        { "name": "MemberId", "type": "string", "versions": "7" }
    ]
}


ConnectWorkerGroupCurrentMemberAssignmentValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": [
            { "name": "Connectors", "versions": "0+", "type": "[]String" },
            { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
        ]},
        { "name": "Version", "versions": "0+", "type": "int16" },
        { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ]
}


Group Configurations

...

ConnectWorkerGroupConfigurationKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
       { "name": "GroupId", "type": "string", "versions": "8" }
    ]
}

ConnectWorkerGroupConfigurationValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Configurations", "versions": "0+", "type": "[]Configuration",
          "fields": [
             { "name": "Name", "type": "string", "versions": "0+",
               "about": "The name of the configuration key." },
             { "name": "Value", "type": "string", "versions": "0+",
               "about": "The value of the configuration." }
        ]}
    ] 
}

Broker Metrics

We can add them later on.

Consumer API

As already discussed, we would be adding a new interface called Assignor. It will have 2 extensions namely PartitionAssignor and ConnectAssignor.

Assignor


Code Block
languagejava
package org.apache.kafka.clients.consumer;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public interface Assignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link GroupMember#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {@link Assignor#metadata()}.
         */
        ByteBuffer metadata;
    }

    class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {@link MemberAssignment#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link Metadata#metadata}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);

}




PartitionAssignor

Code Block
languagejava
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.util.List;

public interface PartitionAssignor extends Assignor {

    class Group extends Assignor.Group {
        /**
         * The topics' metadata.
         */
        List<TopicMetadata> topics;
    }

    class GroupMember extends Assignor.GroupMember {
        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class TopicMetadata {
        /**
         * The topic ID.
         */
        Uuid topicId;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class MemberAssignment extends Assignor.MemberAssignment {
        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;
    }

}

ConnectAssignor

Code Block
languagejava
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.consumer.Assignor;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;

public interface ConnectAssignor extends Assignor {

    class Group extends Assignor.Group {
        /**
         * Connectors' and tasks' metadata.
         */
        WorkerCoordinator.ConnectorsAndTasks connectorsAndTasks;
    }

    class GroupMember extends Assignor.GroupMember {
        /**
         * The worker state signifying the assigned connectors and
         * tasks.
         */
        ExtendedWorkerState workerState;
    }

    class MemberAssignment extends Assignor.MemberAssignment {
        /**
         * The worker state signifying the assigned connectors and
         * tasks.
         */
        ExtendedAssignment assignment;
    }

}
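
To give a feel for what an assign(Group) implementation might compute, the following self-contained sketch distributes connector and task identifiers across workers in round-robin order. It is an assumption-laden illustration: it uses plain strings and maps instead of the ConnectAssignor types above, the class name RoundRobinConnectAssignorSketch is hypothetical, and round-robin is merely a simple stand-in strategy, not the assignment algorithm of this proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin distribution of connectors/tasks over workers.
class RoundRobinConnectAssignorSketch {

    // Returns memberId -> assigned connector/task identifiers.
    static Map<String, List<String>> assign(List<String> memberIds,
                                            List<String> connectorsAndTasks) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String memberId : memberIds) {
            result.put(memberId, new ArrayList<>());
        }
        if (memberIds.isEmpty()) {
            return result; // nothing can be assigned without members
        }
        for (int i = 0; i < connectorsAndTasks.size(); i++) {
            String owner = memberIds.get(i % memberIds.size());
            result.get(owner).add(connectorsAndTasks.get(i));
        }
        return result;
    }
}
```

A real assignor would also take the previous assignment into account to limit the number of connectors and tasks that move between workers.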

...