...

The key idea to resolve this is to let Assignor implementors themselves indicate to the ConsumerCoordinator whether they are compatible with the protocol, and then to rely on multi-assignor protocols for a safe upgrade path: users need to keep two assignors configured while switching the rebalance protocol, and afterwards they can use another rolling bounce to remove the old-versioned protocol.


More specifically, we will introduce a new public API, the ConsumerPartitionAssignor interface, whose #Subscription / #Assignment nested classes are augmented (the old PartitionAssignor class was mistakenly placed in an `internal` package, so we use this KIP to deprecate it in favor of the new one, along with the augmented methods) as follows:

Code Block
languagejava
interface ConsumerPartitionAssignor {

    /**
     * Return serialized data that will be included in the {@link Subscription} sent to the leader
     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information)
     *
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return nullable subscription user data
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param groupSubscription Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
     * @return A map from the members to their respective assignments. This should have one entry
     *         for each member in the input subscription map.
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
     * @param metadata Additional metadata on the consumer (optional)
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
    }

    /**
     * Indicate which rebalance protocol this assignor works with;
     * By default it should always work with {@link RebalanceProtocol#EAGER}.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }

    /**
     * Return the version of the assignor, which indicates how the user metadata encodings
     * and the assignment algorithm have evolved.
     */
    default short version() {
        return (short) 0;
    }

    /**
     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
     * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
     * @return non-null unique name
     */
    String name();

    final class Subscription {
        public List<String> topics();

        public ByteBuffer userData();

        public List<TopicPartition> ownedPartitions();

        public void setGroupInstanceId(Optional<String> groupInstanceId);

        public Optional<String> groupInstanceId();
    }

    final class Assignment {
        public List<TopicPartition> partitions();

        public ByteBuffer userData();
    }

    final class GroupSubscription {
        public GroupSubscription(Map<String, Subscription> subscriptions);

        public Map<String, Subscription> groupSubscription();
    }

    final class GroupAssignment {
        public GroupAssignment(Map<String, Assignment> assignments);

        public Map<String, Assignment> groupAssignment();
    }

    /**
     * The rebalance protocol defines partition assignment and revocation semantics. The purpose is to establish a
     * consistent set of rules that all consumers in a group follow in order to transfer ownership of a partition.
     * {@link ConsumerPartitionAssignor} implementors can claim supporting one or more rebalance protocols via the
     * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their responsibility to respect the rules
     * of those protocols in their {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)} implementations.
     * Failure to follow the rules of the supported protocols would lead to runtime errors or undefined behavior.
     *
     * The {@link RebalanceProtocol#EAGER} rebalance protocol requires a consumer to always revoke all its owned
     * partitions before participating in a rebalance event. It therefore allows a complete reshuffling of the assignment.
     *
     * The {@link RebalanceProtocol#COOPERATIVE} rebalance protocol allows a consumer to retain its currently owned
     * partitions before participating in a rebalance event. The assignor should not reassign any owned partitions
     * immediately, but may instead indicate to consumers the need for partition revocation so that the revoked
     * partitions can be reassigned to other consumers in the next rebalance event. This is designed for sticky assignment
     * logic which attempts to minimize partition reassignment with cooperative adjustments.
     */
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static RebalanceProtocol forId(byte id) {
            switch (id) {
                case 0:
                    return EAGER;
                case 1:
                    return COOPERATIVE;
                default:
                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
            }
        }
    }
}
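
As an illustration of how an assignor might declare its supported protocols, here is a minimal, self-contained sketch. It does not use the real Kafka classes: `AssignorSketch`, `EagerOnlyAssignor` and `CooperativeAssignor` are hypothetical stand-ins, the enum is a local copy of the one in the listing above, and the protocol list returned by the cooperative assignor is an assumption made for the example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SupportedProtocolsSketch {

    // Local copy of the enum from the listing above, so the sketch compiles on its own.
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static RebalanceProtocol forId(byte id) {
            switch (id) {
                case 0:
                    return EAGER;
                case 1:
                    return COOPERATIVE;
                default:
                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
            }
        }
    }

    // Hypothetical, trimmed-down stand-in for ConsumerPartitionAssignor
    // (assign(), onAssignment() and the nested data classes are omitted).
    interface AssignorSketch {
        String name();

        // Same default as in the listing above: EAGER only.
        default List<RebalanceProtocol> supportedProtocols() {
            return Collections.singletonList(RebalanceProtocol.EAGER);
        }
    }

    // An assignor that keeps the default and therefore only claims EAGER.
    static final class EagerOnlyAssignor implements AssignorSketch {
        public String name() {
            return "range";
        }
    }

    // An assignor that opts in to COOPERATIVE while still accepting EAGER (assumed list).
    static final class CooperativeAssignor implements AssignorSketch {
        public String name() {
            return "cooperative-sticky";
        }

        @Override
        public List<RebalanceProtocol> supportedProtocols() {
            return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
        }
    }

    public static void main(String[] args) {
        for (AssignorSketch a : Arrays.asList(new EagerOnlyAssignor(), new CooperativeAssignor())) {
            System.out.println(a.name() + " -> " + a.supportedProtocols());
        }
    }
}
```

An existing assignor that does not override `supportedProtocols()` keeps working under EAGER, which is why the default method makes the new API backward compatible for implementors.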


Note the semantic differences between the fields added above:

...


From the user's perspective, the upgrade path for leveraging the new protocols is the same as switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config. The upgrade path would be:

...

  • Have a first rolling bounce to replace the byte code (i.e. swap the jars), and set the assignors to "range, cooperative-sticky". At this stage, members on the new byte code will still choose EAGER as the protocol and send both assignors in their join-group requests. Since at least one member has not been bounced yet and will therefore only send "range", the "range" assignor will be selected to assign partitions while everyone follows the EAGER protocol. This rolling bounce is safe.
  • Have a second rolling bounce to remove the "range" assignor, i.e. leave only the "cooperative-sticky" assignor in the config. At this stage, members that have been bounced will choose the COOPERATIVE protocol and not revoke partitions, while not-yet-bounced members will still go with EAGER and revoke everything. However, the "cooperative-sticky" assignor will be chosen, since at least one already-bounced member no longer has "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some on COOPERATIVE: this is fine as long as the leader can recognize them and make assignment choices accordingly. EAGER members have revoked everything and hence have no pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
  • The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility is still able to deserialize the new protocol data from newer-versioned members, and hence goes ahead and does the assignment while the new-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: since the consumer does not have the luxury of relying on a list of built-in assignors, because assignors are user-customizable and hence a black box to the consumer coordinator, we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.
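
The two bounces above can be traced with a small simulation. This is only a sketch under stated assumptions: the class and method names are hypothetical, the capability table for "range" and "cooperative-sticky" is assumed, each member is assumed to follow the highest protocol supported by all of its configured assignors, and the group is assumed to pick the first assignor (in the first member's preference order) that every member lists. The real selection logic in the consumer and the group coordinator is more involved.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoBounceUpgradeSketch {

    enum Protocol { EAGER, COOPERATIVE }

    // Assumed capabilities of the two assignors used in the example.
    static final Map<String, EnumSet<Protocol>> SUPPORTED = new HashMap<>();
    static {
        SUPPORTED.put("range", EnumSet.of(Protocol.EAGER));
        SUPPORTED.put("cooperative-sticky", EnumSet.of(Protocol.EAGER, Protocol.COOPERATIVE));
    }

    // Protocol a member follows: the highest one supported by every assignor in its config.
    static Protocol memberProtocol(List<String> assignors) {
        EnumSet<Protocol> common = EnumSet.allOf(Protocol.class);
        for (String assignor : assignors) {
            common.retainAll(SUPPORTED.get(assignor));
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("Assignors share no rebalance protocol: " + assignors);
        }
        return common.contains(Protocol.COOPERATIVE) ? Protocol.COOPERATIVE : Protocol.EAGER;
    }

    // Assignor used by the group: the first candidate that every member has configured.
    static String groupAssignor(List<List<String>> members) {
        for (String candidate : members.get(0)) {
            boolean supportedByAll = true;
            for (List<String> member : members) {
                supportedByAll &= member.contains(candidate);
            }
            if (supportedByAll) {
                return candidate;
            }
        }
        throw new IllegalStateException("No assignor common to all members");
    }

    public static void main(String[] args) {
        List<String> old = Arrays.asList("range");                            // not yet bounced
        List<String> both = Arrays.asList("range", "cooperative-sticky");     // after first bounce
        List<String> coopOnly = Arrays.asList("cooperative-sticky");          // after second bounce

        // During the first bounce: "range" is selected and everyone stays on EAGER.
        System.out.println(groupAssignor(Arrays.asList(both, old)) + " / " + memberProtocol(both));

        // During the second bounce: "cooperative-sticky" is selected; protocols are mixed.
        System.out.println(groupAssignor(Arrays.asList(coopOnly, both))
                + " / " + memberProtocol(coopOnly) + " and " + memberProtocol(both));
    }
}
```

In this model a member never follows COOPERATIVE while "range" is still in its config, which mirrors why an old-byte-code leader can never end up assigning partitions for members that have stopped revoking them.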

    ...