...

{
    "type": "data",
    "name": "ConsumerGroupCurrentMemberAssignmentValue",
    "validVersions": "0",
    "flexibleVersions": "0+",
    "fields": [
        { "name": "Epoch", "versions": "0+", "type": "int32" },
        { "name": "TopicPartitions", "versions": "0+",
          "type": "[]TopicPartition", "fields": [
            { "name": "TopicId", "versions": "0+", "type": "uuid" },
            { "name": "Partitions", "versions": "0+", "type": "[]int32" }
        ]},
        { "name": "MetadataError", "versions": "0+", "type": "int8" },
        { "name": "MetadataVersion", "versions": "0+", "type": "int32" },
        { "name": "MetadataBytes", "versions": "0+", "type": "bytes" }
    ]
}

Broker API

The broker-side assignor API is essentially the same as the consumer-side one, but without metadata.

The new PartitionAssignor interface will be introduced on the server side. Two implementations will be provided out of the box: RangeAssignor (range) and UniformAssignor (uniform).

package org.apache.kafka.server.group.consumer;

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The mapping from topic ID to number of partitions
         * as provided by the group coordinator
         */
        Map<Uuid, Integer> topicMetadata;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class Assignment {
        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);
}
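
To illustrate how an implementation of the server-side interface might compute an assignment from the members and the topic metadata, here is a minimal sketch. It is not the RangeAssignor or UniformAssignor named above; it is a naive round-robin written only for illustration. The classes below are hypothetical stand-ins for the types in the interface, and topic IDs are plain strings rather than Uuid so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the types used by the interface (assumption:
// simplified fields, String topic IDs instead of Uuid).
final class GroupMember {
    final String memberId;
    final List<String> subscribedTopicIds;
    GroupMember(String memberId, List<String> subscribedTopicIds) {
        this.memberId = memberId;
        this.subscribedTopicIds = subscribedTopicIds;
    }
}

final class TopicIdPartition {
    final String topicId;
    final int partition;
    TopicIdPartition(String topicId, int partition) {
        this.topicId = topicId;
        this.partition = partition;
    }
}

final class MemberAssignment {
    final String memberId;
    final List<TopicIdPartition> partitions;
    MemberAssignment(String memberId, List<TopicIdPartition> partitions) {
        this.memberId = memberId;
        this.partitions = partitions;
    }
}

final class RoundRobinSketch {
    // Deals the partitions of each topic, in turn, to the members subscribed to it.
    static List<MemberAssignment> assign(List<GroupMember> members,
                                         Map<String, Integer> topicMetadata) {
        Map<String, List<TopicIdPartition>> byMember = new LinkedHashMap<>();
        for (GroupMember member : members) {
            byMember.put(member.memberId, new ArrayList<>());
        }
        for (Map.Entry<String, Integer> topic : topicMetadata.entrySet()) {
            List<GroupMember> subscribers = new ArrayList<>();
            for (GroupMember member : members) {
                if (member.subscribedTopicIds.contains(topic.getKey())) {
                    subscribers.add(member);
                }
            }
            if (subscribers.isEmpty()) {
                continue; // no member is subscribed to this topic
            }
            for (int p = 0; p < topic.getValue(); p++) {
                GroupMember owner = subscribers.get(p % subscribers.size());
                byMember.get(owner.memberId).add(new TopicIdPartition(topic.getKey(), p));
            }
        }
        List<MemberAssignment> result = new ArrayList<>();
        for (Map.Entry<String, List<TopicIdPartition>> e : byMember.entrySet()) {
            result.add(new MemberAssignment(e.getKey(), e.getValue()));
        }
        return result;
    }
}
```

Every member appears in the result, including members that end up with no partitions. The sketch ignores ownedPartitions, so it makes no attempt to keep assignments stable across rebalances.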

Broker Metrics

  • Group count by type
  • Group count by state
  • Rebalance Rate

Broker Configurations

New properties in the broker configuration.

Name | Type | Default | Doc
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout.
group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.
group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval.
group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval.
group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate.
group.consumer.assignors | List | range, uniform | The server side assignors.

Group Configurations

New dynamic group properties.

Name | Type | Default | Doc
group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol.
group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members.

Consumer API

New PartitionAssignor interface

The new PartitionAssignor interface will be introduced to replace the ConsumerPartitionAssignor interface. The interface is defined as follows:

package org.apache.kafka.clients.consumer;

public interface PartitionAssignor {

    class Group {
        /**
         * The members.
         */
        List<GroupMember> members;

        /**
         * The mapping from topic ID to number of partitions
         * as provided by the group coordinator
         */
        Map<Uuid, Integer> topicMetadata;
    }

    class GroupMember {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The instance ID if provided.
         */
        Optional<String> instanceId;

        /**
         * The set of topic IDs that the member is subscribed to.
         */
        List<Uuid> subscribedTopicIds;

        /**
         * The reason reported by the member.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link GroupMember#metadata()}.
         */
        int version;

        /**
         * The custom metadata provided by the member as defined
         * by {@link PartitionAssignor#metadata()}.
         */
        ByteBuffer metadata;

        /**
         * The partitions owned by the member at the current epoch.
         */
        List<TopicIdPartition> ownedPartitions;
    }

    class Assignment {
        /**
         * The member assignment.
         */
        List<MemberAssignment> members;
    }

    class MemberAssignment {
        /**
         * The member ID.
         */
        String memberId;

        /**
         * The assigned partitions.
         */
        List<TopicIdPartition> partitions;

        /**
         * The error reported by the assignor.
         */
        byte error;

        /**
         * The version of the metadata encoded in {@link GroupMember#metadata()}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    class Metadata {
        /**
         * The reason reported by the assignor.
         */
        byte reason;

        /**
         * The version of the metadata encoded in {@link Metadata#metadata()}.
         */
        int version;

        /**
         * The custom metadata provided by the assignor.
         */
        ByteBuffer metadata;
    }

    /**
     * Unique name for this assignor.
     */
    String name();

    /**
     * The minimum version.
     */
    int minimumVersion();

    /**
     * The maximum version.
     */
    int maximumVersion();

    /**
     * Return serialized data that will be sent to the assignor.
     */
    Metadata metadata();

    /**
     * Perform the group assignment given the current members and
     * topic metadata.
     *
     * @param group The group state.
     * @return The new assignment for the group.
     */
    Assignment assign(Group group);

    /**
     * Callback which is invoked when the member received a new
     * assignment from the assignor/group coordinator.
     */
    void onAssignment(MemberAssignment assignment);
}

Deprecate Consumer#enforceRebalance and Consumer#enforceRebalance(String)

The enforceRebalance methods are no longer necessary and will be deprecated in a future release.

Deprecate ConsumerPartitionAssignor interface.

The ConsumerPartitionAssignor interface will be deprecated in a future (major) release.

Consumer Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | Indicates whether the new protocol should be used. Valid values: generic or consumer.
group.remote.assignor | string | uniform | The server side assignor to use. It cannot be used in conjunction with group.local.assignors.
group.local.assignors | list | empty | The list of client side (local) assignors. It cannot be used in conjunction with group.remote.assignor.
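
Since group.remote.assignor and group.local.assignors are mutually exclusive, a client could reject a configuration that sets both. The following is a minimal sketch of such a check using plain java.util.Properties. The class and method names are hypothetical, and this is not how the Kafka consumer validates its configuration.

```java
import java.util.Properties;

final class AssignorConfigSketch {
    static final String REMOTE_ASSIGNOR = "group.remote.assignor";
    static final String LOCAL_ASSIGNORS = "group.local.assignors";

    // Returns true unless both the remote and the local assignor settings are set.
    static boolean isValid(Properties props) {
        return !(isSet(props, REMOTE_ASSIGNOR) && isSet(props, LOCAL_ASSIGNORS));
    }

    // A property counts as set when it is present and not blank.
    private static boolean isSet(Properties props, String key) {
        String value = props.getProperty(key);
        return value != null && !value.trim().isEmpty();
    }

    // Example: opt in to the new protocol with the server side "uniform" assignor.
    static Properties serverSideExample() {
        Properties props = new Properties();
        props.setProperty("group.protocol", "consumer");
        props.setProperty(REMOTE_ASSIGNOR, "uniform");
        return props;
    }
}
```

With serverSideExample() the check passes; adding any non-empty group.local.assignors value to the same Properties makes it fail.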

Streams Metadata

TODO

name | type | note
processId | uuid / static | Inherited. In the future we may also remove this field when one instance only uses one consumer.
userEndPoint | bytes / static | Inherited
clientTags | map / static | Inherited
errorCode | int8 / dynamic | Inherited and enhanced. Communicates rebalance reasons. Possible values: None, Shutdown, Warm-up ready, Warm-up failed, Requested by coordinator, Topology changed, [more?]
topologyHash | uuid / dynamic | Only updatable when errorCode is not “none”.
taskLag | array / dynamic | Only updatable when errorCode is not “none”.


name | type | note
activeTasks | list | Inherited
standbyTasks | map | Modified, only contains normal standby tasks
warmupTasks | map | New, warm-up standby tasks
partitionsByHost | map | Merged, global assignment information used for IQ
errorCode | int8 | Inherited. Possible values: None, Shutdown, AssignmentError, InconsistentTopology, [more?]

Streams Configurations

Name | Type | Default | Doc
group.protocol | enum | generic | Indicates whether the new protocol should be used. Valid values: generic or consumer.

Admin API

Admin#listConsumerGroups

...