Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The group coordinator will only set the Assignment field when the member epoch is smaller than the target assignment epoch. This is done to ensure that the members converge to the target assignment.

Code Block
{

...


  "apiKey": TBD,

...


  "type": "response",

...


  "name": "GroupHeartbeatResponse",

...


  "validVersions": "0",

...


  "flexibleVersions": "0+",

...


  // Supported errors:

...


  // - GROUP_AUTHORIZATION_FAILED

...


  // - NOT_COORDINATOR

...


  // - COORDINATOR_NOT_AVAILABLE

...


  // - COORDINATOR_LOAD_IN_PROGRESS

...


  // - INVALID_REQUEST

...


  // - UNKNOWN_MEMBER_ID

...


  // - FENCED_MEMBER_EPOCH

...


  // - UNSUPPORTED_ASSIGNOR

...


  // - COMPUTE_ASSIGNMENT

...


  "fields": [

...


    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

...


      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

...


    { "name": "ErrorCode", "type": "int16", "versions": "0+",

...


      "about": "The top-level error code, or 0 if there was no error" },

...


    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

...


      "about": "The top-level error message, or null if there was no error." },

...


    { "name": "MemberEpoch", "type": "int32", "versions": "0+",

...


      "about": "The member epoch." },

...


    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",

...


      "about": "The heartbeat interval in milliseconds." },

...


    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",

...


      "about": "null if not provided; the assignment otherwise."

...


      "fields": [

...


        { "name": "Error", "type": "int8", "versions": "0+",

...


          "about": "The assigned error." },

...


        { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",

...


          "about": "The assigned topic-partitions to the member.",

...


          "fields": [

...


            { "name": "TopicId", "type": "uuid", "versions": "0+",

...


              "about": "The topic ID." },

...


            { "name": "Partitions", "type": "[]int32", "versions": "0+",

...


              "about": "The partitions." }

...


        ]},

...


        { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",

...


          "about": "The assigned connectors/tasks to the member.",

...


          "fields": [

...


            { "name": "connectors", "type": "[]String", "versions": "0+","about": "The Connectors assigned to this worker." },

...


            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+","about": "The tasks assigned to this worker." }

...


        ]},

...


        { "name": "Version", "type": "int16", "versions": "0+",

...


          "about": "The version of the metadata." },

...


        { "name": "Metadata", "type": "bytes", "versions": "0+",

...


          "about": "The assigned metadata." }

...


    ]

...


  ]

...


}

Response Handling (Only for Connect)

...

The GroupPrepareAssignment API will be used by the member to get the information to feed its client-side assignor.

Request Schema

Code Block
{

...


  "apiKey": TBD,

...


  "type": "request",

...


  "listeners": ["zkBroker", "broker"],

...


  "name": "GroupPrepareAssignmentRequest",

...


  "validVersions": "0",

...


  "flexibleVersions": "0+",

...


  "fields": [

...


    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

...


      "about": "The group identifier." },

...


    { "name": "MemberId", "type": "string", "versions": "0+",

...


      "about": "The member id assigned by the group coordinator." },

...


    { "name": "MemberEpoch", "type": "int32", "versions": "0+",

...


      "about": "The member epoch." }

...


  ]

...


}

Required ACL

  • Read Group

Request Validation

...

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

  5. Returns the group state of the group.

Response Schema

