Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents


Status

Current state: "Under DiscussionAdopted"

Discussion thread: here

Vote thread: here

JIRA: here

Motivation

In KIP-429: Kafka Consumer Incremental Rebalance Protocol, we proposed a new version of consumer protocol for cooperative rebalance (a.k.a incremental rebalance protocol). It adds a new field "ownedPartition" into the "subscription" and "assignment" data, and one of the purpose of the new "ownedPartition" field is for the assignors and to do "sticky" assignment dependent on the "ownedPartitions". Like in the "CooperativeStickyAssignor and custom COOPERATIVE Assignors" section of KIP-429, it said:

...

However, recently, we've encountered some rebalance stuck issues, and the root cause of them are due to the out-of-date "ownedPartition". Because there are chances that the "ownedPartitions" are out-of-date, the assignors will blindly "trust" the "ownedPartitions", and do assignment depend on them, and cause unexpected results. ex: KAFKA-12984, KAFKA-13406.


Currently, we tried to workaround this issue by adding "generation" field into subscription "userData" field in cooperative sticky assignor, and deserialize them when doing assignment, to identify if the "ownedPartitions" are out-of-date or not. However, this workaround only works for cooperative sticky assignor, if users have their own custom cooperative assignor, they also need to workaround it manually. Otherwise, the same issues also happen to them. On the other hand,  `StickyAssignor` is also adding "generation" field plus the "ownedPartitions" into subscription userData bytes. the difference is that the `StickyAssignor`'s user bytes also encode the prev-owned partitions while the `CooperativeStickyAssignor` relies on the prev-owned partitions on the subscription protocol directly.


Only appending "ownedPartitions" data without "generation" info in the Subscription message, is like in TCP, only send packets without appending the sequence number. It'll confuse the assignor(or TCP receivers) and make the wrong decision.

...

Code Block
languagejava
final class Subscription {
    private final List<String> topics;
    private final ByteBuffer userData;
    private final List<TopicPartition> ownedPartitions;
    private Optional<String> groupInstanceId;
    private final int generationId  // new added

    public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions, int generationId) {
        this.topics = topics;
        this.userData = userData;
        this.ownedPartitions = ownedPartitions;
        this.groupInstanceId = Optional.empty();
        this.generationId = generationId;  // new added
    }

    public Subscription(List<String> topics, ByteBuffer userData, List<TopicPartition> ownedPartitions) {
        this(topics, userData, Collections.emptyList(), -1);
    }

    public Subscription(List<String> topics, ByteBuffer userData) {
        this(topics, userData, Collections.emptyList(), -1);
    }

    public Subscription(List<String> topics) {
        this(topics, null, Collections.emptyList(), -1);
    }

    // new added, the generationId getter 
    public int generationId() {
        return generationId;
    } 
}


Proposed Changes


So, during the joinGroup request, we'll all consumers will include the "generation" data into the protocol set and send to the group coordinator. Later, when the consumer lead receive the subscription info from all consumers, it'll do the assignment based on the "ownedPartitions" and "generation" info. Also, after the assignment, we can also leverage the "ownedPartitions" and "generation" info to validate the assignments.

...

For built-in CooperativeStickyAssignor, if there are consumers in old bytecode and some in the new bytecode, it's totally fine, because the subscription data from old consumers will contain \[empty ownedPartitions + default generation(-1)] in V0, or \[current ownedPartitions + default generation(-1)] in V1. For V0 case, it's quite simple, because we'll just ignore the info since they are empty. For V1 case, we'll get the "ownedPartitions" data, and then check if there is generation decode the "generation" info in the userData (i,e, current workaround). If so, we'll use it, otherwise, we'll take the ownedPartitions as subscription userData bytes. So that we can continue to do assignment with these information.


For built-in StickyAssignor, if there are consumers in old bytecode and some in the new bytecode, it's also fine, because the subscription data from old consumers will contain \[empty ownedPartitions + default generation(-1)] in V0, or \[current ownedPartitions + default generation(-1)] in V1. For both V0 and V1 case, we'll directly use the ownedPartition and generation info in the subscription userData bytes.


For custom assignor, they can adopt the "ownedPartitions" and "generation" info together to check if it is a stale one. If they don't update their code, it's fine since this change is backward compatible. (check below)

...