
...

And on assignment: 

Code Block
KafkaConsumer:


Assignment => AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes

------------------

StreamsPartitionAssignor:

AssignmentInfo (encoded in version 4) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32

...

More specifically, we would first inject more metadata at the consumer layer, as follows:

Code Block
KafkaConsumer:


Subscription => TopicList AssignedPartitions SubscriptionInfo
   TopicList               => List<String>
   AssignedPartitions      => List<TopicPartition>   // can be null
   SubscriptionInfo        => Bytes   


Assignment => AssignedPartitions RevokedPartitions ErrorCode AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   RevokedPartitions       => List<TopicPartition>   // can be null
   ErrorCode               => Int16
   AssignmentInfo          => Bytes


Note that it is compatible to inject additional fields before the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode the info bytes, and then allocate a larger buffer and write the consumer-specific fields in front of them; deserialization does the reverse. So as long as we always keep the assignor's metadata bytes at the end of the protocol, the change is naturally compatible with plug-in assignors. However, there are indeed some compatibility challenges for upgrading the consumer protocol itself, which we tackle below.
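
To illustrate why prepending consumer-level fields leaves the assignor bytes untouched, here is a minimal sketch of a V1 Subscription codec. The byte layout (an int16 version, int16-length-prefixed UTF-8 strings, -1 for a null list) and the names `SubscriptionCodec`, `TP` and `Subscription` are hypothetical simplifications, not Kafka's actual ConsumerProtocol schema.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SubscriptionCodec {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TP(String topic, int partition) {}

    // topics, assigned (may be null), and the opaque assignor-encoded bytes.
    record Subscription(List<String> topics, List<TP> assigned, byte[] info) {}

    static byte[] encode(Subscription s) {
        // The assignor has already produced s.info(); size the consumer-level
        // fields first, then allocate one buffer large enough for everything.
        int size = 2 + 4;
        for (String t : s.topics()) size += 2 + utf8(t).length;
        size += 4;
        if (s.assigned() != null)
            for (TP tp : s.assigned()) size += 2 + utf8(tp.topic()).length + 4;
        size += s.info().length;

        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort((short) 1); // hypothetical V1 marker
        buf.putInt(s.topics().size());
        for (String t : s.topics()) putString(buf, t);
        if (s.assigned() == null) {
            buf.putInt(-1); // null list
        } else {
            buf.putInt(s.assigned().size());
            for (TP tp : s.assigned()) {
                putString(buf, tp.topic());
                buf.putInt(tp.partition());
            }
        }
        buf.put(s.info()); // assignor bytes always last: no length prefix needed
        return buf.array();
    }

    static Subscription decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        short version = buf.getShort();
        if (version != 1) throw new IllegalArgumentException("unsupported version " + version);
        int n = buf.getInt();
        List<String> topics = new ArrayList<>();
        for (int i = 0; i < n; i++) topics.add(getString(buf));
        int m = buf.getInt();
        List<TP> assigned = null;
        if (m >= 0) {
            assigned = new ArrayList<>();
            for (int i = 0; i < m; i++) assigned.add(new TP(getString(buf), buf.getInt()));
        }
        byte[] info = new byte[buf.remaining()]; // everything left belongs to the assignor
        buf.get(info);
        return new Subscription(topics, assigned, info);
    }

    private static byte[] utf8(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    private static void putString(ByteBuffer buf, String s) {
        byte[] b = utf8(s);
        buf.putShort((short) b.length);
        buf.put(b);
    }

    private static String getString(ByteBuffer buf) {
        byte[] b = new byte[buf.getShort()];
        buf.get(b);
        return new String(b, StandardCharsets.UTF_8);
    }
}
```

Because the assignor's bytes are simply "whatever remains", the assignor never needs to know how many consumer-level fields precede them.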

...

  1. For every consumer: before sending the join-group request, do NOT revoke any partitions; instead just encode the current assigned partitions as part of the Subscription.
  2. For every consumer: after receiving the sync-group response, do the following:
    1. Check that the newly assigned-partitions are a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalancing, no partition should be migrated directly without first being revoked.
    2. Check the error code as well, and depending on it either move forward to sub-step 3 below, trigger a rebalance immediately (for incompatible members, see below), or fail immediately if the error is fatal.
    3. Update the newly assigned-partitions, and for the newly added partitions, call the rebalance listener (onPartitionsAssigned); this is the same as the current logic.
    4. If revoked-partitions is not empty, remove those partitions by calling the rebalance listener (onPartitionsRevoked), and then immediately send another join-group request with the updated assigned-partitions.
  3. For the leader: after receiving all members' subscribed topics as well as their assigned-partitions, do the following:
    1. Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
    2. Split total-partitions, the set of partitions inferred from newly-assigned-partitions, into two disjoint subsets: Intersection(total-partitions, assigned-partitions) and Minus(total-partitions, assigned-partitions).
      Note that the latter may be non-empty because a partition may have been revoked in the previous rebalance and hence is not in anyone's assigned-partitions, or it may be a newly created partition due to add-partitions. Let's call the former prev-assigned-partitions and the latter not-assigned-partitions.
    3. For not-assigned-partitions, we can encode the owners from newly-assigned-partitions directly, since we know that no one currently owns them, either due to revocation or because they are newly created.
    4. For prev-assigned-partitions, check whether the owner has changed; if so, encode the partition in the old owner's revoked-partitions, but do NOT encode it in the new owner's assigned-partitions.
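
The leader adjustment (step 3) and the consumer-side sanity check (step 2.1) can be sketched as follows. This is a minimal illustration, assuming member ids and partitions are plain strings (e.g. "t-0") and that the assignor's output is already available as a map; `CooperativeRebalance`, `MemberAssignment`, `adjust` and `isValid` are hypothetical names, and edge cases such as deleted topics are ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class CooperativeRebalance {
    // Hypothetical per-member result: partitions to own, and partitions to give up.
    record MemberAssignment(Set<String> assigned, Set<String> revoked) {}

    /**
     * Leader step 3: rewrite the assignor's proposal so that no partition moves
     * directly from one owner to another within a single rebalance.
     *
     * @param owned    member -> partitions it reported as owned in its Subscription
     * @param proposed member -> partitions chosen by the assignor (newly-assigned-partitions)
     */
    static Map<String, MemberAssignment> adjust(Map<String, Set<String>> owned,
                                                Map<String, Set<String>> proposed) {
        // Current owner of every prev-assigned partition.
        Map<String, String> prevOwner = new HashMap<>();
        owned.forEach((member, ps) -> ps.forEach(p -> prevOwner.put(p, member)));

        // Every known member gets an entry, even if it ends up with nothing.
        Map<String, MemberAssignment> result = new HashMap<>();
        for (String m : proposed.keySet()) result.put(m, new MemberAssignment(new TreeSet<>(), new TreeSet<>()));
        for (String m : owned.keySet()) result.putIfAbsent(m, new MemberAssignment(new TreeSet<>(), new TreeSet<>()));

        proposed.forEach((newOwner, ps) -> {
            for (String p : ps) {
                String oldOwner = prevOwner.get(p);
                if (oldOwner == null || oldOwner.equals(newOwner)) {
                    // not-assigned-partitions, or owner unchanged: assign directly.
                    result.get(newOwner).assigned().add(p);
                } else {
                    // Owner changed: only revoke from the old owner; the new owner
                    // receives it in the follow-up rebalance.
                    result.get(oldOwner).revoked().add(p);
                }
            }
        });
        return result;
    }

    /** Consumer step 2.1: assigned must cover Minus(owned-before, revoked). */
    static boolean isValid(Set<String> ownedBefore, MemberAssignment a) {
        Set<String> mustKeep = new HashSet<>(ownedBefore);
        mustKeep.removeAll(a.revoked());
        return a.assigned().containsAll(mustKeep);
    }
}
```

For example, if c1 owns t-0 and t-1 and the assignor wants to move t-1 to c2, the first round only revokes t-1 from c1; when c1 rejoins without t-1, the next round finds t-1 unowned and assigns it to c2.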

...

We omit the description of common scenarios here since it is already covered in KIP-415, which is very similar to this KIP except for the scheduledDelay above.

...

Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design an upgrade path that lets consumers upgrade to the new rebalance protocol in an online manner.

Note that because we inject additional fields in the middle of the consumer protocol to keep the assignor-specific "info" field at the end, the new protocol is not compatible with the old version: an old-versioned consumer would not be able to deserialize newer-versioned protocol data at all. Therefore, when upgrading, the new consumer byte-code must at first keep following the old protocol version, both for metadata encoding and for behavior (e.g. still revoking partitions before sending JoinGroup). Only after everyone has upgraded to the new byte-code can the members start rebalancing with the new protocol version.

Note that during later rebalances, consumers may still send join-group requests with the old version (the key point is that they are all aware of the new version), in which case the consumer leader can adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to the Consumer:



Code Block: Protocol Type
"rebalance.protocol":


type: Enum 
values: {eager, cooperative}
default: eager


When the config value is "eager", the consumer still uses V0 of the consumer protocol along with the eager rebalance behavior; when the config value is "cooperative", the consumer uses the new V1 protocol along with the new algorithm. Note the difference from KIP-415 here: since the consumer cannot leverage a list of assignors to register multiple protocols and let the leader auto-switch to the new version, we need two rolling bounces instead of one to complete the upgrade, whereas Connect needs only one rolling bounce (details below).
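
A minimal sketch of how a consumer might map the proposed "rebalance.protocol" value to a subscription version and behavior. The `RebalanceProtocolConfig` class, the enum and its fields are hypothetical illustrations of the proposal above, not an existing Kafka client API.

```java
import java.util.Locale;
import java.util.Map;

final class RebalanceProtocolConfig {
    // Hypothetical mirror of the proposed "rebalance.protocol" config values.
    enum RebalanceProtocol {
        EAGER((short) 0),        // V0 protocol: revoke everything before JoinGroup
        COOPERATIVE((short) 1);  // V1 protocol: keep owned partitions across JoinGroup

        final short subscriptionVersion;

        RebalanceProtocol(short version) { this.subscriptionVersion = version; }
    }

    static RebalanceProtocol fromConfig(Map<String, ?> configs) {
        Object raw = configs.get("rebalance.protocol");
        if (raw == null) return RebalanceProtocol.EAGER; // default: eager
        // valueOf throws IllegalArgumentException for unknown values.
        return RebalanceProtocol.valueOf(raw.toString().trim().toUpperCase(Locale.ROOT));
    }

    // Only eager members revoke all owned partitions before sending JoinGroup.
    static boolean revokeAllBeforeJoin(RebalanceProtocol p) {
        return p == RebalanceProtocol.EAGER;
    }
}
```

Tying the wire version to the config value is what makes the two-bounce procedure necessary: the new byte-code must be everywhere before any member is allowed to emit V1.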


We also update the leader's algorithm above (i.e. bullet point 3) so that it first checks the subscription versions of all members:

  1. If all members are on V1, then follow the new algorithm.
  2. If all members are on V0, then follow the old algorithm.
  3. If there is no consensus (some members on V0 and others on V1, as happens during the second rolling bounce), do the following:
    1. For members on V1, send the assignment in V1 as well, simply giving back their existing assigned-partitions from the subscription metadata and leaving revoked-partitions empty.
    2. For members on V0, which have already revoked their partitions (and we do not know which partitions those were), we can only give back a V0 assignment with empty assigned-partitions.
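
The version check above can be sketched as a small decision function plus the case-3 fallback. This is a hypothetical illustration, assuming each member's subscription version is available as a `short` and owned partitions are plain strings; the error code mentioned below is not modeled.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class VersionedLeader {
    enum Decision { COOPERATIVE, EAGER, MIXED }

    /** Pick the algorithm from the subscription versions of all members. */
    static Decision decide(Collection<Short> versions) {
        boolean anyV0 = versions.contains((short) 0);
        boolean anyV1 = versions.contains((short) 1);
        if (!anyV0) return Decision.COOPERATIVE; // case 1: everyone on V1
        if (!anyV1) return Decision.EAGER;       // case 2: everyone on V0
        return Decision.MIXED;                   // case 3: no consensus
    }

    /** Case 3: V1 members keep what they own (nothing revoked); V0 members get nothing. */
    static Map<String, Set<String>> mixedAssignment(Map<String, Short> versionByMember,
                                                    Map<String, Set<String>> owned) {
        Map<String, Set<String>> out = new HashMap<>();
        versionByMember.forEach((member, v) -> {
            if (v == 1) {
                out.put(member, new TreeSet<>(owned.getOrDefault(member, Set.of())));
            } else {
                out.put(member, new TreeSet<>()); // V0 already revoked; we cannot tell what it had
            }
        });
        return out;
    }
}
```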


So in case 3, members on V0 have a small window in which no partitions are assigned to them, and some partitions are not assigned to anyone either. Hence, we will add a new metric to ConsumerCoordinatorMetrics, which the client records upon receiving an assignment with the corresponding error code:

Code Block
"num.incompatible.rebalance": "total number of rebalances that have failed due to incompatible members joining group at the same time."


group: "consumer-coordinator-metrics"
tags: client-id=([-.\w]+)
type: Count




As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep rebalance.protocol as "eager" (nothing needs to be changed manually, since it is the default value).
  2. After the first rolling bounce has completed, do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The logic above ensures that eventually everyone sends the join-group request with V1.
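
The reason for two bounces can be checked with a toy simulation: members are restarted one at a time, and after each restart we record whether the group is all-V0, all-V1, mixed (case 3), or unsafe (an old-byte-code member could receive V1 data it cannot deserialize). `RollingBounceSim` and its state labels are hypothetical and only model the rules described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class RollingBounceSim {
    // Hypothetical member state: new byte-code deployed? "cooperative" configured?
    record Member(boolean newByteCode, boolean cooperative) {
        // Old byte-code only speaks V0; new byte-code speaks V1 only when configured.
        short subscriptionVersion() { return (short) (newByteCode && cooperative ? 1 : 0); }
    }

    /** Restart members one by one with `upgrade`, recording the group state after each restart. */
    static List<String> bounce(List<Member> group, UnaryOperator<Member> upgrade) {
        List<String> states = new ArrayList<>();
        for (int i = 0; i < group.size(); i++) {
            group.set(i, upgrade.apply(group.get(i)));
            boolean anyV0 = group.stream().anyMatch(m -> m.subscriptionVersion() == 0);
            boolean anyV1 = group.stream().anyMatch(m -> m.subscriptionVersion() == 1);
            boolean oldCode = group.stream().anyMatch(m -> !m.newByteCode());
            if (anyV1 && oldCode) states.add("unsafe");
            else if (!anyV0) states.add("all-V1");
            else if (!anyV1) states.add("all-V0");
            else states.add("mixed");
        }
        return states;
    }
}
```

With three members, the first bounce (new byte-code, still "eager") stays all-V0 throughout; the second bounce passes through "mixed" twice before reaching all-V1. Skipping the first bounce and going straight to "cooperative" produces "unsafe" states.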


Note that this proposal relies on users eventually putting every member on the same rebalance protocol; otherwise we would remain in case 3.2 forever, where some partitions are not assigned to anyone. In addition, this approach assumes that the leader is V1-aware whenever any V1 subscription is received: again, if users follow the upgrade path above this should be the case, but if they do not, consumers may crash badly due to deserialization failures. There are a few edge cases worth mentioning here:

Downgrading and New Members on the Old Version

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining as a new member on the old V0. The same situation arises when a new member on the old V0 joins (probably by mistake) a group that has been completely upgraded to V1. At this point everyone else still gets their existing assigned-partitions, and the newcomer gets nothing. However, if another member also leaves the group, its partitions would not be assigned to anyone due to case 3 above. We will add a consumer-side metric, recorded once this situation is propagated back to the members, so that users are notified in time.


-------------------------------


For KStream, we take a trade-off between the “revoking all” and “revoking none” solutions: we only revoke the tasks that have been marked as being learned since the last round. So when we assign learner tasks to new members, we also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners revoke the being-learned tasks and join the group without affecting their other ongoing tasks. Once a learner is ready, ownership of the learned task can then be transferred immediately, without attempting a second round of rebalance. Compared with KIP-415, we optimize for fewer rebalances, at the cost of larger metadata and reduced availability of the tasks being learned.
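
A minimal sketch of the "revoke only what is being learned" rule, assuming task ids are plain strings such as "0_1" and that the leader knows the current active owner of each task. `LearnerRevocation`, `Owner` and all method names are hypothetical; the real Streams assignor would carry this information in its AssignmentInfo.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

final class LearnerRevocation {
    // Hypothetical per-instance state: owned active tasks, and the subset marked "being learned".
    record Owner(Set<String> activeTasks, Set<String> beingLearned) {}

    /** Leader: for each learner task given to another member, mark it on the current owner. */
    static Map<String, Set<String>> markBeingLearned(Map<String, String> activeOwner,
                                                     Map<String, Set<String>> learnerTasks) {
        Map<String, Set<String>> marked = new TreeMap<>();
        learnerTasks.forEach((learner, tasks) -> {
            for (String t : tasks) {
                String owner = activeOwner.get(t);
                if (owner != null && !owner.equals(learner))
                    marked.computeIfAbsent(owner, k -> new TreeSet<>()).add(t);
            }
        });
        return marked;
    }

    /** Owner, at the start of the next rebalance: revoke only the marked tasks it actually owns. */
    static Set<String> tasksToRevoke(Owner o) {
        Set<String> revoke = new TreeSet<>(o.beingLearned());
        revoke.retainAll(o.activeTasks());
        return revoke;
    }

    /** Everything else keeps processing while the owner rejoins the group. */
    static Set<String> tasksStillRunning(Owner o) {
        Set<String> keep = new TreeSet<>(o.activeTasks());
        keep.removeAll(o.beingLearned());
        return keep;
    }
}
```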

...