Code Block
{

...


  "apiKey": TBD,

...


  "type": "response",

...


  "name": "GroupPrepareAssignmentResponse",

...


  "validVersions": "0",

...


  "flexibleVersions": "0+",

...


  // Supported errors:

...


  // - GROUP_AUTHORIZATION_FAILED

...


  // - NOT_COORDINATOR

...


  // - COORDINATOR_NOT_AVAILABLE

...


  // - COORDINATOR_LOAD_IN_PROGRESS

...


  // - INVALID_REQUEST

...


  // - INVALID_GROUP_ID

...


  // - GROUP_ID_NOT_FOUND

...


  // - UNKNOWN_MEMBER_ID

...


  // - FENCED_MEMBER_EPOCH

...


  "fields": [

...


    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

...


      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

...


    { "name": "ErrorCode", "type": "int16", "versions": "0+",

...


      "about": "The top-level error code, or 0 if there was no error" },

...


    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

...


      "about": "The top-level error message, or null if there was no error." },

...


    { "name": "GroupEpoch", "type": "int32", "versions": "0+",

...


      "about": "The group epoch." },

...


    { "name": "AssignorName", "type": "string", "versions": "0+",

...


      "about": "The selected assignor." },

...


    { "name": "Members", "type": "[]Member", "versions": "0+",

...


      "about": "The members.", "fields": [

...


      { "name": "MemberId", "type": "string", "versions": "0+",

...


        "about": "The member ID." },

...


      { "name": "MemberEpoch", "type": "int32", "versions": "0+",

...


        "about": "The member epoch." },

...


      { "name": "InstanceId", "type": "string", "versions": "0+",

...


        "about": "The member instance ID." },

...


      { "name": "SubscribedTopicIds", "type": "[]uuid", "versions": "0+",

...


        "about": "The subscribed topic IDs." },

...


      { "name": "Assignor", "type": "Assignor", "versions": "0+",

...


        "about": "The information of the selected assignor",

...


        "fields": [

...


        { "name": "Version", "type": "int16", "versions": "0+",

...


          "about": "The version of the metadata." },

...


        { "name": "Reason", "type": "int8", "versions": "0+",

...


          "about": "The reason of the metadata update." },

...


        { "name": "Metadata", "type": "bytes", "versions": "0+",

...


          "about": "The assignor metadata." }

...


      ]},

...


      { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+",

...


        "about": "The target topic-partitions of the member.",

...


        "fields": [

...


          { "name": "TopicId", "type": "uuid", "versions": "0+",

...


            "about": "The topic ID." },

...


          { "name": "Partitions", "type": "[]int32", "versions": "0+",

...


            "about": "The partitions." }

...


      ]},

...


      { "name": "ConnectorsAndTasks", "type": "[]ConnectorsAndTask", "versions": "0+",

...


          "about": "The assigned connectors/tasks to the member.",

...


          "fields": [

...


            { "name": "connectors", "type": "[]String", "versions": "0+","about": "The Connectors assigned to this worker." },

...


            { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+","about": "The tasks assigned to this worker." }

...


      ]}

...


    ]},

...


    { "name": "Topics", "type": "[]TopicMetadata", "versions": "0+",

...


      "about": "The topic-partition metadata.",

...


      "fields": [

...


        { "name": "TopicId", "type": "uuid", "versions": "0+",

...


          "about": "The topic ID." },

...


        { "name": "NumPartitions", "type": "int32", "versions": "0+",

...


          "about": "The number of partitions." }

...


    ]}   

...


  ]

...


}

Response Handling

  • If the response contains no error, the member calls the client side assignor with the group state.

  • If both TopicPartitons and ConnectorsAndTasks is present in the response then the worker would throw an error and not process further.

  • Topics would be empty for Connect.

  • Upon receiving the UNKNOWN_MEMBER_ID error, the consumer abandon the process.

  • Upon receiving the FENCED_MEMBER_EPOCH error, the consumer retries when receiving its next heartbeat response with its member epoch.

...

The GroupInstallAssignment API will be used by the member to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema


