Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

When the group coordinator handle handles a ConsumerGroupPrepareAssignmentRequest requestConnectGroupPrepareAssignmentRequest request:

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks wether whether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

  5. Returns the group state of the group.

...

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConnectGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "AssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "Assignor", "type": "Assignor", "versions": "0+",
        "about": "The information of the selected assignor",
        "fields": [
        { "name": "Version", "type": "int16", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "int8", "versions": "0+",
          "about": "The reason of the metadata update." },
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The assignor metadata." }
      ]},
      { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",
          "about": "The assigned connectors/tasks to the member.",
          "fields": [
            { "name": "connectors", "type": "[]String", "versions": "0+","about": "The Connectors assigned to this worker." },
            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+","about": "The tasks assigned to this worker." }
      ]}
    ]}, 
    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { ]
}

Response Handling

  • If the response contains no error, the member calls the client side assignor with the group state.

  • Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.

  • Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

ConnectGroupInstallAssignment API

The ConnectGroupInstallAssignment API will be used by the member to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema


Code Block
{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "TopicIdConnectGroupInstallAssignment",
  "typevalidVersions": "uuid0",
  "versionsflexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The topicgroup IDidentifier." },
        { "name": "NumPartitionsMemberId", "type": "int32string", "versions": "0+",
          "about": "The number of partitions member id assigned by the group coordinator." },
    ]}   
  ]
}

Response Handling

  • If the response contains no error, the member calls the client side assignor with the group state.

  • If both TopicPartitons and ConnectorsAndTasks is present in the response then the worker would throw an error and not process further.

  • Topics would be empty for Connect.

  • Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.

  • Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

ConsumerGroupInstallAssignment API

The ConsumerGroupInstallAssignment API will be used by the member to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema

Code Block
{
  "apiKey": TBD,
 { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "Error", "type": "requestint8",
  "listenersversions": ["zkBroker0+",
    "broker"],
   "about": "The assignment error; or zero if the assignment is successful." },
    { "name": "ConsumerGroupInstallAssignmentMembers",
  "validVersionstype": "0[]Member",
  "flexibleVersionsversions": "0+",
      "about": "The members.", "fields": [
      { "name": "GroupIdMemberId", "type": "string", "versions": "0+", "entityType": "groupId",

        "about": "The groupmember identifierID." },
        { "name": "MemberIdConnectorsAndTasks", "type": "string[]String", "versions": "0+",
        "about": "The member id assigned by the group coordinator." },assigned topic-partitions to the member.",
        "fields": [
          { "name": "MemberEpochConnectors", "type": "int32string", "versions": "0+",
            "about": "The member epoch. connectors assigned to this worker" },
          { "name": "GroupEpochtasks", "type": "int32[]ConnectorTaskID", "versions": "0+",
            "about": "The group epoch tasks assigned to this worker." }
        ]},
      { "name": "ErrorVersion", "type": "int8int32", "versions": "0+",
        "about": "The assignment error; or zero if the assignment is successful." },metadata version." }
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberIdMetadata", "type": "stringbytes", "versions": "0+",
        "about": "The membermetadata IDbytes." },
        { "name": "ConnectorsAndTasks", "type": "[]String", "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          {]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned should the request not obey to the following invariants:

  • GroupId must be non-empty.

  • MemberId must be non-empty.

  • MemberEpoch must be >= 0.

  • Both Partitions and ConnectorsAndTasks are set.

Request Handling

When the group coordinator handles a ConnectGroupInstallAssignment request:

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.

  6. Installs the new target assignment.

Response Schema


Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConnectorsConnectGroupInstallAssignment",
  "typevalidVersions": "string0",
  "versionsflexibleVersions": "0+",
  // Supported errors:
   //   - GROUP_AUTHORIZATION_FAILED
    "about": "The connectors assigned to this worker" },
      // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH 
  // - INVALID_ASSIGNMENT  
  "fields": [
    { "name": "tasksThrottleTimeMs", "type": "[]ConnectorTaskIDint32", "versions": "0+",
            "about": "The tasksduration assignedin tomilliseconds this worker." }
        ]for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
      { "name": "VersionErrorCode", "type": "int32int16", "versions": "0+",
        "about": "The metadata version.top-level error code, or 0 if there was no error" },
      { "name": "MetadataErrorMessage", "type": "bytesstring", "versions": "0+", "nullableVersions": "0+", "default": "null",
        "about": "The metadata bytes top-level error message, or null if there was no error." }
    ]}
  ]
}

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned should the request not obey to the following invariants:

  • GroupId must be non-empty.

  • MemberId must be non-empty.

  • MemberEpoch must be >= 0.

  • Both Partitions and ConnectorsAndTasks are set.

Request Handling

When the group coordinator handles a ConsumerGroupInstallAssignment request:

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.

  6. Installs the new target assignment.

