

Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Motivation

In KIP-429: Kafka Consumer Incremental Rebalance Protocol, we proposed a new version of the consumer protocol for cooperative rebalancing (a.k.a. the incremental rebalance protocol). It adds a new "ownedPartitions" field to the "subscription" and "assignment" data, and one purpose of this field is to let assignors make "sticky" assignments based on the "ownedPartitions". As the "CooperativeStickyAssignor and custom COOPERATIVE Assignors" section of KIP-429 says:

The assignor can leverage the new ownedPartitions field that the Subscription has been augmented with in order to determine the previous assignment. Note that "stickiness" is important for the cooperative protocol to be effective, as in the limit that the new assignment is totally different than the previous one then the cooperative protocol just reduces to the old eager protocol as each member will have to completely revoke all partitions and get a whole new assignment.


However, we have recently encountered several stuck-rebalance issues whose root cause is out-of-date "ownedPartitions" data, e.g. KAFKA-12984 and KAFKA-13406. Because the assignors blindly trust the "ownedPartitions" they receive and base the assignment on them, stale data leads to unexpected results. Currently, we work around this in the cooperative sticky assignor by adding a "generation" field to the subscription "userData" and deserializing it during assignment to identify whether the "ownedPartitions" are out-of-date. However, this workaround only covers the cooperative sticky assignor. Users with their own custom cooperative assignors must implement the same workaround manually; otherwise, they will hit the same issues.


Therefore, we should add a "generation" field to the "Subscription" data of the consumer protocol (i.e. ConsumerProtocolSubscription).

Public Interfaces

Current consumer protocol for Subscription:

KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>

 


Proposed Changes

The updated consumer protocol for Subscription adds a new "generation" field at the end:


KafkaConsumer:
 
Subscription => TopicList UserData OwnedPartitions Generation
   TopicList               => List<String>
   UserData                => Bytes  
   OwnedPartitions         => List<String, List<Int32>>
   Generation              => Int32   <--- new field



So, during the joinGroup request, we'll include the "generation" data in the protocol set sent to the group coordinator. Later, when the consumer leader receives the subscription info from all consumers, it will do the assignment based on the "ownedPartitions" and "generation" info. After the assignment, we can also leverage the "ownedPartitions" and "generation" info to validate the assignment.
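As a rough illustration of how a leader could combine "ownedPartitions" with "generation", the sketch below keeps, for each partition, only the ownership claim with the highest generation. The class and method names (`StaleOwnershipFilter`, `MemberSubscription`, `resolveOwners`) are hypothetical, partitions are simplified to strings, and the tie-breaking rule is an assumption made for this example rather than the behavior of any built-in assignor.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from the Kafka code base.
final class StaleOwnershipFilter {

    /** What one member reports in its subscription (partitions simplified to strings). */
    static final class MemberSubscription {
        final String memberId;
        final List<String> ownedPartitions;
        final int generation; // -1 means "unknown", e.g. reported by an old client

        MemberSubscription(String memberId, List<String> ownedPartitions, int generation) {
            this.memberId = memberId;
            this.ownedPartitions = ownedPartitions;
            this.generation = generation;
        }
    }

    /**
     * Returns partition -> memberId, keeping for each partition only the claim
     * with the highest generation. Claims from lower generations are treated as stale.
     */
    static Map<String, String> resolveOwners(List<MemberSubscription> subscriptions) {
        Map<String, String> owner = new HashMap<>();
        Map<String, Integer> ownerGeneration = new HashMap<>();
        for (MemberSubscription sub : subscriptions) {
            for (String partition : sub.ownedPartitions) {
                Integer seen = ownerGeneration.get(partition);
                if (seen == null || sub.generation > seen) {
                    // First claim, or a newer claim that supersedes a stale one.
                    owner.put(partition, sub.memberId);
                    ownerGeneration.put(partition, sub.generation);
                } else if (sub.generation == seen) {
                    // Assumption: two claims in the same generation cannot both be
                    // right, so trust neither and let the assignor reassign it.
                    owner.remove(partition);
                }
            }
        }
        return owner;
    }
}
```

For example, if member A reports {t-0, t-1} at generation 5 and member B reports {t-1, t-2} at generation 7, A's claim on t-1 is treated as stale and B is kept as its owner.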


For the built-in CooperativeStickyAssignor, it is fine to have some consumers running the old bytecode and some running the new bytecode, because the subscription data from old consumers will contain [empty ownedPartitions + default generation (-1)] in V0, or [current ownedPartitions + default generation (-1)] in V1. The V0 case is simple: we just ignore the info since it is empty. For the V1 case, we get the "ownedPartitions" data and then check whether there is generation info in the userData (i.e. the current workaround). If so, we use it; otherwise, we treat the ownedPartitions as having the default generation (-1).
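The fallback order described above can be sketched as a small helper. This is only an illustration: `GenerationResolver` and `resolve` are hypothetical names, and the sketch assumes the generation from the userData workaround has already been decoded into an `OptionalInt` (empty when absent).

```java
import java.util.OptionalInt;

// Hypothetical helper illustrating the fallback order; not part of Kafka.
final class GenerationResolver {
    static final int DEFAULT_GENERATION = -1;

    /**
     * @param subscriptionGeneration value of the new "generation" field, or -1 when
     *                               the member is an old client that does not send it
     * @param userDataGeneration     generation decoded from userData by the existing
     *                               workaround, or empty when userData carries none
     * @return the generation to associate with the member's ownedPartitions
     */
    static int resolve(int subscriptionGeneration, OptionalInt userDataGeneration) {
        if (subscriptionGeneration != DEFAULT_GENERATION) {
            return subscriptionGeneration; // new client: use the new field
        }
        // Old client: fall back to the userData workaround, then to the default.
        return userDataGeneration.orElse(DEFAULT_GENERATION);
    }
}
```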

Custom assignors can use the "ownedPartitions" and "generation" info together to check whether an ownership claim is stale. If they don't update their code, that is fine, since this change is backward compatible (see below).

Compatibility, Deprecation, and Migration Plan

It is compatible to inject additional fields after the assignor-specific SubscriptionInfo bytes, since on serialization we first call the assignor to encode the info bytes and then re-allocate a larger buffer to append the consumer-specific bytes. With the new protocol, we just need to append some existing fields before, and some new fields after, the assignor-specific info bytes, and vice versa on deserialization. So adding fields after the assignor bytes is still naturally compatible with plug-in assignors.
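The idea of appending a field after the assignor bytes can be illustrated with a deliberately simplified framing. The sketch below is not Kafka's actual wire format: it encodes only a length-prefixed userData blob, optionally followed by a 4-byte generation, and the names (`TrailingFieldCodec` and its methods) are hypothetical. It shows that a reader unaware of the trailing field still decodes the userData correctly, while a newer reader falls back to -1 when the field is absent.

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical framing used only to illustrate trailing-field compatibility.
final class TrailingFieldCodec {
    static final int DEFAULT_GENERATION = -1;

    /** Old-style encoding: length-prefixed userData only. */
    static ByteBuffer encodeOld(byte[] userData) {
        ByteBuffer buf = ByteBuffer.allocate(4 + userData.length);
        buf.putInt(userData.length).put(userData);
        buf.flip();
        return buf;
    }

    /** New-style encoding: same prefix, with the generation appended at the end. */
    static ByteBuffer encodeNew(byte[] userData, int generation) {
        ByteBuffer buf = ByteBuffer.allocate(4 + userData.length + 4);
        buf.putInt(userData.length).put(userData).putInt(generation);
        buf.flip();
        return buf;
    }

    /** Old-style reader: reads the fields it knows and ignores any trailing bytes. */
    static byte[] decodeUserData(ByteBuffer buf) {
        ByteBuffer in = buf.duplicate(); // leave the caller's position untouched
        byte[] userData = new byte[in.getInt()];
        in.get(userData);
        return userData;
    }

    /** New-style reader: uses the default when the trailing field is absent. */
    static int decodeGeneration(ByteBuffer buf) {
        ByteBuffer in = buf.duplicate();
        int length = in.getInt();
        in.position(in.position() + length); // skip over userData
        return in.remaining() >= 4 ? in.getInt() : DEFAULT_GENERATION;
    }
}
```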

Rejected Alternatives

Adding a new generation method to the ConsumerPartitionAssignor interface, for the assignor to get the generation info.

--> This would work for the assignor only. However, in ConsumerCoordinator, after the cooperative assignor completes its assignment, there is a validation phase that checks whether the cooperative assignor revokes partitions before assigning them to other consumers. The validation phase also needs the "generation" info. If the generation info were only available inside the assignor, the validation phase could not leverage it.
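To make the point about the validation phase concrete, here is a minimal sketch of such a check that uses the generation to ignore stale ownership claims. It is not the ConsumerCoordinator implementation: `CooperativeAssignmentCheck` and `findUnrevokedTransfers` are hypothetical names, partitions are simplified to strings, and treating any claim below the highest reported generation as stale is an assumption of this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a post-assignment check; not Kafka's implementation.
final class CooperativeAssignmentCheck {

    /**
     * Returns the partitions that the new assignment gives to one member while a
     * different member still owns them in the latest generation, i.e. partitions
     * that were transferred without being revoked first.
     */
    static List<String> findUnrevokedTransfers(
            Map<String, List<String>> ownedByMember,
            Map<String, Integer> generationByMember,
            Map<String, List<String>> newAssignment) {
        // Assumption: claims below the highest reported generation are stale.
        int maxGeneration = -1;
        for (int generation : generationByMember.values()) {
            maxGeneration = Math.max(maxGeneration, generation);
        }

        List<String> violations = new ArrayList<>();
        for (Map.Entry<String, List<String>> assigned : newAssignment.entrySet()) {
            for (String partition : assigned.getValue()) {
                for (Map.Entry<String, List<String>> owned : ownedByMember.entrySet()) {
                    String owner = owned.getKey();
                    boolean stale = generationByMember.getOrDefault(owner, -1) < maxGeneration;
                    if (!stale
                            && !owner.equals(assigned.getKey())
                            && owned.getValue().contains(partition)) {
                        violations.add(partition);
                    }
                }
            }
        }
        return violations;
    }
}
```

Without the generation, the stale claim of a lagging member would be indistinguishable from a live one, and the check would flag a transfer that is in fact safe.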