Code Block
{

...


  "apiKey": TBD,

...


  "type": "request",

...


  "listeners": ["zkBroker", "broker"],

...


  "name": "GroupInstallAssignmentRequest",

...


  "validVersions": "0",

...


  "flexibleVersions": "0+",

...


  "fields": [

...


    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",

...


      "about": "The group identifier." },

...


    { "name": "MemberId", "type": "string", "versions": "0+",

...


      "about": "The member id assigned by the group coordinator." },

...


    { "name": "MemberEpoch", "type": "int32", "versions": "0+",

...


      "about": "The member epoch." },

...


    { "name": "GroupEpoch", "type": "int32", "versions": "0+",

...


      "about": "The group epoch." },

...


    { "name": "Error", "type": "int8", "versions": "0+",

...


      "about": "The assignment error; or zero if the assignment is successful." },

...


    { "name": "Members", "type": "[]Member", "versions": "0+",

...


      "about": "The members.", "fields": [

...


      { "name": "MemberId", "type": "string", "versions": "0+",

...


        "about": "The member ID." },

...


      { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",

...


        "about": "The assigned topic-partitions to the member.",

...


        "fields": [

...


          { "name": "TopicId", "type": "uuid", "versions": "0+",

...


            "about": "The topic ID." },

...


          { "name": "Partitions", "type": "[]int32", "versions": "0+",

...


            "about": "The partitions." }

...


        ]},

...


        { "name": "ConnectorsAndTasks", "type": "[]String", "versions": "0+",

...


        "about": "The assigned topic-partitions to the member.",

...


        "fields": [

...


          { "name": "Connectors", "type": "string", "versions": "0+",

...


            "about": "The connectors assigned to this worker" },

...


          { "name": "tasks", "type": "[]ConnectorTaskID", "versions": "0+",

...


            "about": "The tasks assigned to this worker." }

...


        ]},

...


      { "name": "Version", "type": "int32", "versions": "0+",

...


        "about": "The metadata version." }

...


      { "name": "Metadata", "type": "bytes", "versions": "0+",

...


        "about": "The metadata bytes." }

...


    ]}

...


  ]

...


}

Required ACL

  • Read Group

Request Validation

...

  1. Checks wether the group exists. If it does not, GROUP_ID_NOT_FOUND is returned.

  2. Checks wether the member exists. If it does not, UNKNOWN_MEMBER_ID is returned.

  3. Checks wether the member epoch matches the current member epoch. If it does not, FENCED_MEMBER_EPOCH is returned.

  4. Checks wether the member is the chosen one to compute the assignment. If it does not, UNKNOWN_MEMBER_ID is returned.

  5. Validates the assignment based on the information used to compute it. If it is not valid, INVALID_ASSIGNMENT is returned.

  6. Installs the new target assignment.

Response Schema


Code Block
{

...


  "apiKey": TBD,

...


  "type": "response",

...


  "name": "GroupInstallAssignmentResponse",

...


  "validVersions": "0",

...


  "flexibleVersions": "0+",

...


  // Supported errors:

...


  // - GROUP_AUTHORIZATION_FAILED

...


  // - NOT_COORDINATOR

...


  // - COORDINATOR_NOT_AVAILABLE

...


  // - COORDINATOR_LOAD_IN_PROGRESS

...


  // - INVALID_REQUEST

...


  // - INVALID_GROUP_ID

...


  // - GROUP_ID_NOT_FOUND

...


  // - UNKNOWN_MEMBER_ID

...


  // - FENCED_MEMBER_EPOCH 

...


  // - INVALID_ASSIGNMENT  

...


  "fields": [

...


    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",

...


      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },

...


    { "name": "ErrorCode", "type": "int16", "versions": "0+",

...


      "about": "The top-level error code, or 0 if there was no error" },

...


    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",

...


      "about": "The top-level error message, or null if there was no error." }

...


  ]

...


}

Response Handling

If the response contains no error, the member is done.

...

ConnectWorkerGroupMetadataKey

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupMetadataKey",

...


    "validVersions": "3",

...


    "flexibleVersions": "none",

...


    "fields": [

...


          { "name": "GroupId", "type": "string", "versions": "3" }

...


    ]

...


}


ConnectWorkerGroupMetadataValue

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupMetadataValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "Epoch", "versions": "0+", "type": "int32" }

...


    ], 

...


}


ConnectWorkerGroupConnectorsTasksMetadataKey

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupConnectorsTasksMetadataKey",

...


    "validVersions": "4",

...


    "flexibleVersions": "none",

