...

Under the new protocol, the process for the same new-member-joining example becomes the following (note that on the existing member we now call revoke only once, on b-0, instead of revoking both a-0 and b-0 and later assigning a-0 again):

Rebalance Callback Error Handling

Today, when a user-customized rebalance listener callback throws an exception, the exception is swallowed and logged as an error unless it is a WakeupException or InterruptException. Users have complained that this is not an effective way of notifying them when a failure happens (KAFKA-4600).

In this KIP, we propose to change this behavior as well: when the listener callback throws an exception, ConsumerCoordinator will log an error and then rethrow the exception all the way up to KafkaConsumer.poll(). At the same time, the consumer coordinator's book-kept owned partitions will exclude the affected partition from the intended change, whether that change is a revocation, an assignment, or a loss.

Upon capturing the error, users can do one of the following, depending on their exception handling logic:

  • Shut down the consumer gracefully if the exception is considered a fatal error.
  • Retry consumer.poll() if they believe the exception is retriable. Be aware that the failed callback will be treated as if it had no side effects at all: e.g. if your rebalance listener performed some side effect and then threw, the callback is still considered a no-op when consumer.poll() is retried.


To give a concrete example, suppose your currently assigned partitions are {1, 2} and the newly assigned partitions are {2, 3}. The consumer will call onPartitionsAssigned(3) and onPartitionsRevoked(1). Suppose the former succeeds but the latter fails with an error, and the user captures it in consumer.poll() and retries. We then let the consumer proceed as if it were assigned {1, 2, 3}, since 3 was added successfully but 1 was not revoked successfully; i.e. we consider that none of the effects of the exception-throwing onPartitionsRevoked(1) took place.
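The bookkeeping in the example above can be sketched as a small simulation. This is only an illustration under stated assumptions: OwnedPartitionsSketch, applyAssignment, and the per-partition callbacks are hypothetical names invented here, not Kafka's ConsumerCoordinator or ConsumerRebalanceListener API, and partitions are plain integers.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

// Hypothetical stand-in for the coordinator's owned-partition bookkeeping.
class OwnedPartitionsSketch {
    private final Set<Integer> owned = new TreeSet<>();

    OwnedPartitionsSketch(Set<Integer> initial) {
        owned.addAll(initial);
    }

    // Applies a new assignment. Ownership of a partition changes only if its
    // callback returns normally; a callback exception propagates to the caller
    // (standing in for the rethrow up to poll()) and leaves that partition as it was.
    void applyAssignment(Set<Integer> target,
                         Consumer<Integer> onAssigned,
                         Consumer<Integer> onRevoked) {
        Set<Integer> added = new TreeSet<>(target);
        added.removeAll(owned);
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(target);

        for (Integer p : added) {
            onAssigned.accept(p);   // may throw
            owned.add(p);           // recorded only after the callback succeeded
        }
        for (Integer p : revoked) {
            onRevoked.accept(p);    // may throw
            owned.remove(p);        // recorded only after the callback succeeded
        }
    }

    Set<Integer> owned() {
        return new TreeSet<>(owned);
    }

    public static void main(String[] args) {
        OwnedPartitionsSketch sketch =
            new OwnedPartitionsSketch(new TreeSet<>(Arrays.asList(1, 2)));
        try {
            sketch.applyAssignment(
                new TreeSet<>(Arrays.asList(2, 3)),
                p -> { /* assignment callback succeeds */ },
                p -> { throw new IllegalStateException("revoke failed for " + p); });
        } catch (IllegalStateException e) {
            // The caller sees the error and may decide to retry or shut down.
        }
        System.out.println(sketch.owned()); // prints [1, 2, 3]
    }
}
```

Partition 3 is recorded because its callback returned, while partition 1 stays owned because its revocation callback threw, which matches the {1, 2, 3} outcome described above.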


Consumer StickyAssignor

Since we've already encoded the assigned partitions at the consumer protocol layer, the consumer's sticky assignor effectively duplicates this data in both the consumer protocol and the assignor's user data. Similarly, we have a StreamsPartitionAssignor which is sticky as well but relies on its own user data to achieve this. We can bump up their versions while simplifying the user data and leveraging Subscription#ownedPartitions instead (details about the upgrade compatibility below).

...

Code Block
languagejava
public enum RebalanceStrategy {
    EAGER((byte) 0), COOPERATIVE((byte) 1);

    private final byte id;

    RebalanceStrategy(byte id) {
        this.id = id;
    }

    public byte id() {
        return id;
    }

    public static RebalanceStrategy forId(byte id) {
        switch (id) {
            case 0:
                return EAGER;
            case 1:
                return COOPERATIVE;
            default:
                throw new IllegalArgumentException("Unknown strategy id: " + id);
        }
    }
}

class PartitionAssignorProtocol {
    String name;                // same semantics as the deprecated PartitionAssignor#name()

    RebalanceStrategy supportedStrategy;
}

interface PartitionAssignor {

    // existing interfaces

    short version();             // new API, the version of the assignor, which indicates user metadata / algorithmic differences.

    @Deprecated                  // use PartitionAssignorProtocol#name instead
    String name();

    List<PartitionAssignorProtocol> supportedProtocols();    // new API, tells the ConsumerCoordinator which rebalance strategies the assignor works with,
                                                             // and associates each protocol with a unique name of the assignor.

    class Subscription {
        public Short version();  // new API, and current version == 2

        public List<String> topics();

        public List<TopicPartition> ownedPartitions();  // new API, if version < 2 should always be empty

        public ByteBuffer userData();
    }

    class Assignment {
        public Short version();  // new API, same as above

        public List<TopicPartition> partitions();

        public ConsumerProtocol.Errors error();   // new API, if version < 2 should always be NONE

        public ByteBuffer userData();
    }
}

...

Code Block
languagejava
"rebalance.protocol":


type: Enum 
values: {eager, compatible, cooperative}
default: eager


When the config value is "eager", the consumer still executes the old rebalance behavior (i.e. it revokes everything before joining the group); if the config value is "cooperative", the consumer uses the new protocol. However, the consumer needs to make sure that the instantiated assignor can work with the protocol it chose, by calling PartitionAssignor#supportedProtocols and using the corresponding name of the selected protocol (if it is supported). When the config value is "compatible", the logic is a bit more complicated: 1) the consumer still uses the "eager" consumer coordinator behavior, but 2) it tries to encode two assignor protocols in the subscription if the assignor supports it (i.e. its supportedProtocols contains both).
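The selection logic described above could look roughly like the following. This is a minimal sketch, not the KIP's implementation: ProtocolSelectionSketch and its methods are hypothetical, and two behaviors are assumptions made here for illustration only, namely that an unsupported choice fails with IllegalArgumentException, and that "compatible" falls back to encoding only the eager protocol when the assignor does not support both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how a consumer might interpret the rebalance.protocol value.
class ProtocolSelectionSketch {
    enum Strategy { EAGER, COOPERATIVE }
    enum Config { EAGER, COMPATIBLE, COOPERATIVE }

    // Which coordinator behavior runs locally: only "cooperative" enables the new one.
    static Strategy coordinatorBehavior(Config config) {
        return config == Config.COOPERATIVE ? Strategy.COOPERATIVE : Strategy.EAGER;
    }

    // Which assignor protocols get encoded in the subscription.
    static List<Strategy> strategiesToEncode(Config config, Set<Strategy> supported) {
        List<Strategy> result = new ArrayList<>();
        switch (config) {
            case EAGER:
                require(supported, Strategy.EAGER);
                result.add(Strategy.EAGER);
                break;
            case COOPERATIVE:
                require(supported, Strategy.COOPERATIVE);
                result.add(Strategy.COOPERATIVE);
                break;
            case COMPATIBLE:
                require(supported, Strategy.EAGER);
                result.add(Strategy.EAGER);
                // Encode both protocols only if the assignor supports both (assumed fallback otherwise).
                if (supported.contains(Strategy.COOPERATIVE)) {
                    result.add(Strategy.COOPERATIVE);
                }
                break;
            default:
                throw new AssertionError("Unhandled config: " + config);
        }
        return result;
    }

    // Assumed failure mode: reject a config the assignor cannot work with.
    private static void require(Set<Strategy> supported, Strategy needed) {
        if (!supported.contains(needed)) {
            throw new IllegalArgumentException("Assignor does not support " + needed);
        }
    }
}
```

For example, with an assignor that supports both strategies, a "compatible" consumer would keep eager coordinator behavior locally while advertising both protocols to the group.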

To leverage cooperative rebalancing when upgrading from 2.2 or older to a newer version, users would proceed as follows (assuming the assignor was "range" before the upgrade):

...

  • Do a rolling bounce to replace the byte code (i.e. swap the jars); set the rebalance.protocol config to "compatible" and also set the assignors to "sticky".
  • Do another rolling bounce to change rebalance.protocol to "cooperative".
  • The key point behind these two rolling bounces and the additional check is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager" but, due to compatibility, is still able to deserialize the new protocol data from newer-versioned members, and hence goes ahead with the assignment while the newer-versioned members did not revoke their partitions before joining the group. Note the difference from KIP-415 here: on the consumer we do not have the luxury of relying on a list of built-in assignors, since the assignor is user-customizable and hence a black box to the consumer coordinator. We therefore need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce (details below).
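As a rough illustration of the two bounces listed above, the consumer configuration for each step might be built as follows. This is a sketch under assumptions: "rebalance.protocol" is the config name proposed in this section and may not exist in a released client, UpgradeConfigSketch is a hypothetical helper, and the "sticky" assignor is spelled here as the StickyAssignor class name accepted by partition.assignment.strategy.

```java
import java.util.Properties;

// Hypothetical helper showing the config values for each rolling bounce.
class UpgradeConfigSketch {
    // Config name as proposed in this KIP section (assumption: not a released config).
    static final String REBALANCE_PROTOCOL = "rebalance.protocol";
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";

    // First rolling bounce: new jars, "compatible" protocol, sticky assignor.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.setProperty(REBALANCE_PROTOCOL, "compatible");
        props.setProperty(ASSIGNMENT_STRATEGY,
            "org.apache.kafka.clients.consumer.StickyAssignor");
        return props;
    }

    // Second rolling bounce: same settings, but switch the protocol to "cooperative".
    static Properties secondBounce() {
        Properties props = firstBounce();
        props.setProperty(REBALANCE_PROTOCOL, "cooperative");
        return props;
    }
}
```

Only the protocol value changes between the two steps; the assignor stays the same so that every member already advertises both protocols before any member starts behaving cooperatively.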

    ...