Response Schema

Code Block
{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupInstallAssignment",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - INVALID_GROUP_ID
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH 
  // - INVALID_ASSIGNMENT  
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." }
  ]
}

Response Handling

If the response contains no error, the member is done.

Upon receiving the FENCED_MEMBER_EPOCH error, the worker retries when receiving its next heartbeat response with its member epoch.

Upon receiving any other errors, the worker abandon the process.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

They will be persisted in the __worker_offsets compacted topic. The compacted topic based storage requires a dedicated key type per record type in order for the compaction to work. The current protocol already uses versions from 0 to 2 (included) for the keys.

Group Metadata

Groups can be rather large so we propose to use several records to store a group in order to not be limited by the maximum batch size (1MB by default). Therefore we propose to store group metadata with two records types: the ConnectWorkerGroupMetadata and the ConnectWorkerGroupMemberMetadata. Note that since these messages are independent of Consumer Groups, we are introducing new record types.

A group with X members will be stored with X+2 records. One ConnectWorkerGroupMemberMetadata per member, one ConnectWorkerGroupConnectorsTasksMetadata, and one ConnectWorkerGroupMetadata for the group at the end. Atomicity is not a concern here. All the records can be applied independently.

Moreover, the whole group does not necessarily have to be written for every epoch. Members who have not changed could be omitted as the compacted topic will retain their previous state anyway.

When a member is deleted, a tombstone for it is written to the partition.

ConnectWorkerGroupMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "3" }
    ]
}

ConnectWorkerGroupMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ], 
}

ConnectWorkerGroupConnectorsTasksMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "4" }
    ]
}

ConnectWorkerGroupConnectorsTasksMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "ConnectorsTasks", "versions": "0+",
          "type": "[]ConnectorsTasks", "fields": [
            { "name": "Connectors", "versions": "0+", "type": "[]String" },
            { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
          ]}
    ], 
}

ConnectWorkerGroupMemberMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}

ConnectWorkerGroupMemberMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}

Target Assignment

The target assignment is stored in a single record.

ConnectWorkerGroupTargetAssignmentKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "5" }
    ]
}

ConnectWorkerGroupTargetAssignmentValue

Code Block
{
    "type": "data",
    "name": "GroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
            { "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "Error", "versions": "0+", "type": "int8" },
            { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 
            [
              { "name": "Connectors", "versions": "0+", "type": "[]String" },
              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
            ]
        },
         { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]
    ]
}

Current Member Assignment

The current member assignment represents, as the name suggests, the current assignment of a given member.

When a member is deleted from the group, a tombstone for it is written to the partition.

ConnectWorkerGroupCurrentMemberAssignmentKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "7" },
          { "name": "MemberId", "type": "string", "versions": "7" },
    ]
}

ConnectWorkerGroupCurrentMemberAssignmentValue

Code Block
{
    "type": "data",
    "name": "GroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 
            [
              { "name": "Connectors", "versions": "0+", "type": "[]String" },
              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
            ]
        },
        { "name": "Version", "versions": "0+", "type": "int16" },
        { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ], 
}

Group Configurations

ConnectWorkerGroupConfigurationKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
       { "name": "GroupId", "type": "string", "versions": "8" }
    ]
}

ConnectWorkerGroupConfigurationValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Configurations", "versions": "0+", "type": "[]Configuration",
          "fields": [
             { "name": "Name", "type": "string", "versions": "0+",
               "about": "The name of the configuration key." },
             { "name": "Value", "type": "string", "versions": "0+",
               "about": "The value of the configuration." }
        ]}
    ] 
}

Broker Metrics

We can add them later on.

Consumer API

As already discussed, we would be adding a new interface called Assignor. It will have 2 extensions namely PartitionAssignor and ConnectAssignor.

Assignor

Code Block
languagejava
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public interface Assignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

    }

    class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;
        
        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link Metadata#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);

}

PartitionAssignor

ConnectAssignor

Code Block
languagejava
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.consumer.Assignor;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;

import java.util.List;

public interface ConnectAssignor {

    class Group {

	 /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * Connector's and tasks metadata.
         */
        WorkerCoordinator.ConnectorsAndTasks connectorsAndTasks;
    }

	class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

	  /**
         * The worker state signifying the assigned connectors and
         * tasks.
         * Note
         */
        ExtendedWorkerState workerState;


    }


 	class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }


    class MemberAssignment {


	 	/**
         * The member ID.
         */
        String memberId;
        
        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;

        /**
         * The worker state signifying the assigned connectors and
         * tasks.
         */
        ExtendedAssignment assignment;


    }

}

Broker API

Assignor

Response Handling

If the response contains no error, the member is done.

Upon receiving the FENCED_MEMBER_EPOCH error, the worker retries when receiving its next heartbeat response with its member epoch.