...


    "fields": [

...


          { "name": "GroupId", "type": "string", "versions": "4" }

...


    ]

...


}


ConnectWorkerGroupConnectorsTasksMetadataValue

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupConnectorsTasksMetadataValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "Epoch", "versions": "0+", "type": "int32" },

...


        { "name": "ConnectorsTasks", "versions": "0+",

...


          "type": "[]ConnectorsTasks", "fields": [

...


            { "name": "Connectors", "versions": "0+", "type": "[]String" },

...


            { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }

...


          ]}

...


    ], 

...


}


ConnectWorkerGroupMemberMetadataKey

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupMemberMetadataKey",

...


    "validVersions": "5",

...


    "flexibleVersions": "none",

...


    "fields": [

...


        { "name": "GroupId", "type": "string", "versions": "5" },

...


        { "name": "MemberId", "type": "string", "versions": "5" }

...


    ]

...


}


ConnectWorkerGroupMemberMetadataValue

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupMemberMetadataValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "GroupEpoch", "versions": "0+", "type": "int32" },

...


        { "name": "InstanceId", "versions": "0+", "type": "string" },

...


        { "name": "ClientId", "versions": "0+", "type": "string" },

...


        { "name": "ClientHost", "versions": "0+", "type": "string" },

...


        { "name": "SubscribedTopicNames", "versions": "0+", "type": "[]string" },

...


        { "name": "SubscribedTopicRegex", "versions": "0+", "type": "string" },

...


        { "name": "Assignors", "versions": "0+",

...


          "type": "[]Assignor", "fields": [

...


            { "name": "Name", "versions": "0+", "type": "string" },

...


            { "name": "MinimumVersion", "versions": "0+", "type": "int16" },

...


            { "name": "MaximumVersion", "versions": "0+", "type": "int16" },

...


            { "name": "Reason", "versions": "0+", "type": "int8" },

...


            { "name": "Version", "versions": "0+", "type": "int16" },

...


            { "name": "Metadata", "versions": "0+", "type": "bytes" }

...


          ]}

...


    ], 

...


}


Target Assignment

The target assignment is stored in a single record.

ConnectWorkerGroupTargetAssignmentKey


Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupTargetAssignmentKey",

...


    "validVersions": "6",

...


    "flexibleVersions": "none",

...


    "fields": [

...


          { "name": "GroupId", "type": "string", "versions": "5" }

...


    ]

...


}


ConnectWorkerGroupTargetAssignmentValue

Code Block
{

...


    "type": "data",

...


    "name": "GroupTargetAssignmentValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "AssignmentEpoch", "versions": "0+", "type": "int32" },

...


        { "name": "Members", "versions": "0+", "type": "[]Member", "fields": [

...


            { "name": "MemberId", "versions": "0+", "type": "string" },

...


            { "name": "Error", "versions": "0+", "type": "int8" },

...


            { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 

...


            [

...


              { "name": "Connectors", "versions": "0+", "type": "[]String" },

...


              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }

...


            ]

...


        },

...


         { "name": "Version", "versions": "0+", "type": "int16" },

...


            { "name": "Metadata", "versions": "0+", "type": "bytes" }

...


        ]

...


    ]

...


}


Current Member Assignment

...

ConnectWorkerGroupCurrentMemberAssignmentKey

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupCurrentMemberAssignmentKey",

...


    "validVersions": "7",

...


    "flexibleVersions": "none",

...


    "fields": [

...


          { "name": "GroupId", "type": "string", "versions": "7" },

...


          { "name": "MemberId", "type": "string", "versions": "7" },

...


    ]

...


}


ConnectWorkerGroupCurrentMemberAssignmentValue

