...

StreamsPartitionAssignor uses the subscription / assignment metadata byte-array field to encode additional information for sticky partitions. More specifically, on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ClientUUID               => 128bit
   PrevTasks                => Set<TaskId>
   StandbyTasks             => Set<TaskId>
   EndPoint                 => HostInfo
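For illustration, a minimal sketch of how these fields could be laid out on the wire is shown below. This is not the actual Streams implementation: TaskId is simplified to a single Int32 and EndPoint to a host string, both of which are assumptions made here for brevity.

Code Block
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;

// Hypothetical encoder for SubscriptionInfo (version 4); the field order
// follows the spec above. TaskId is simplified to an int, EndPoint to a string.
public class SubscriptionInfoSketch {

    public static ByteBuffer encode(int version, int latestSupportedVersion,
                                    UUID clientUUID, Set<Integer> prevTasks,
                                    Set<Integer> standbyTasks, String endPoint) {
        byte[] endPointBytes = endPoint.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(
            4 + 4                           // VersionId, LatestSupportedVersionId
            + 16                            // ClientUUID (128 bit)
            + 4 + 4 * prevTasks.size()      // PrevTasks: size prefix + entries
            + 4 + 4 * standbyTasks.size()   // StandbyTasks: size prefix + entries
            + 4 + endPointBytes.length);    // EndPoint: length prefix + bytes

        buf.putInt(version);
        buf.putInt(latestSupportedVersion);
        buf.putLong(clientUUID.getMostSignificantBits());
        buf.putLong(clientUUID.getLeastSignificantBits());
        buf.putInt(prevTasks.size());
        prevTasks.forEach(buf::putInt);
        buf.putInt(standbyTasks.size());
        standbyTasks.forEach(buf::putInt);
        buf.putInt(endPointBytes.length);
        buf.put(endPointBytes);

        buf.flip();
        return buf;
    }
}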

...

And on assignment: 

Code Block
KafkaConsumer:


Assignment => AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes

------------------

StreamsPartitionAssignor:

AssignmentInfo (encoded in version 4) => VersionId LatestSupportedVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode

   VersionId                => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks              => List<TaskId>
   StandbyTasks             => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost         => Map<HostInfo, Set<TopicPartition>>
   ErrorCode                => Int32
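The decoding side mirrors this layout. Below is a hedged sketch (again with TaskId simplified to an int; the two maps are elided since they follow the same size-prefixed pattern):

Code Block
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the leading fields of AssignmentInfo (version 4),
// mirroring the field order above.
public class AssignmentInfoSketch {

    public static List<Integer> decodeActiveTasks(ByteBuffer buf) {
        int version = buf.getInt();                 // VersionId
        int latestSupportedVersion = buf.getInt();  // LatestSupportedVersionId

        int numActiveTasks = buf.getInt();          // size-prefixed list
        List<Integer> activeTasks = new ArrayList<>(numActiveTasks);
        for (int i = 0; i < numActiveTasks; i++) {
            activeTasks.add(buf.getInt());          // one simplified TaskId each
        }
        // StandbyTasks and PartitionsByHost would be decoded here with the
        // same size-prefix + entries pattern, followed by the Int32 ErrorCode.
        return activeTasks;
    }
}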

...

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier and hence forcing everyone to give up all of its assigned partitions before joining the group as the new generation, we use consecutive rebalances, where the end of the first rebalance serves as the synchronization barrier. In the first rebalance we choose to keep all currently assigned tasks running, trading this off against one more rebalance. The behavior becomes (illustrated in the sketch after this list):

  1. Join the group with all currently active tasks still running.

  2. After the first rebalance, sync the revoked partitions and stop them.

  3. Rejoin the group immediately with only the remaining active tasks, to trigger a second rebalance.
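The flow above can be sketched as follows. This is a minimal illustration, not the actual ConsumerCoordinator code; the helper methods (sendJoinGroupAndSync, stopPartitions) are hypothetical stand-ins.

Code Block
import java.util.HashSet;
import java.util.Set;

// Sketch of the two-rebalance flow: join without revoking, use the end of the
// first rebalance as the synchronization barrier, then rejoin if anything was
// revoked. P stands in for TopicPartition.
public class CooperativeRebalanceSketch<P> {

    private Set<P> owned = new HashSet<>();  // partitions currently running

    void onRebalanceTriggered() {
        // 1. Join the group WITHOUT revoking anything: the currently owned
        //    partitions keep running and are encoded into the Subscription.
        Set<P> assigned = sendJoinGroupAndSync(owned);

        // 2. The end of this first rebalance is the synchronization barrier:
        //    stop only the partitions that were taken away from us.
        Set<P> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);

        owned = new HashSet<>(assigned);
        if (!revoked.isEmpty()) {
            stopPartitions(revoked);
            // 3. Rejoin immediately with the reduced set so a second rebalance
            //    can hand the revoked partitions to their new owners.
            onRebalanceTriggered();
        }
    }

    // Hypothetical plumbing, not part of the KIP.
    private Set<P> sendJoinGroupAndSync(Set<P> ownedPartitions) { return new HashSet<>(ownedPartitions); }
    private void stopPartitions(Set<P> partitions) { }
}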

Consumer Protocol

More specifically, we will first inject additional fields at the consumer layer, which are universally applicable to any consumer:

Code Block
KafkaConsumer:


Subscription => TopicList AssignedPartitions SubscriptionInfo
   TopicList               => List<String>
   AssignedPartitions      => List<TopicPartition>   // can be null
   SubscriptionInfo        => Bytes

And on assignment:

Code Block
KafkaConsumer:

Assignment => AssignedPartitions RevokedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   RevokedPartitions       => List<TopicPartition>   // can be null
   AssignmentInfo          => Bytes


Note that it is compatible to inject additional fields before the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode the info bytes, and then re-allocate a larger buffer to prepend the consumer-specific fields; deserialization works in reverse. So as long as we always keep the assignor metadata bytes at the end of the protocol, it remains naturally compatible with plug-in assignors. However, there are some compatibility challenges for the consumer protocol upgrade itself, which we tackle below.
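As a sketch of why this layout stays compatible, consider the following; the method names here are illustrative, not the actual ConsumerProtocol API:

Code Block
import java.nio.ByteBuffer;

// Sketch: the assignor-provided bytes are kept as an opaque, length-delimited
// blob at the end of the buffer, so the consumer layer can grow its own prefix
// of fields without breaking plug-in assignors.
public class ProtocolLayoutSketch {

    static ByteBuffer serialize(ByteBuffer consumerFields, ByteBuffer assignorBytes) {
        ByteBuffer buf = ByteBuffer.allocate(
            consumerFields.remaining() + 4 + assignorBytes.remaining());
        buf.put(consumerFields);                // consumer-specific fields first...
        buf.putInt(assignorBytes.remaining());  // ...then the length prefix...
        buf.put(assignorBytes);                 // ...then the opaque assignor bytes
        buf.flip();
        return buf;
    }

    static ByteBuffer deserializeAssignorBytes(ByteBuffer buf, int consumerFieldsSize) {
        buf.position(consumerFieldsSize);       // skip whatever prefix this version has
        int len = buf.getInt();
        ByteBuffer assignorBytes = buf.slice();
        assignorBytes.limit(len);
        return assignorBytes;
    }
}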


Consumer Coordinator Algorithm

The rebalance behavior of the consumer (implemented in the consumer coordinator layer) will change as follows.

  1. For every consumer: before sending the join-group request, do NOT revoke any partitions; instead, just encode the currently assigned partitions as part of the Subscription.
  2. For every consumer: after receiving the sync-group response, do the following:
    1. Check that the newly assigned partitions are a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalancing, no partition should be migrated directly before being revoked first.
    2. Update the newly assigned partitions, and for the newly added ones, call the rebalance listener (this is the same as the current logic).
    3. If the revoked partitions are not empty, remove those partitions by calling the rebalance listener, and then immediately send another join-group request with the updated assigned partitions.
  3. For the leader: after collecting the subscribed topics as well as the assigned partitions from all members, do the following (see the sketch after this list):
    1. Call the registered assignor of the selected protocol to generate the assignment; call it newly-assigned-partitions.
    2. Segment the total set of partitions inferred from the newly-assigned-partitions into two exclusive subsets: Intersection(total-partitions, assigned-partitions) and Minus(total-partitions, assigned-partitions).
      Note that the latter may be non-empty, because a partition may have been revoked in a previous rebalance and hence is not in any member's assigned partitions, or it may be a newly created partition due to add-partitions. Call the former prev-assigned-partitions and the latter not-assigned-partitions.
    3. For not-assigned-partitions, we can encode the owner from the newly-assigned-partitions directly, since we know nobody owned them before, either due to revocation or because they are newly created.
    4. For prev-assigned-partitions, check whether the owner has changed; if yes, encode them to the old owner in revoked-partitions but do NOT encode them in the assigned-partitions of the new owner.
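A hedged sketch of the leader-side segmentation in step 3 follows (P stands for TopicPartition, consumers are identified by plain strings; all names are illustrative):

Code Block
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of steps 3.2-3.4: a not-assigned partition (no previous owner) can be
// given to its new owner directly, while a prev-assigned partition whose owner
// changed is encoded as a revocation for the old owner and withheld from the
// new owner until the follow-up rebalance.
public class LeaderSegmentationSketch<P> {

    public Map<String, Set<P>> revocations(Map<String, Set<P>> prevOwned,
                                           Map<String, Set<P>> newlyAssigned) {
        // Invert the previous ownership: partition -> old owner.
        Map<P, String> oldOwner = new HashMap<>();
        prevOwned.forEach((consumer, parts) ->
            parts.forEach(p -> oldOwner.put(p, consumer)));

        Map<String, Set<P>> revoked = new HashMap<>();
        newlyAssigned.forEach((newConsumer, parts) -> {
            for (P p : parts) {
                String prev = oldOwner.get(p);  // null => not-assigned-partition
                if (prev != null && !prev.equals(newConsumer)) {
                    // prev-assigned partition with a changed owner: revoke from
                    // the old owner; the caller must also drop it from the new
                    // owner's assignment for this generation.
                    revoked.computeIfAbsent(prev, k -> new HashSet<>()).add(p);
                }
            }
        });
        return revoked;
    }
}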


No changes are required on the broker side, since this logic change is wrapped entirely inside the consumer protocol / coordinator implementation itself; to brokers these rebalances look just like those of previous versions.

Note that one minor difference compared with KIP-415 is that we do not introduce a scheduledDelay in the protocol; instead, the consumer triggers the next rebalance immediately. This is because the consumer protocol applies to all consumers (including Streams) and hence should be kept simple, and also because KIP-345, which is being developed in parallel, already aims to tackle the scaling-out / rolling-bounce scenarios.

We omit the description of the common scenarios here since they are already covered in KIP-415, which is very similar to this KIP apart from the scheduledDelay difference above.

Version Compatibility

Since we are modifying the consumer protocol itself, we need to handle compatibility between the old and new versions during upgrades.

For KStream, we are going to take a trade-off between the “revoke all” and “revoke none” solutions: we shall only revoke tasks that have been marked as being learned since the last round. So when we assign learner tasks to new members, we shall also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners will revoke the being-learned tasks and join the group without affecting the other ongoing tasks. Learned tasks can then transfer ownership immediately upon readiness, without attempting a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, but increasing the metadata size and sacrificing partial availability of the learner tasks.
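A minimal sketch of this owner-side bookkeeping, under the assumption that TaskId is simplified to an int and that the marking happens when the leader assigns a learner elsewhere:

Code Block
import java.util.HashSet;
import java.util.Set;

// Sketch of the "revoke only learner tasks" trade-off: when a rebalance
// begins, the owner gives up exactly the tasks marked as being learned since
// the last round and keeps everything else running, so ready learners can
// take ownership without a further rebalance.
public class LearnerTaskSketch {

    private final Set<Integer> activeTasks = new HashSet<>();
    private final Set<Integer> beingLearned = new HashSet<>();  // marked when a learner was assigned elsewhere

    Set<Integer> onRebalanceStart() {
        // Revoke only the being-learned tasks; other active tasks keep running.
        Set<Integer> revoked = new HashSet<>(beingLearned);
        activeTasks.removeAll(revoked);
        beingLearned.clear();
        return revoked;  // reflected in the join request as no-longer-owned
    }
}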

...