Upon receiving any other errors, the worker abandon the process.

Records

This section describes the new record types required for the new protocol. The storage layout is based on the data model described earlier in this document.

They will be persisted in the __worker_offsets compacted topic. The compacted topic based storage requires a dedicated key type per record type in order for the compaction to work. The current protocol already uses versions from 0 to 2 (included) for the keys.

Group Metadata

Groups can be rather large so we propose to use several records to store a group in order to not be limited by the maximum batch size (1MB by default). Therefore we propose to store group metadata with two records types: the ConnectWorkerGroupMetadata and the ConnectWorkerGroupMemberMetadata. Note that since these messages are independent of Consumer Groups, we are introducing new record types.

A group with X members will be stored with X+2 records. One ConnectWorkerGroupMemberMetadata per member, one ConnectWorkerGroupConnectorsTasksMetadata, and one ConnectWorkerGroupMetadata for the group at the end. Atomicity is not a concern here. All the records can be applied independently.

Moreover, the whole group does not necessarily have to be written for every epoch. Members who have not changed could be omitted as the compacted topic will retain their previous state anyway.

When a member is deleted, a tombstone for it is written to the partition.

ConnectWorkerGroupMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataKey",
    "validVersions": "3",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "3" }
    ]
}


ConnectWorkerGroupMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" }
    ], 
}


ConnectWorkerGroupConnectorsTasksMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataKey",
    "validVersions": "4",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "4" }
    ]
}


ConnectWorkerGroupConnectorsTasksMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConnectorsTasksMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "ConnectorsTasks", "versions": "0+",
          "type": "[]ConnectorsTasks", "fields": [
            { "name": "Connectors", "versions": "0+", "type": "[]String" },
            { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
          ]}
    ], 
}


ConnectWorkerGroupMemberMetadataKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataKey",
    "validVersions": "5",
    "flexibleVersions": "none",
    "fields": [
        { "name": "GroupId", "type": "string", "versions": "5" },
        { "name": "MemberId", "type": "string", "versions": "5" }
    ]
}


ConnectWorkerGroupMemberMetadataValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupMemberMetadataValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },
        { "name": "InstanceId", "versions": "0+", "type": "string" },
        { "name": "ClientId", "versions": "0+", "type": "string" },
        { "name": "ClientHost", "versions": "0+", "type": "string" },
        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },
        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },
        { "name": "Assignors", "versions": "0+",
          "type": "[]Assignor", "fields": [
            { "name": "Name", "versions": "0+", "type": "string" },
            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },
            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },
            { "name": "Reason", "versions": "0+", "type": "int8" },
            { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
          ]}
    ], 
}


Target Assignment

The target assignment is stored in a single record.

ConnectWorkerGroupTargetAssignmentKey


Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupTargetAssignmentKey",
    "validVersions": "6",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "5" }
    ]
}


ConnectWorkerGroupTargetAssignmentValue

Code Block
{
    "type": "data",
    "name": "GroupTargetAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },
        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [
            { "name": "MemberId", "versions": "0+", "type": "string" },
            { "name": "Error", "versions": "0+", "type": "int8" },
            { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 
            [
              { "name": "Connectors", "versions": "0+", "type": "[]String" },
              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
            ]
        },
         { "name": "Version", "versions": "0+", "type": "int16" },
            { "name": "Metadata", "versions": "0+", "type": "bytes" }
        ]
    ]
}


Current Member Assignment

The current member assignment represents, as the name suggests, the current assignment of a given member.

When a member is deleted from the group, a tombstone for it is written to the partition.

ConnectWorkerGroupCurrentMemberAssignmentKey

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupCurrentMemberAssignmentKey",
    "validVersions": "7",
    "flexibleVersions": "none",
    "fields": [
          { "name": "GroupId", "type": "string", "versions": "7" },
          { "name": "MemberId", "type": "string", "versions": "7" },
    ]
}


ConnectWorkerGroupCurrentMemberAssignmentValue

Code Block
{
    "type": "data",
    "name": "ConnectGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },
        { "name": "Error", "versions": "0+", "type": "int8" },
        { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 
            [
              { "name": "Connectors", "versions": "0+", "type": "[]String" },
              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }
            ]
        },
        { "name": "Version", "versions": "0+", "type": "int16" },
        { "name": "Metadata", "versions": "0+", "type": "bytes" }
    ], 
}


Group Configurations

ConnectWorkerGroupConfigurationKey


Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationKey",
    "validVersions": "8",
    "flexibleVersions": "none",
    "fields": [
       { "name": "GroupId", "type": "string", "versions": "8" }
    ]
}


ConnectWorkerGroupConfigurationValue