Code Block
{

...


    "type": "data",

...


    "name": "GroupCurrentMemberAssignmentValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "MemberEpoch", "versions": "0+", "type": "int32" },

...


        { "name": "Error", "versions": "0+", "type": "int8" },

...


        { "name": "ConnectorsTasks", "versions": "0+", "type": "[]ConnectorsTasks", "fields": 

...


            [

...


              { "name": "Connectors", "versions": "0+", "type": "[]String" },

...


              { "name": "Tasks", "versions": "0+", "type": "[]ConnectorsTasks" }

...


            ]

...


        },

...


        { "name": "Version", "versions": "0+", "type": "int16" },

...


        { "name": "Metadata", "versions": "0+", "type": "bytes" }

...


    ], 

...


}


Group Configurations

ConnectWorkerGroupConfigurationKey


Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupConfigurationKey",

...


    "validVersions": "8",

...


    "flexibleVersions": "none",

...


    "fields": [

...


       { "name": "GroupId", "type": "string", "versions": "8" }

...


    ]

...


}


ConnectWorkerGroupConfigurationValue

Code Block
{

...


    "type": "data",

...


    "name": "ConnectWorkerGroupConfigurationValue",

...


    "validVersions": "0",

...


    "flexibleVersions": "0+",

...


    "fields": [

...


        { "name": "Configurations", "versions": "0+", "type": "[]Configuration",

...


          "fields": [

...


             { "name": "Name", "type": "string", "versions": "0+",

...


               "about": "The name of the configuration key." },

...


             { "name": "Value", "type": "string", "versions": "0+",

...


               "about": "The value of the configuration." }

...


        ]}

...


    

...


}


Broker Metrics

We can add them later on.

...

As already discussed, we would be adding a new interface called Assignor. It will have 2 extensions namely PartitionAssignor and ConnectAssignor.

Assignor

Code Block
languagejava
1
package org.apache.kafka.clients.consumer;
2 3


import org.apache.kafka.common.TopicIdPartition;
4

import org.apache.kafka.common.Uuid;
5 6


import java.nio.ByteBuffer;
7

import java.util.List;
8

import java.util.Optional;
9 10


public interface Assignor
{11 12classGroup{13/** 14 * The members. 15 */16List<GroupMember> members;17}18 19classGroupMember{20/** 21 * The member ID. 22 */23String memberId;24 25/** 26 * The instance ID if provided. 27 */28Optional<String> instanceId;29 30/** 31 * The reason reported by the member. 32 */33byte reason;34 35/** 36 * The version of the metadata encoded in {{@linkGroupMember#metadata()}}. 37 */38int version;39 40/** 41 * The custom metadata provided by the member as defined 42 * by {{@linkPartitionAssignor#metadata()}}. 43 */44ByteBuffer metadata;45 46}47 48classAssignment{49/** 50 * The assignment error. 51 */52byte error;53 54/** 55 * The member assignment. 56 */57List<MemberAssignment> members;58}59 60classMemberAssignment{61/** 62 * The member ID. 63 */64String memberId;65 66/** 67 * The error reported by the assignor. 68 */69byte error;70 71/** 72 * The version of the metadata encoded in {{@linkGroupMember#metadata()}}. 73 */74int version;75 76/** 77 * The custom metadata provided by the assignor. 78 */79ByteBuffer metadata;80}81 82classMetadata{83/** 84 * The reason reported by the assignor. 85 */86byte reason;87 88/** 89 * The version of the metadata encoded in {{@linkMetadata#metadata()}}. 90 */91int version;92 93/** 94 * The custom metadata provided by the assignor. 95 */96ByteBuffer metadata;97}98 99/** 100 * Unique name for this assignor. 101 */102Stringname();103 104/** 105 * The minimum version. 106 */107intminimumVersion();108 109/** 110 * The maximum version. 111 */112intmaximumVersion();113 114/** 115 * Return serialized data that will be sent to the assignor. 116 */117Metadatametadata();118 119/** 120 * Perform the group assignment given the current members and 121 * topic metadata. 122 * 123 * @paramgroup The group state. 124 * @return The new assignment for the group. 125 */126Assignmentassign(Group group);127 128/** 129 * Callback which is invoked when the member received a new 130 * assignment from the assignor/group coordinator. 131 */132voidonAssignment(MemberAssignment assignment);133 134}135

PartitionAssignor

...

 {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

    }

    class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;
        
        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link Metadata#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);

}


