Rebalancing between distributed application processes in Apache Kafka was considerably enhanced when its logic was decoupled from the Kafka brokers and moved to the clients, specifically to the Kafka Consumer. This pattern has been working robustly for quite a while now and has been used successfully, besides the Kafka Consumer itself, in Kafka Streams, Kafka Connect, and other applications outside the Apache Kafka code repository. It is based on a simple principle: when a distributed process leaves or joins the group, all the members of this group stop processing and coordinate in order to redistribute the resources they share, such as their assignments to Kafka topic partitions. At the start of rebalancing, any resources assigned to the processes are revoked. At the end of rebalancing, the processes receive a new assignment of resources. At any time, one process in the group is the designated leader. The group leader is responsible for computing the assignment of resources within the group during a rebalance.
In this proposal, we introduce necessary improvements to the existing rebalancing procedure and propose extensions to the client-side protocols - also known as embedded protocols - that will allow Kafka applications to perform rebalancing incrementally and in cooperation. This will allow Kafka applications to scale even further under circumstances where the existing rebalancing mechanism reaches its limits or incurs unnecessary costs. Before we discuss the general approach and each design in detail, we list definitions of common terms that are used in this proposal and have been used in Kafka proposals in the past.
Group: In this context, the term group describes a set of distributed processes that use Kafka to run in cooperation as a common group.
Resource: A type of resource to be shared and distributed among the members of a group. Common types of resources include Kafka topic partitions that are assigned to Kafka applications, as well as connectors and tasks that are distributed among Kafka Connect workers. Resources can be application-defined, and the Kafka coordinator is agnostic of their definition.
Rebalance/Rebalancing: The procedure followed by a number of distributed processes that use Kafka clients and/or the Kafka coordinator to form a common group and distribute a set of resources among the members of the group.
Group Membership Protocol: A protocol that is used between Kafka applications and the Kafka coordinator to define groups of distributed processes within a specific Kafka application.
Request/Response Types used by the Group Membership Protocol: The procedure that defines membership within a group during rebalancing is implemented using messages of the following types between the processes of a group and the Kafka coordinator: JoinGroup, SyncGroup, Heartbeat, and LeaveGroup requests and their responses.
Embedded Protocol: A protocol that is used by Kafka applications and consists of the definition of types of resources and an algorithm that uses rebalancing as a process in order to distribute these resources among a group in a Kafka application.
Class Types used by the Embedded Protocol: So far, all embedded protocols have been using two classes to describe and distribute resources among the group: Subscription and Assignment.
Currently, rebalancing is based on two protocols. First, there is a core, lower-level protocol that ensures group membership. The Kafka coordinator and any Kafka component that forms groups (clients, Connect, or others) are aware of this core protocol, which is based on the definition of the rebalance request/response types mentioned above. Then, on top of the core protocol, a second protocol is defined that is responsible for describing the resources which need to be distributed among the members of a group. This type of protocol is called an embedded protocol, and its logic is piggybacked on the core protocol. Because of this layering, the Kafka coordinator remains agnostic of an embedded protocol; only the members of the group, including the group leader, use it. Since resource-aware protocols can be embedded in the core protocol without the latter being changed, several embedded protocols can exist on top of a single core group membership protocol. Every embedded protocol defines and distributes a different set of resources during rebalancing. However, up to now, across all embedded protocols the resources are exchanged (and therefore balanced) in an all-or-nothing fashion: in every rebalancing round, each member releases its resources, stops at the same time any processing related to these resources, and requests to re-join the group.
This situation, also known as the stop-the-world effect, has proven inflexible and expensive at large scale and under specific circumstances. Specifically, there are two dimensions in which a more selective management of resources and the ability to cope with temporary imbalances could allow Kafka applications to scale even better:
Based on these two general observations, here we will examine a set of specific use cases that can benefit from incremental cooperative rebalancing, which is more flexible and can handle temporary imbalances without enforcing a stop-the-world approach across the board. These use cases are:
The key idea behind all the designs proposed here is to change the assumption we've always made with groups and their resources until now: when you go into a JoinGroup, it is expected that you give up control of all resources that you control and get them into a clean state for handoff. Among various components that use rebalancing, here are a few examples of what resources mean:
We'd want to change the semantics of rebalancing in a way that allows processes to hold onto resources and allows things to be in an imbalanced state for a while. This leads to an enhanced embedded protocol, since you need to: encode the resources a member currently owns when it joins the group, and allow assignments to indicate which resources a member should give up, either immediately or later.

With respect to the implementation aspects of this enhancement, two key characteristics are: a) that ideally we'd want to apply changes to the embedded protocol only, keeping the Kafka brokers and coordinator agnostic of the changes, and b) that the common components should be shared and usable across Kafka applications, so that it is easy for each application (e.g. Consumer, Streams, Connect) to implement and provide its own policies for rebalancing in a modular way. A natural place for such shared components is AbstractCoordinator and its implementations, although AbstractCoordinator would need to be extended to make use of this. Currently it only relies on the PartitionAssignor.

Here we'll explore a few concrete designs for the proposed protocol of incremental cooperative rebalancing. We will also evaluate which problems each solves and how various consumer group events are handled (e.g. the case where a new leader is elected).
There's one uncommon event that we already know we want to consider for each design: the election of a new group leader that does not have state from previous rebalance rounds.
Further, we want to characterize the behavior of each in terms of the following scenarios defined above in the Motivation section:
Below, the proposed designs focus on the Kafka Consumer use case. For Connect, although the managed resources differ, the rebalance process is similar enough that what works for consumer groups would work for Connect as well. For Streams, we propose to keep leveraging the consumer group implementation as we do today and apply any changes required to adapt to the new partition management model. Use cases that use the rebalance mechanism just for leadership assignment are not affected by the proposed changes and will continue to work in the same way as today.
This is the simplest version that provides incremental cooperative rebalancing in multiple rounds. The formats of the Subscription and Assignment classes are defined as follows:
Subscription (Member → Leader):
Subscription => Version SubscribeTopics PartitionAssignorData AssignedTopicPartitions
  Version                 => Int16
  SubscribeTopics         => [String]
  PartitionAssignorData   => Bytes
  AssignedTopicPartitions => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
Assignment (Leader → Member):
Assignment => Version AssignedTopicPartitions RevokeTopicPartitions
  Version                 => int16
  AssignedTopicPartitions => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
  RevokeTopicPartitions   => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
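To make the wire formats above concrete, the two messages can be mirrored as plain records. This is a sketch in Python for illustration only; the actual classes are Java classes in the Kafka clients, and the field names here are hypothetical:

```python
from dataclasses import dataclass
from typing import Dict, List

# Topic -> list of int32 partition ids, mirroring [Topic Partitions] above.
TopicPartitions = Dict[str, List[int]]

@dataclass
class Subscription:
    version: int                                # Int16 on the wire
    subscribe_topics: List[str]                 # topics the member subscribes to
    partition_assignor_data: bytes              # opaque assignor metadata
    assigned_topic_partitions: TopicPartitions  # NEW: partitions currently owned

@dataclass
class Assignment:
    version: int                                # int16 on the wire
    assigned_topic_partitions: TopicPartitions  # partitions granted to the member
    revoke_topic_partitions: TopicPartitions    # NEW: partitions to give up
```

The only additions relative to the existing protocol are the member reporting what it currently owns and the leader being able to request revocations.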
Member process:
- Send JoinGroup. Also include your previous assignment (null or empty for new members).

Leader process:
- Compute the new assignment. Detect changes in ownership (based on the AssignedTopicPartitions in the subscription) and include the necessary partitions in RevokePartitions. The returned AssignedTopicPartitions should be the computed assignment minus anything included in RevokePartitions, since we need to wait for them to give up.
- Some partitions may not appear in any member's AssignedTopicPartitions. This case should be handled just by ensuring all topic partitions in the subscriptions are assigned.

Events:

Member joins - The new member gets an empty AssignedPartitions while we wait for other members to do revocation. The new member will get its assignment after everyone else revokes and rejoins.

Initial group and assignment: A(T1,T4), B(T2), C(T3)
D(T) joins
Leader computes new assignment as: A(T1), B(T2), C(T3), D(T4)
but sends assignment with revocation request:
  A(assigned:,revoked:T4), B(assigned:,revoked:), C(assigned:,revoked:), D(assigned:,revoked:)
Members join with subscriptions:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3), D(T,assigned:)
Leader computes new assignment as:
  A(assigned:,revoked:), B(assigned:,revoked:), C(assigned:,revoked:), D(assigned:T4,revoked:)
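The leader's round-by-round rule described above - revoke what a member should lose, and only grant partitions that are not still pending revocation - can be sketched in a few lines. This is an illustrative sketch, not Kafka code, and the function name is hypothetical:

```python
def compute_round(target, owned):
    """One leader round of the multi-round policy.

    target: member -> set of partitions the balanced assignment gives it
    owned:  member -> set of partitions reported in AssignedTopicPartitions
    Returns (assigned, revoked) maps per member.
    """
    # Partitions each member owns but should no longer have.
    revoked = {m: owned.get(m, set()) - target.get(m, set()) for m in owned}
    # Anything pending revocation cannot be granted yet; it moves next round.
    pending = set().union(*revoked.values())
    assigned = {m: target.get(m, set()) - pending for m in target}
    return assigned, revoked
```

Running the example above takes two rounds: the first round revokes T4 from A and grants D nothing; once A rejoins without T4, the second round grants T4 to D.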
Member leaves - Rebalance will be triggered by leave group (or session timeout). When the leader computes new assignments, the partitions previously assigned to the former member will not be in any AssignedTopicPartitions of the Subscription messages sent by the currently participating members. Therefore, these partitions are considered already revoked and can be assigned immediately in this iteration. No further rebalances will be required.
Initial group and assignment: A(T1), B(T2), C(T3), D(T4)
D(T4) leaves
Rebalance is triggered. Remaining members rejoin with subscriptions:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3)
Leader computes new assignment as:
  A(assigned:T1,T4,revoked:), B(assigned:T2,revoked:), C(assigned:T3,revoked:)
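The "already revoked" reasoning behind the example can be expressed as a small helper (an illustrative sketch with a hypothetical name): any partition that no current member reports as owned is free to assign in the same round.

```python
def free_partitions(all_partitions, owned_by_member):
    """Partitions reported by no member in AssignedTopicPartitions (e.g.
    because their previous owner left the group) count as already revoked
    and can be assigned immediately."""
    owned = set()
    for parts in owned_by_member.values():
        owned |= set(parts)
    return set(all_partitions) - owned
```

In the scenario above, T4 is the only partition missing from the subscriptions, so the leader can hand it to A without an extra rebalance round.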
Characteristics:
Discussion:
In general this policy results in more rebalances. However, it has the benefit that in all cases only the partitions being moved are affected (assuming join group moved to background thread, see note in decisions below). Further, some of the rebalances are expected to be fast if we get all members to know to rejoin quickly. So having more rebalances isn't really an issue unless it has substantial resource overhead.
One implicit change here is that ConsumerRebalanceListener may now be invoked with partial sets, or we need to introduce another callback interface that handles that case. Possibly this change and a config to switch between protocol versions are the only public API changes that would be needed.
The main overhead here is the addition of AssignedTopicPartitions, which could be substantial in cases like MirrorMaker. However, this is no worse than the assignment data being sent from the leader to members.
Also, note that we need to take care in computing assignments. The AssignedTopicPartitions could be a subset of the total set of topic partitions; combined with a new leader, this makes it important to generate assignments from the SubscribeTopics, only using AssignedTopicPartitions to a) keep things sticky and b) figure out what needs to be included in RevokeTopicPartitions.
As with all of these proposals, the assumption is that members are cooperative, so there is no explicit indication that topic partitions were actually given up.
Finally, one potential concern is that not all members have an indicator that a rebalance is going to be required, since they base this on the presence of any RevokeTopicPartitions. One possible improvement would be to rely on an empty, non-null list (or an extra boolean indicator field) to notify them immediately. This could potentially speed up the second rebalance since they wouldn't have to wait to get the indicator via heartbeat.
This variant builds on the previous one and adds control over when we should schedule another rebalance instead of always trying to resolve imbalance immediately. The Subscription and Assignment classes are defined as follows:
Subscription (Member → Leader):
Subscription => Version SubscribeTopics PartitionAssignorData AssignedTopicPartitions
  Version                 => Int16
  SubscribeTopics         => [String]
  PartitionAssignorData   => Bytes
  AssignedTopicPartitions => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
Assignment (Leader → Member):
Assignment => Version AssignedTopicPartitions RevokeTopicPartitions ScheduledRebalanceTimeout
  Version                   => int16
  AssignedTopicPartitions   => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
  RevokeTopicPartitions     => [Topic Partitions]
    Topic      => String
    Partitions => [int32]
  ScheduledRebalanceTimeout => int32
Note that the only difference in the format is ScheduledRebalanceTimeout
and we might also choose to make this a fixed constant set via configuration instead of dynamic in the protocol.
Member process:
- Send JoinGroup as before, including your previous assignment. If ScheduledRebalanceTimeout > 0, plan to rejoin as soon as possible after that timeout. (This should only be set if the RevokePartitions is empty.)

Leader process:
- Compute the new assignment. Detect changes in ownership (based on the AssignedTopicPartitions in the subscription) and include the necessary partitions in RevokePartitions. The returned AssignedTopicPartitions should be the computed assignment minus anything included in RevokePartitions, since we need to wait for them to give up.
- Missing AssignedTopicPartitions from members indicate either: a member that has really left the group, or a member that has restarted and rejoined, whose AssignedTopicPartitions is empty due to the restart. For a stable leader, they can just compare the set of member IDs to the last generation. For a new leader we can use a heuristic such as looking for a member that has no previous assignments (assuming there's enough members that they should have had an assignment).
- If partitions appear "lost" because their owner is missing, set ScheduledRebalanceTimeout appropriately to defer the actual movement in the hopes the member will reappear. In this case there should be no RevokeTopicPartitions.

Events:

Member leaves - Set ScheduledRebalanceTimeout to get the group to wait some time before resolving any imbalance.

Initial group and assignment: A(T1), B(T2), C(T3), D(T4)
D(T4) leaves
Rebalance is triggered. Remaining members rejoin with subscriptions:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3)
Leader detects "lost" partition T4. Sends empty assignments, without revocations, and a scheduled rebalance timeout of t1:
  A(assigned:,revoked:,t1), B(assigned:,revoked:,t1), C(assigned:,revoked:,t1)
After t1, remaining members join again:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3)
Leader sends updated assignment:
  A(assigned:T1,T4,revoked:,-), B(assigned:,revoked:,-), C(assigned:,revoked:,-)
Initial group and assignment: A(T1), B(T2), C(T3), D(T4)
D(T4) bounces. First it leaves the group.
Rebalance is triggered. Remaining members rejoin with subscriptions:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3)
Leader detects "lost" partition T4. Sends empty assignments, without revocations, and a scheduled rebalance timeout of t1:
  A(assigned:,revoked:,t1), B(assigned:,revoked:,t1), C(assigned:,revoked:,t1)
Before t1, member D joins again as D'
Rebalance is triggered. All members join with subscriptions:
  A(T,assigned:T1), B(T,assigned:T2), C(T,assigned:T3), D'(T,assigned:)
Leader sends updated assignment:
  A(assigned:,revoked:,-), B(assigned:,revoked:,-), C(assigned:,revoked:,-), D'(assigned:T4,revoked:,-)
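The leader's deferral rule in both examples can be summarized in a few lines. This is a sketch only: the function name and the default timeout value are hypothetical, and the real decision would be driven by the group's configuration.

```python
def resolve_lost(lost_partitions, timeout_expired, timeout_ms=300_000):
    """Deferred-resolution sketch: returns (partitions safe to reassign now,
    ScheduledRebalanceTimeout to send). timeout_ms is a hypothetical default."""
    if lost_partitions and not timeout_expired:
        # Defer the move: hope the missing member reappears before the deadline.
        return set(), timeout_ms
    # Timeout expired (or nothing is lost): reassign immediately.
    return set(lost_partitions), 0
```

While the deadline has not passed, the leader sends empty revocations plus a timeout; once it expires, the lost partitions are handed out like any already-revoked partitions.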
Characteristics:
- If a bounced member returns within the ScheduledRebalanceTimeout, we just assign immediately and get back to work, so you are limited by how fast Kubernetes detects the failure and brings back your process.
- Scale-down events take the full ScheduledRebalanceTimeout to resolve, since we need to be confident the member has really left the group.

Discussion:
As noted, in the cases where a process comes back, the outage is basically only limited to the time it takes the process to come back (plus the heartbeat interval since other members need to find out about the need to rejoin). This means users in environments like that could reasonably set the scheduled rebalance timeout relatively high as long as they don't expect to regularly have scale down events.
Having the flexibility to control delaying assignment in the protocol covers the need for delayed rebalancing when a new consumer group is formed, which was the subject of KIP-134 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance). Therefore, implementation of this deferred resolution of imbalance policy could allow us to provide a more holistic solution and deprecate the specific configuration (group.initial.rebalance.delay.ms), that is addressing consecutive rebalances only at the initial stage of a consumer group.
The loss of state when the leader changes to a member that is new to the group can result in a liveness issue in which the time before actually performing an assignment keeps getting extended indefinitely. This can only happen if the leader keeps changing to a new member of the group within the scheduled rebalance timeout. Arguably, this is unlikely enough that it is safe to ignore (bouncing members so fast probably means you're doing something wrong and you shouldn't expect stable groups anyway).
This variant is just an extension of the deferred-resolution behavior. The idea is simple, so a full description is skipped here. The only change is that the leader could choose to reassign only some partitions and handle the rest incrementally in the same iterative fashion.
In this case you probably want the ScheduledRebalanceTimeout to be explicit in the message, because you might use one interval for detecting members leaving and another for how long to wait between moving partitions.
This would be less useful for the detection of nodes that go missing/reappear, since in that case you really just want to assign everything that was "lost" to the new node, or evenly distribute it. The case where this is useful would be something like Connect, where you might scale down the number of tasks for a connector such that you would be in an imbalanced state when just those tasks were deleted, or you're scaling up your cluster and add a new node (the key characteristic being that the cluster is imbalanced, but everything is still assigned and humming along). If you move everything at once, all affected tasks have to stop and commit before anyone can proceed. If you have a heavy connector with a lot of buffered data and a light connector with little, the light connector's tasks get stuck doing nothing while waiting for the heavy connector's tasks to finish. By moving each individually, we decouple them and minimize the outage for each task.
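The "move each individually" idea can be sketched as a per-round cap on how many resources are revoked, with the remainder deferred to later rounds. The function name and the notion of a fixed numeric budget are illustrative assumptions, not part of the proposal:

```python
def plan_moves(pending_moves, max_per_round):
    """Cap the number of resources revoked per rebalance round.

    pending_moves: member -> set of resources that must eventually move off
    that member. Returns the revocations to issue this round; the rest are
    handled in subsequent rounds.
    """
    planned = {member: set() for member in pending_moves}
    budget = max_per_round
    for member in sorted(pending_moves):          # deterministic order
        for resource in sorted(pending_moves[member]):
            if budget == 0:
                return planned                    # rest waits for later rounds
            planned[member].add(resource)
            budget -= 1
    return planned
```

A heavy connector's tasks and a light connector's tasks would then stop and hand off in separate rounds instead of blocking each other.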
Characteristics:
Discussion:
This policy aims to be helpful in very large deployments of groups with diverse resources, such as large Connect clusters with a diverse set of connectors and tasks.
As discussed above, the proposed changes here focus on the embedded protocol. In this case, a round of rolling bounces will be sufficient to upgrade all the members of a group to the latest version of the embedded protocol. Another round might be useful if it's desired to disable the old version completely.
Here are some conclusions that follow from the above:
Out of the available options regarding any potential enhancement to the rebalancing mechanisms and the respective protocols, an incremental approach where we keep the changes primarily to the embedded protocol is a good first step. We can evaluate the need for other changes that might involve changes in the Kafka coordinator's protocol after implementing the proposed enhancements here. This will allow for the implementation of various policies in the different Kafka components to move at their own pace.
This document was composed based on original contributions by Ewen Cheslack-Postava, Jason Gustafson, Guozhang Wang, and Konstantine Karantasis.