Code Block
{
    "type": "data",
    "name": "ConnectWorkerGroupConfigurationValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Configurations", "versions": "0+", "type": "[]Configuration",
          "fields": [
             { "name": "Name", "type": "string", "versions": "0+",
               "about": "The name of the configuration key." },
             { "name": "Value", "type": "string", "versions": "0+",
               "about": "The value of the configuration." }
        ]}
    ] 
}


Broker Metrics

We can add them later on.

Client side API

ConnectAssignor

Code Block
languagejava
package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.consumer.Assignor;
import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;

import java.util.List;

public interface ConnectAssignor {

    class Group {

	
Code Block
languagejava
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Optional;

public interface Assignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}members.
         */
        intList<GroupMember> versionmembers;

        /**
         * Connector's Theand custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}tasks metadata.
         */
        ByteBufferWorkerCoordinator.ConnectorsAndTasks metadataconnectorsAndTasks;

    }

    	class AssignmentGroupMember {
        /**
         * The assignmentmember errorID.
         */
        byteString errormemberId;

        /**
         * The member assignmentinstance ID if provided.
         */
        List<MemberAssignment>Optional<String> members;
    }

    class MemberAssignment {instanceId;

        /**
         * The member IDreason reported by the member.
         */
        Stringbyte memberIdreason;
        
        /**
         * The error reported by the assignor version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        byteint errorversion;

        /**
         * The custom versionmetadata provided ofby the metadata encoded in member as defined
         * by {{@link GroupMember#metadataPartitionAssignor#metadata()}}.
         */
        intByteBuffer versionmetadata;

	  /**
         /*** The worker state signifying the assigned connectors and
         * The custom metadata provided by the assignor.tasks.
         * Note
         */
        ByteBufferExtendedWorkerState metadataworkerState;


    }


    	class MetadataAssignment {
        /**
         * The reason reported by the assignorassignment error.
         */
        byte reasonerror;

        /**
         * The version of the metadata encoded in {{@link Metadata#metadata()}}member assignment.
         */
        intList<MemberAssignment> version;members;
    }


    class MemberAssignment {


	  	/**
         * The custom metadata provided by the assignormember ID.
         */
        ByteBufferString metadatamemberId;
      }

  
        /**
         * UniqueThe error namereported forby thisthe assignor.
         */
    String name()    byte error;

        /**
     * The minimum version.
 * The version of */
the metadata encoded in int{{@link minimumVersionGroupMember#metadata();
}}.
    /**
     * The maximum version./
     */
    int maximumVersion()version;

        /**
      * Return serialized data* thatThe willcustom bemetadata sentprovided toby the assignor.
         */
      Metadata  ByteBuffer metadata();

        /**
         * PerformThe theworker groupstate assignmentsignifying given the currentassigned membersconnectors and
         * topic metadatatasks.
         */
       * @param group The group state. ExtendedAssignment assignment;


    }


 class Metadata {
     * @return The new/**
 assignment for the group.
     */
 The reason reported Assignmentby assign(Group group);

the assignor.
    /**
     */
 Callback   which is invoked when the member received a new
byte reason;

        /**
 assignment    from the assignor/group coordinator.
 * The version of */
the metadata encoded in void{{@link onAssignmentMetadata#metadata(MemberAssignment assignment);

}

PartitionAssignor

Code Block
languagejava
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;

import java.util.List;

public interface PartitionAssignor extends Assignor {

)}}.
         */
     class Group extends Assignor.Group {int version;

        /**
         * The topics'custom metadata.
 provided by       */the assignor.
        List<TopicMetadata> topics;

    }*/

     class GroupMember extends Assignor.GroupMember {
ByteBuffer metadata;
    }

    /**
     * Unique name for *this Theassignor.
 set of topic IDs that*/
 the member is subscribed to.String name();

    /**
     */ The minimum version.
     */
    List<Uuid>int subscribedTopicIdsminimumVersion();

        /**
     * The maximum version.
 * The partitions owned by*/
 the member at the current epoch.int maximumVersion();

    /**
     */
 Return serialized data that will be sent to List<TopicIdPartition>the ownedPartitions;assignor.

     }
*/
    class TopicMetadata {
 Metadata metadata();

       /**
     * Perform the group *assignment Thegiven topicthe ID.
current members and
     * topic */metadata.
     *
   Uuid topicId;

 * @param group The    /**group state.
     * @return The new *assignment Thefor numberthe of partitionsgroup.
         */
    Assignment    int numPartitionsassign(Group group);

    }
/**
    class MemberAssignment* extendsCallback Assignor.MemberAssignment {

        /**which is invoked when the member received a new
     * assignment from the * The assigned partitionsassignor/group coordinator.
         */
    void    List<TopicIdPartition> partitions;

    }onAssignment(MemberAssignment assignment);

}

Worker and Assignment Metadata

...