PartitionAssignor

Code Block
languagejava
package org.apache.kafka.clients.consumer;

...



import org.apache.kafka.common.TopicIdPartition;

...


import org.apache.kafka.common.Uuid;

...



import java.util.List;

...



public interface PartitionAssignor extends Assignor

...

 {

    class Group extends Assignor.Group

...

 {
        /**

...


         * The topics' metadata.

...


         */
        List<TopicMetadata> topics;

    }

    class GroupMember extends Assignor.GroupMember {

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;

    }

    class TopicMetadata {
        /**
         * The topic ID.
         */
        Uuid topicId;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class MemberAssignment extends Assignor.MemberAssignment {

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

    }

}

ConnectAssignor

Code Block
languagejava
package 

...

ConnectAssignor

...

org.apache.kafka.connect.runtime;

...



import org.apache.kafka.clients.consumer.Assignor;

...


import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;

...


import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;

...


import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;

...



import java.util.List;

...



public interface ConnectAssignor extends Assignor

...

 {

    class Group extends Assignor.Group

...

 {

        /**

...


         * Connector's and tasks metadata.

...


         */
        WorkerCoordinator.ConnectorsAndTasks connectorsAndTasks;

...


    }

    class GroupMember extends Assignor.GroupMember

...

 {

        /**

...

Broker API

Assignor

...


         * The worker state signifying the assigned connectors and
         * tasks.
         * Note
         */
        ExtendedWorkerState workerState;

    }

    class MemberAssignment extends Assignor.MemberAssignment {

        /**
         * The worker state signifying the assigned connectors and
         * tasks.
         */
        ExtendedAssignment assignment;


    }

}


Broker API

Assignor

Code Block
languagejava
package org.apache.kafka.clients.consumer;

...



import org.apache.kafka.common.TopicIdPartition;

...


import org.apache.kafka.common.Uuid;

...



import java.nio.ByteBuffer;

...


import java.util.List;

...


import java.util.Optional;

...



public interface Assignor

...

 {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {{@link PartitionAssignor#metadata()}}.
         */
        ByteBuffer metadata;

    }

    class Assignment {
        /**
         * The assignment error.
         */
        byte error;

        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;
        
        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {{@link GroupMember#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {{@link Metadata#metadata()}}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);

}

PartitionAssignor

Code Block
languagejava
package 

...

PartitionAssignor

...

org.apache.kafka.clients.consumer;

...



import org.apache.kafka.common.TopicIdPartition;

...


import org.apache.kafka.common.Uuid;

...



import java.util.List;

...



public interface PartitionAssignor extends Assignor

...

 {

    class Group extends Assignor.Group

...

 {
        /**

...


         * The topics' metadata.

...


         */
        List<TopicMetadata> topics;

    }

    class GroupMember extends Assignor.GroupMember {

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;

    }

    class TopicMetadata {
        /**
         * The topic ID.
         */
        Uuid topicId;

        /**
         * The number of partitions.
         */
        int numPartitions;
    }

    class MemberAssignment extends Assignor.MemberAssignment {

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

    }

}

Worker and Assignment Metadata

...

Let’s take a look at few example scenarios to understand how the rebalancing would work in the new protocol. I am taking a few illustrations from Image RemovedKIP-415: Incremental Cooperative Rebalancing in Kafka Connect - Apache Kafka - Apache Software Foundation to  to keep it familiar. Similar to the KIP, first letter of the recourse is a Connector instance (e.g. Connector A, Connector B, etc). Second letter is type: C for Connector, T for task. Number is regular task numbering. 0 for Connectors, greater or equal to 1 for Tasks. W represents a Worker, with W1 and W2, etc being different Workers joining the same group. Primes are used to represent a Worker that was member of the group and rejoins soon after a short period of being offline. 

...