
Status

Current state: Accepted (2.4.0)

Discussion thread: TBD link

JIRA: KAFKA-8019, KAFKA-8179

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Kafka community has recently been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an effort for Kafka Connect has already started as KIP-415. There are already exciting discussions around it, but for Kafka Streams a delayed rebalance alone is not a complete solution. This KIP customizes the cooperative rebalancing approach specifically for the Kafka Streams application context, building on the design done for Kafka Connect.

Currently Kafka Streams uses the consumer membership protocol to coordinate stream task assignment. When we scale up a streams application, the group will attempt to revoke active tasks and let the newly spun-up hosts take them over. The new hosts need to restore the assigned tasks' state before transiting to "running". For state-heavy applications it is not ideal to give up tasks immediately once a new member joins the party; instead we should allow some time for the new member to accept a fair amount of restoring tasks and finish state reconstruction before officially taking over the active tasks. Ideally, we could achieve a no-downtime transition during cluster scaling.

In short, the goals of this KIP are:

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.

Proposed Changes

Terminology

We shall define several new terms for an easier walkthrough of the algorithm.

  • Worker (a.k.a. stream worker): a thread-level stream processor that actually executes the stream tasks.
  • Instance (a.k.a. stream instance): the Kafka Streams instance serving as a container for a set of stream workers. This could be a physical host or a k8s pod. We will use worker and instance interchangeably for most of the discussion concerning "working members", because capacity is essentially determined by the relative size of the instance, not the worker.
  • Learner task: a special standby task that is assigned to a stream instance to restore a currently active task, and that transits to active once the restoration is complete.

Learner Task Essential

A learner task shares the same semantics as a standby task: it is utilized by the restore consumer to replicate the active task's state. When restoration of a learner task is complete, the stream instance initiates a new JoinGroupRequest to trigger another rebalance that performs the task transfer. The goal of learner tasks is to delay task migration until the destination host has finished (or at least started) replaying the active task's state.

Stop-The-World Effect

As mentioned in the Motivation section, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics in Kafka Streams: when a rebalance starts, all workers would:

  1. Join the group with all currently assigned tasks revoked

  2. Wait until the group assignment finishes to get the assigned tasks

  3. Replay the assigned tasks' state

  4. Once all replay jobs finish, the worker transits to running mode

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. This way, no topic partition can be re-assigned before it is revoked.

For Kafka Connect, the choice is to keep all currently assigned tasks running and trade off with one more rebalance. The behavior becomes:

  1. Join group with all current active tasks running

  2. After first rebalance, sync the revoked partitions and stop them

  3. Rejoin group immediately with only active tasks to trigger a second rebalance

Feel free to take a look at the KIP-415 example to get a sense of how the algorithm works.

For Kafka Streams, we are going to take a trade-off between the "revoke all" and "revoke none" solutions: we shall only revoke tasks that have been learned since the last round. So when we assign learner tasks to a new member, we also mark the corresponding active tasks as "being learned" on their current owners. Every time a rebalance begins, the task owners revoke the being-learned tasks and join the group without affecting their other ongoing tasks. The learned tasks can then transfer ownership immediately, without requiring a second round of rebalance. Compared with KIP-415, we are optimizing for fewer rebalances, at the cost of larger metadata and sacrificing partial availability of the learner tasks.

Next we will look at several typical scaling scenarios and edge cases to better understand the design of this algorithm.

Normal Scenarios

Scale Up Running Application

The newly joined workers will be assigned learner tasks by the group leader, and they will first replay the corresponding changelogs locally. By the end of the first round of rebalance there is no "real ownership transfer". When a new member finally finishes replaying, it re-joins the group to indicate that it is "ready" to take on real active tasks. During the second rebalance, the leader will eventually transfer the task ownership.

Code Block
languagetext
titleScale-up
Cluster has 3 stream workers S1(leader), S2, S3, and they each own some tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance 
New member S5 joins the group.
Member S1~S5 join with following metadata: (S4 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: []) // T1 revoked because it's "being learned"
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [])
S1 performs task assignments: 
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])

#Third Rebalance 
Member S4 finishes its replay and becomes ready, re-attempting to join the group.
Member S1~S5 join with following status:(S5 is not ready yet)
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [T1], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])

#Fourth Rebalance 
Member S5 is ready and re-attempts to join the group.
Member S1~S5 join with following status: (S5 is now ready)
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: []) // T3 revoked because it's "being learned"
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [], revoked: [], learning: [T3])
S1 performs task assignments:
	S1(assigned: [T2], revoked: [], learning: [])
	S2(assigned: [T4], revoked: [T3], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
	S5(assigned: [T3], revoked: [], learning: [])
Now the group reaches balance with 5 members each owning one task.

...

Scaling up from scratch means all workers are new members. There is no need for a learner stage because there is nothing to learn: we don't even have a changelog topic to start with. We should be able to handle this case by identifying whether a given task is in the active-task bucket of another member; if not, we just transfer the ownership immediately.

After deprecating group.initial.rebalance.delay.ms, we still expect the algorithm to work, because every task assignment during rebalance will adhere to the rule "if a given task is currently active, reassignment must happen only to workers that have declared themselves ready to serve this task."

Code Block
languagetext
titleScale-up from ground
Group empty state: unassigned tasks [T1, T2, T3, T4, T5]

#First Rebalance 
New member S1 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) // T1~5 not previously owned

#Second Rebalance 
New member S2, S3 joins the group
S1 performs task assignments:
S1(assigned: [T1, T2, T3, T4, T5], revoked: [], learning: []) 
S2(assigned: [], revoked: [], learning: [T3, T4])
S3(assigned: [], revoked: [], learning: [T5])

#Third Rebalance 
S2 and S3 are ready immediately after the assignment.
Member S1~S3 join with following status:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [], revoked: [], learning: [T3, T4])
	S3(assigned: [], revoked: [], learning: [T5])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [T3, T4, T5], learning: []) 
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])

Scale Down Running Application

As we have already discussed around the "learner" logic, when scaling down a stream group it is also favorable to initiate learner tasks before actually shutting down the instances. Although standby tasks could help in this case, they require the user to pre-set num.standby.replicas, which may not be configured when the administrator performs the scale down. The plan is to use a command line tool to tell certain stream members that a shutdown is about to be executed. These informed members will send a join group request to indicate that they are "leaving soon". During rebalance assignment, the leader will perform the learner assignment among the members without intention of leaving, and a leaving member will shut itself down once it receives the instruction to revoke all its active tasks.

For ease of operation, a new tool for scaling down the stream app shall be built. It will have access to the application instances and, ideally, could perform two types of scale-down:

...

This KIP is trying to customize the incremental rebalancing approach for the Kafka consumer client, which will be beneficial for heavily stateful consumers such as Kafka Streams applications.

In short, the goals of this KIP are:

  • Reduce unnecessary downtime due to partition migration, i.e. partitions being revoked and re-assigned.
  • Better rebalance behavior for members that fall out of the group.


Background

Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics in Kafka Streams: when a rebalance starts, all stream threads would:

  1. Join the group with all currently assigned tasks revoked.

  2. Wait until the group assignment finishes to get the assigned tasks and resume working.

  3. Replay the assigned tasks' state.

  4. Once all replay jobs finish, the stream thread transits to running mode.

The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. This way, no topic partition can be re-assigned before it is revoked.

Below shows the process of the rebalance protocol with a new member joining the group.

(Figure: the current eager rebalance process with a new member joining the group)

Example: Streams Assignor Rebalance Metadata

Today Streams embeds a full-fledged consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignment logic today sits in the latter two classes. Hence the hierarchy today is:

Code Block
KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically, on subscription:

Code Block
KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId LatestSupportedVersionId ClientUUID PrevTasks StandbyTasks EndPoint
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

Code Block
KafkaConsumer:

Assignment => AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes

------------------

StreamsPartitionAssignor:

UserData (encoded in version 4) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
   VersionId               => Int32
   LatestSupportedVersionId => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32


Example: Consumer Sticky Assignor 

We also have a StickyAssignor provided out of the box that tries to mitigate the cost of unnecessary partition migrations. This assignor relies only on subscription metadata and does not modify the assignment metadata, as follows:

Code Block
KafkaConsumer:


Subscription => TopicList UserData
   TopicList               => List<String>
   UserData                => Bytes

------------------


StickyAssignor:

UserData (encoded in version 1) => AssignedPartitions
   AssignedPartitions      => List<String, List<Int32>>


The goal of this incremental protocol is to fully leverage the sticky assignors, which try to reassign partitions to their previous owners on a best-effort basis, so that we revoke as few partitions as possible, since the revocation process is costly.


Proposed Changes: Incremental Consumer Rebalance Protocol

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier to rebalance the group and hence force everyone to give up all their assigned partitions before joining the group as the new generation, we use consecutive rebalances where the end of the first rebalance is actually used as the synchronization barrier.

Consumer Protocol

More specifically, we would first inject more metadata at the consumer-layer, as:

Code Block
KafkaConsumer:

Subscription => TopicList UserData AssignedPartitions
   TopicList               => List<String>
   UserData                => Bytes   
   AssignedPartitions      => List<String, List<Int32>>   // new field


Assignment = AssignedPartitions UserData
   AssignedPartitions      => List<String, List<Int32>>
   UserData                => Bytes

Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode the info bytes, and then re-allocate a larger buffer to append the consumer-specific bytes; with the new protocol, we just need to append some fields before, and some fields (i.e. those new fields) after the assignor-specific info bytes, and vice-versa on deserialization. So adding fields after the assignor bytes is still naturally compatible with the plug-in assignor. However, there are indeed some compatibility challenges for the consumer protocol upgrade itself, which we tackle below.
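To make the compatibility argument concrete, below is a simplified, hypothetical sketch of the field layout; the names and encoding are illustrative only and are not the actual ConsumerProtocol schema. An old reader stops after the user data length it knows about and never inspects the trailing bytes.

Code Block
languagejava
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Simplified illustration (NOT the real wire format) of why appending a field after the
// assignor's user data stays compatible with old readers.
public class SubscriptionLayoutSketch {

    static ByteBuffer serialize(List<String> topics, ByteBuffer userData, List<String> ownedPartitions) {
        byte[] topicBytes = String.join(",", topics).getBytes(StandardCharsets.UTF_8);
        byte[] ownedBytes = String.join(",", ownedPartitions).getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(
                4 + topicBytes.length + 4 + userData.remaining() + 4 + ownedBytes.length);
        buffer.putInt(topicBytes.length).put(topicBytes);              // existing field: topic list
        buffer.putInt(userData.remaining()).put(userData.duplicate()); // existing field: assignor user data
        buffer.putInt(ownedBytes.length).put(ownedBytes);              // new field appended at the end
        buffer.flip();
        return buffer;
    }
}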


In addition, we want to resolve a long-standing issue that when a consumer is kicked out of the group, since it no longer owns any partitions the `commit` call is doomed to fail. To distinguish this case from the normal case where consumers are likely still within the group but just trying to re-join, we introduce a new API into the consumer rebalance listener:

Code Block
languagejava
public interface ConsumerRebalanceListener {


    void onPartitionsRevoked(Collection<TopicPartition> partitions);


    void onPartitionsAssigned(Collection<TopicPartition> partitions);


    // new API
    default void onPartitionsLost(Collection<TopicPartition> partitions) {
        onPartitionsRevoked(partitions);
    }
}


Users implementing this rebalance listener do not necessarily need to make code changes if they do not need different logic, but they will still need to recompile their implementation class. However, the semantics of these callbacks do differ in the new cooperative protocol, so you should review your implementation to make sure no logical changes are needed. For details, see ConsumerRebalanceListener and ConsumerPartitionAssignor Semantics below.

Note that adding new fields will increase the size of the request, especially in cases like Streams where the user metadata is already heavily encoded with assignor-specific metadata. We are working on KAFKA-7149 with compression / reformatting to reduce the user metadata footprint, and with that we believe adding this new field will not push the size beyond the message size limit.


Consumer Coordinator Algorithm

Rebalance behavior of the consumer (captured in the consumer coordinator layer) would be changed as follows.

  1. For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
    1. If the subscription has changed: revoke all owned partitions that are no longer of subscription interest by calling onPartitionsRevoked, and send the join-group request with whatever is left in the owned partitions of the Subscription.
    2. If the topic metadata has changed: call onPartitionsLost on those owned-but-no-longer-existing partitions; and if the consumer is the leader, send the join-group request.
    3. If REBALANCE_IN_PROGRESS is received from a heartbeat response / commit response: re-join the group with all the currently owned partitions as assigned partitions.
    4. If UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION is received from a join-group / sync-group / commit / heartbeat response: reset the generation / clear the member-id correspondingly, call the rebalance listener's onPartitionsLost for all the owned partitions, and then re-join the group with an empty assigned-partitions list.
    5. If MEMBER_ID_REQUIRED is received from the join-group response: set the member id, and then re-send the join-group request (at this moment the owned partitions should be empty).
  2. For the leader: after receiving the subscription topics, as well as the claimed owned partitions, do the following:
    1. Collect the partitions that are claimed as currently owned from the subscriptions; let's call this set owned-partitions.
    2. Call the registered assignor of the selected protocol, passing in the cluster metadata, and get the returned assignment; let's call the returned assignment assigned-partitions. Note that this set could be different from owned-partitions.
    3. Compare owned-partitions with assigned-partitions and generate three exclusive sub-sets (see the sketch after this list):
      1. Intersection(owned-partitions, assigned-partitions). These are partitions that are still owned by some members, and some of them may now be allocated to new members. Let's call it maybe-revoking-partitions.
      2. Minus(assigned-partitions, owned-partitions). These are partitions that were not previously owned by anyone. This set is non-empty when a partition's previous owner is on an older version and hence already revoked it before joining, or the partition was revoked in the previous rebalance by a new-versioned member and hence is not claimed as owned by anyone, or it is a newly created partition due to add-partitions. Let's call it ready-to-migrate-partitions.
      3. Minus(owned-partitions, assigned-partitions). These are partitions that do not exist in the assigned partitions, but are claimed to be owned by some members. This set is non-empty if some topics were deleted, if the leader's metadata is stale (and hence the generated assignment does not have those topics), or if the previous leader has created some topics in its assignor that are not in the cluster yet (consider the Streams case). Let's call it unknown-but-owned-partitions.
    4. For maybe-revoking-partitions, check if the owner has changed. If yes, exclude them from the assigned-partitions list sent to the new owner. The old owner will realize it no longer owns them, revoke them, and then trigger another rebalance for these partitions to finally be reassigned.
    5. For ready-to-migrate-partitions, it is safe to move them to the new members immediately since we know no one owned them before, and hence we can encode the owners from the newly-assigned-partitions directly.
    6. For unknown-but-owned-partitions, it is also safe to just give them back to whoever claims to own them, by encoding them directly as well. If this is due to a topic metadata update, a later rebalance will be triggered anyway.
  3. For every consumer: after receiving the sync-group response, do the following:
    1. Calculate the newly-added-partitions as Minus(assigned-partitions, owned-partitions) and the revoked-partitions as Minus(owned-partitions, assigned-partitions).
    2. Update the assigned-partitions list.
    3. If the set of revoked-partitions is non-empty, call the rebalance listener's onPartitionsRevoked and rejoin to trigger another rebalance.
    4. For those newly-added-partitions, call the rebalance listener's onPartitionsAssigned (even if empty).
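The set computations in step 2.3 boil down to simple intersection / difference operations; here is a minimal sketch (illustrative only, not the actual ConsumerCoordinator code) with topic partitions represented as plain Strings:

Code Block
languagejava
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Minimal sketch of the leader-side set math described in step 2.3 above.
public class PartitionSetsSketch {

    public static void main(String[] args) {
        Set<String> ownedPartitions = new HashSet<>(Arrays.asList("a-0", "a-1", "b-0"));     // claimed as owned in subscriptions
        Set<String> assignedPartitions = new HashSet<>(Arrays.asList("a-0", "b-0", "c-0"));  // produced by the assignor

        // Still owned by some member; some may now be intended for a new owner.
        Set<String> maybeRevoking = intersection(ownedPartitions, assignedPartitions);   // contains a-0, b-0

        // Owned by no one before; safe to hand out immediately.
        Set<String> readyToMigrate = minus(assignedPartitions, ownedPartitions);         // contains c-0

        // Claimed as owned but absent from the assignment (e.g. stale metadata).
        Set<String> unknownButOwned = minus(ownedPartitions, assignedPartitions);        // contains a-1

        System.out.println(maybeRevoking + " " + readyToMigrate + " " + unknownButOwned);
    }

    static Set<String> intersection(Set<String> a, Set<String> b) {
        Set<String> result = new HashSet<>(a);
        result.retainAll(b);
        return result;
    }

    static Set<String> minus(Set<String> a, Set<String> b) {
        Set<String> result = new HashSet<>(a);
        result.removeAll(b);
        return result;
    }
}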


Below is a more illustrative view of the different sets of partitions and their assignment logic:

(Figure: the different partition sets described above and their assignment logic)


No changes are required on the broker side, since this logic change is completely wrapped inside the consumer protocol / coordinator implementation itself, and to brokers it looks just like previous versions' rebalances.

Note that one minor difference compared with KIP-415 is that we do not introduce the scheduledDelay in the protocol; instead the consumer triggers the rebalance immediately. This is because the consumer protocol applies to all consumers (including Streams) and hence should be kept simple, and also because KIP-345, which is being developed in parallel, already aims to tackle the scaling-out / rolling-bounce scenarios.

We omit the common scenario descriptions here since they are already covered in KIP-415, which is very similar to this KIP apart from the scheduledDelay difference above.


NOTE that for this new algorithm to be effective in reducing rebalance costs, it really expects the plug-in assignor to be "sticky" in some way, such that the diff between the newly-assigned partitions and the existing assigned partitions is small, and hence only a small subset of the total number of partitions needs to be revoked / migrated at each rebalance in practice; otherwise we are just paying for more rebalances with little benefit. We will talk about how the sticky StreamsPartitionAssignor is updated accordingly in Part II.

Now the process for the same new-member-joining example becomes the following under the new protocol (note that the existing member will now only call revoke on b-0 once, and no longer revokes both a-0 and b-0 and then later gets a-0 assigned again):

(Figure: the same new-member-joining example under the new cooperative protocol)

Rebalance Callback Error Handling

Today, when a user-customized rebalance listener callback throws an exception, as long as it is not a WakeupException / InterruptedException it is swallowed and logged as an error. Users have complained that this is not an effective way of notifying them when it happens (KAFKA-4600).

In this KIP, we propose to slightly modify the semantics of the listener, such that the callbacks are used more like "notifications" to users about what gets assigned and what gets revoked, etc.; even if exceptions are thrown, they will not cause any of these assignment / revocation results to change. Also, if the ConsumerCoordinator finds there is no need to notify because no "new" partitions have been assigned or revoked, the corresponding callback will not be triggered either.

More specifically, we will change this behavior as well:

  1. ConsumerCoordinator will check whether the newly assigned / revoked / lost partitions set is empty; if it is empty, the corresponding listener will not be triggered.
  2. When a listener callback throws an exception, ConsumerCoordinator will log an error and then re-throw the exception all the way up to KafkaConsumer.poll().
    1. Note that with the new rebalance protocol, onPartitionsAssigned / onPartitionsRevoked may be called sequentially. If one throws an exception, we still proceed to complete the rest of the callbacks while remembering the "first-thrown exception", and then at the end throw the remembered exception to the user.

Upon capturing the error, users can do the following depending on their exception handling logic:

  • Shut down the consumer gracefully if the exception is considered a fatal error.
  • Retry consumer.poll() if they believe the exception is retriable; be aware that the failed callback will not be re-invoked, and the consumer proceeds as if its effects had taken place.


To give a concrete example, suppose your current assigned partitions are {1,2}, and the newly assigned partitions are {2,3}. The consumer will call onPartitionsRevoked(1) and then onPartitionsAssigned(3). Suppose the former failed with an exception, ConsumerCoordinator would still proceed to complete the latter callback (and assume the latter callback succeeds). If users decide to retry, it is still considered as having successfully changed to {2, 3} – i.e. we consider all of the effects user indicated in the callback have taken place.
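As an illustration of the new error propagation, a consumer poll loop might handle a callback exception roughly as follows; this is a hedged sketch in which MyRetriableCallbackException and process() are hypothetical placeholders for application-specific logic.

Code Block
languagejava
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Illustrative poll loop handling an exception thrown from our own rebalance listener
// and re-thrown by poll().
public class PollErrorHandlingSketch {

    static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                process(records);
            } catch (WakeupException e) {
                break; // shutdown signal
            } catch (MyRetriableCallbackException e) {
                // Our listener threw, but the assignment change has already taken effect,
                // so it is safe to simply poll again.
            } catch (RuntimeException e) {
                consumer.close(); // treat anything else as fatal
                throw e;
            }
        }
    }

    static void process(ConsumerRecords<String, String> records) { /* application logic */ }

    static class MyRetriableCallbackException extends RuntimeException { }
}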


Consumer Metrics

As part of this KIP we will also add some metrics on the consumer side related to rebalancing. These include:

  1. listener callback latency
    1. partitions-revoked-latency-avg
    2. partitions-revoked-latency-max
    3. partitions-assigned-latency-avg
    4. partitions-assigned-latency-max
    5. partitions-lost-latency-avg
    6. partitions-lost-latency-max
  2. rebalance rate and latency (# rebalances per day, and latency including the callback time as well)
    1. rebalance-rate-per-hour
    2. rebalance-total
    3. rebalance-latency-avg
    4. rebalance-latency-max
    5. rebalance-latency-total
    6. failed-rebalance-rate-per-hour
    7. failed-rebalance-total
  3. last-rebalance-seconds-ago (dynamic gauge)


CooperativeStickyAssignor and custom COOPERATIVE Assignors

Since we already encode the assigned partitions at the consumer protocol layer, for the consumer's sticky assignor we would effectively be duplicating this data in both the consumer protocol and the assignor's user data. Similarly, we have a StreamsPartitionAssignor which is also sticky but relies on its own user data to achieve it. We have added a new out-of-the-box assignor that leverages the Subscription's built-in ownedPartitions. Consumer groups plugging in the new "cooperative-sticky" assignor will follow the incremental cooperative rebalancing protocol. A specific upgrade path is required for users wishing to do a rolling upgrade to the new cooperative assignor, as described in the compatibility section below.

Users may also wish to implement their own custom assignor, or may already be doing so, and want to use the new cooperative protocol. Any assignor that returns COOPERATIVE among the list in #supportedProtocols indicates to the ConsumerCoordinator that it should use the cooperative protocol, and it must follow specific assignment logic. First, the assignor should try to be as "sticky" as possible, meaning it should assign partitions back to their previous owners as much as possible. The assignor can leverage the new ownedPartitions field that the Subscription has been augmented with in order to determine the previous assignment. Note that "stickiness" is important for the cooperative protocol to be effective: in the limit where the new assignment is totally different from the previous one, the cooperative protocol just reduces to the old eager protocol, as each member has to completely revoke all partitions and get a whole new assignment. In addition, any time a partition has to be revoked it will trigger a follow-up rebalance, so the assignor should seek to minimize partition movement. Second, in order to ensure safe resource management and clear ownership, the assignor must make sure a partition is revoked by its previous owner before it can be assigned to a new one. Practically speaking, this means the assignor should generate its "intended" assignment and then check it against the previous assignment to see whether any partitions are being revoked (that is, they are in the ownedPartitions but not in the new assignment for a given consumer). If so, such a partition should be removed from the new assignment for that round, and only be assigned to its final owner in the second rebalance, after it has been revoked. See the CooperativeStickyAssignor implementation for an example, and the sketch below for the general shape of this adjustment.
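A rough sketch of that last adjustment step is shown below; the method and variable names are assumptions for illustration only and are not the actual CooperativeStickyAssignor code.

Code Block
languagejava
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of the "hold back partitions that still need to be revoked"
// adjustment a COOPERATIVE assignor must apply.
public class CooperativeAdjustmentSketch {

    static Map<String, List<TopicPartition>> adjustForCooperative(
            Map<String, List<TopicPartition>> ownedByConsumer,      // from Subscription#ownedPartitions
            Map<String, List<TopicPartition>> intendedAssignment) { // the assignor's ideal (sticky) assignment

        // Index every partition that some consumer still claims to own.
        Map<TopicPartition, String> currentOwner = new HashMap<>();
        ownedByConsumer.forEach((consumer, owned) -> owned.forEach(tp -> currentOwner.put(tp, consumer)));

        Map<String, List<TopicPartition>> adjusted = new HashMap<>();
        intendedAssignment.forEach((consumer, partitions) -> {
            List<TopicPartition> granted = new ArrayList<>();
            for (TopicPartition tp : partitions) {
                String owner = currentOwner.get(tp);
                // Only hand out a partition that is unowned, or already owned by its intended owner.
                // Otherwise hold it back: the current owner must revoke it first, and the follow-up
                // rebalance will deliver it to its final owner.
                if (owner == null || owner.equals(consumer)) {
                    granted.add(tp);
                }
            }
            adjusted.put(consumer, granted);
        });
        return adjusted;
    }
}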

Note that the CooperativeStickyAssignor is for use by plain consumer clients: the existing StreamsPartitionAssignor has simply been modified to support cooperative rebalancing, so Streams users should not try to plug in the CooperativeStickyAssignor (or any other assignor). The upgrade path for Streams also differs slightly from that of the clients' CooperativeStickyAssignor. See the section on Streams below for details.

ConsumerRebalanceListener and ConsumerPartitionAssignor Semantics

If you do choose to plug in a cooperative assignor and have also implemented a custom ConsumerRebalanceListener, you should be aware of how the semantics and ordering of these callbacks have changed. In the eager protocol, the timeline of a rebalance is always exactly as follows:

  1. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.
  2. Listener#onPartitionsRevoked: called on the full set of assigned partitions
  3. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  4. Assignor#assign: called only for group leader
  5. Assignor#onAssignment: invoked after receiving the new assignment
  6. Listener#onPartitionsAssigned: called on the full set of assigned partitions (may overlap with the partitions passed to #onPartitionsRevoked)

In the cooperative protocol, the timeline is less exact as some methods may or may not be called, at different times, and on different sets of partitions. This will instead look something like the following:

  1. Listener#onPartitionsLost: if the member has missed a rebalance and fallen out of the group, this new callback will be invoked on the set of all owned partitions (unless empty). The member will then rejoin the group.
  2. Listener#onPartitionsRevoked: if the topic metadata has changed such that some owned partitions are no longer in our subscription or don't exist, this callback will be invoked on that subset. If there are no partitions to revoke for those reasons, this callback will not be invoked at this point (note that this will likely be the case in a typical rebalance due to membership changes, eg scaling in/out, member crashes/restarts, etc)
  3. Assignor#subscriptionUserdata: called when sending the JoinGroup request
  4. Assignor#assign: called only for the group leader. Note that the #assign method will now have access to the ownedPartitions for each group member (minus any partitions lost or revoked in steps 1 and 2)
  5. Listener#onPartitionsRevoked: this will be called on the subset of previously owned partitions that are intended to be reassigned to another consumer. If this subset is empty, this will not be invoked at all. If this is invoked, it means that a followup rebalance will be triggered so that the revoked partitions can be given to their final intended owner.
  6. Assignor#onAssignment: invoked after receiving the new assignment (will always be after any #onPartitionsRevoked calls, and before #onPartitionsAssigned).
  7. Listener#onPartitionsAssigned: called on the subset of assigned partitions that were not previously owned before this rebalance. There should be no overlap with the revoked partitions (if any). This will always  be called, even if there are no new partitions being assigned to a given member.

Some of the callbacks above may not be called at all during a rebalance. Take note in particular that it is possible for #onPartitionsRevoked to never be invoked during a rebalance, so it should not be relied on to signal that a rebalance has started. The #onPartitionsAssigned callback will, however, always be called and can therefore be used to signal to your app that a rebalance has just completed.
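For example, a listener written with the cooperative semantics in mind might look roughly like the following; the state-management helper methods are hypothetical placeholders for application logic.

Code Block
languagejava
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Illustrative listener for cooperative semantics: callbacks receive only the incremental
// partition sets, so state is handled per-partition rather than wholesale.
public class CooperativeAwareListener implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // May be skipped entirely under COOPERATIVE; when called it contains only the
        // partitions actually being migrated away, so flush / commit state for just those.
        partitions.forEach(this::commitStateFor);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Always invoked (possibly with an empty set); contains only the newly added partitions.
        partitions.forEach(this::initStateFor);
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // We fell out of the group: these partitions may already be owned elsewhere,
        // so discard local state without committing.
        partitions.forEach(this::dropStateFor);
    }

    private void commitStateFor(TopicPartition tp) { }
    private void initStateFor(TopicPartition tp) { }
    private void dropStateFor(TopicPartition tp) { }
}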


Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design an upgrade path that lets consumers move to the new rebalance protocol in an online manner. In fact, the trickiest part of this KIP is how to support a safe online upgrade path, such that even if users make mistakes and do not strictly follow the instructions, we can pause the consumer from proceeding so that users eventually realize it (e.g. by seeing consumer lag and investigating logs), rather than letting them fall into undefined behavior or, even worse, having some partitions owned by more than one member.

Note that since we are injecting additional fields at the end of the consumer protocol, the new protocol is still compatible with the old version. That means an old-versioned consumer can still deserialize newer-versioned protocol data (as long as we only append new fields at the end, this is the case).

However, when consumers with V1 join the group, there is a key behavioral difference: they do NOT revoke their partitions, and hence it is not safe to re-assign any of their partitions as we do in the current (V0) assignment logic. That means the leader can only proceed with the assignment when it knows which version, V0 or V1, each member is on.

Another thing to keep in mind is that, if the leader itself is still on the older version, it will still be able to deserialize the V1 subscription protocol as V0 by ignoring the additional fields, and hence it may "think" everyone is still on V0 while some members may actually be on the newer version.


The key idea to resolve this is to let the Assignor implementors themselves indicate to the ConsumerCoordinator which protocols they are compatible with, and then rely on multi-assignor protocols for a safe upgrade path: users need to keep two assignors when switching the rebalance protocol, and after that they can use another rolling bounce to remove the old-versioned protocol.


More specifically, we will introduce the new public ConsumerPartitionAssignor interface and its Subscription / Assignment classes (the old classes are actually in the `internal` package by mistake, so we use this KIP to deprecate them in favor of the new ones, along with augmented methods) as follows:

Code Block
languagejava
ConsumerGroupMetadata {
    public String groupId();

    public int generationId();

    public String memberId();

    public Optional<String> groupInstanceId();
}

public interface ConsumerPartitionAssignor {

    /**
     * Return serialized data that will be included in the {@link Subscription} sent to the leader
     * and can be leveraged in {@link #assign(Cluster, GroupSubscription)} (e.g. local host/rack information)
     *
     * @param topics Topics subscribed to through {@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(java.util.Collection)}
     *               and variants
     * @return nullable subscription user data
     */
    default ByteBuffer subscriptionUserData(Set<String> topics) {
        return null;
    }

    /**
     * Perform the group assignment given the member subscriptions and current cluster metadata.
     * @param metadata Current topic/broker metadata known by consumer
     * @param groupSubscription Subscriptions from all members including metadata provided through {@link #subscriptionUserData(Set)}
     * @return A map from the members to their respective assignments. This should have one entry
     *         for each member in the input subscription map.
     */
    GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);

    /**
     * Callback which is invoked when a group member receives its assignment from the leader.
     * @param assignment The local member's assignment as provided by the leader in {@link #assign(Cluster, GroupSubscription)}
     * @param metadata Additional metadata on the consumer (optional)
     */
    default void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
    }

    /**
     * Indicate which rebalance protocol this assignor works with;
     * By default it should always work with {@link RebalanceProtocol#EAGER}.
     */
    default List<RebalanceProtocol> supportedProtocols() {
        return Collections.singletonList(RebalanceProtocol.EAGER);
    }

    /**
     * Return the version of the assignor which indicates how the user metadata encodings
     * and the assignment algorithm gets evolved.
     */
    default short version() {
        return (short) 0;
    }

    /**
     * Unique name for this assignor (e.g. "range" or "roundrobin" or "sticky"). Note, this is not required
     * to be the same as the class name specified in {@link ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG}
     * @return non-null unique name
     */
    String name();

    final class Subscription {
        public List<String> topics();

        public ByteBuffer userData();

        public List<TopicPartition> ownedPartitions();

        public void setGroupInstanceId(Optional<String> groupInstanceId);

        public Optional<String> groupInstanceId();
    }

    final class Assignment {
        public List<TopicPartition> partitions();

        public ByteBuffer userData();
    }

    final class GroupSubscription {
        public GroupSubscription(Map<String, Subscription> subscriptions);

        public Map<String, Subscription> groupSubscription();
    }

    final class GroupAssignment {
        public GroupAssignment(Map<String, Assignment> assignments);

        public Map<String, Assignment> groupAssignment();
    }

    /**
     * The rebalance protocol defines partition assignment and revocation semantics. The purpose is to establish a
     * consistent set of rules that all consumers in a group follow in order to transfer ownership of a partition.
     * {@link ConsumerPartitionAssignor} implementors can claim supporting one or more rebalance protocols via the
     * {@link ConsumerPartitionAssignor#supportedProtocols()}, and it is their responsibility to respect the rules
     * of those protocols in their {@link ConsumerPartitionAssignor#assign(Cluster, GroupSubscription)} implementations.
     * Failures to follow the rules of the supported protocols would lead to runtime error or undefined behavior.
     *
     * The {@link RebalanceProtocol#EAGER} rebalance protocol requires a consumer to always revoke all its owned
     * partitions before participating in a rebalance event. It therefore allows a complete reshuffling of the assignment.
     *
     * {@link RebalanceProtocol#COOPERATIVE} rebalance protocol allows a consumer to retain its currently owned
     * partitions before participating in a rebalance event. The assignor should not reassign any owned partitions
     * immediately, but instead may indicate to consumers the need for partition revocation so that the revoked
     * partitions can be reassigned to other consumers in the next rebalance event. This is designed for sticky assignment
     * logic which attempts to minimize partition reassignment with cooperative adjustments.
     */
    enum RebalanceProtocol {
        EAGER((byte) 0), COOPERATIVE((byte) 1);

        private final byte id;

        RebalanceProtocol(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static RebalanceProtocol forId(byte id) {
            switch (id) {
                case 0:
                    return EAGER;
                case 1:
                    return COOPERATIVE;
                default:
                    throw new IllegalArgumentException("Unknown rebalance protocol id: " + id);
            }
        }
    }
}


Note the semantic differences between the above added fields:

  1. The assignor version indicates, for the same assignor series, when its encoded metadata and algorithm have changed. It is assumed that a newer-versioned assignor is compatible with older versions, i.e. it is able to deserialize the older metadata and adjust its assignment logic to cope with other older-versioned members. It will be used in the JoinGroup request so that the broker-side coordinator can select the member with the highest version to be the leader (details below). For the upcoming release it is not strictly necessary, but in the future it can be useful if brokers have also been upgraded to support the augmented JoinGroup request.
  2. The assignor's preferred protocol indicates the rebalance protocol it works with. Note that the same assignor cannot change this preferred protocol value across higher versions. The ConsumerCoordinator gets this information and uses it to decide which rebalance logic (e.g. the old one, or the newly proposed process in this KIP) to use.
  3. The subscription / assignment version will be aligned with the assignor version, and will not be exposed via public APIs since it is only used internally by the ConsumerCoordinator. Upon serialization / deserialization, the version of the subscription / assignment is encoded / decoded first, and the follow-up serde logic is then selected correspondingly.


The existing built-in assignor implementations will be updated accordingly:


Assignor                    Highest Version    Supported Strategy     Notes
RangeAssignor               0                  Eager                  Current default value.
RoundRobinAssignor          0                  Eager
StickyAssignor              0                  Eager
CooperativeStickyAssignor   0                  Eager, Cooperative     To be default value in 3.0
StreamsAssignor             4                  Eager, Cooperative

The reason we make "range" and "round-robin" not support cooperative rebalancing is that this protocol implicitly relies on the assignor being somewhat sticky in order to gain a benefit from trading an extra rebalance. These two assignors are not sticky (although range may sometimes luckily reassign partitions back to their old owners, it is not best-effort), and hence we decided not to allow them to be selected for the cooperative protocol. The existing StickyAssignor was not made to support cooperative in order to ensure users follow the smooth upgrade path outlined below, and to avoid running into trouble if they already use the StickyAssignor and blindly upgrade.


The ConsumerCoordinator layer, on the other hand, will select which protocol to use based on the assignors specified in its configs, as follows:

  • Only consider protocols that are supported by all the assignors. If there is no common protocol supported across all the assignors, throw an exception during start-up.
  • If there are multiple commonly supported protocols, select the one with the highest id (i.e. the id number indicates how advanced the protocol is, and we always want to select the most advanced one).
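A minimal sketch of this selection rule (illustrative only, not the actual ConsumerCoordinator code) using the ConsumerPartitionAssignor API introduced above:

Code Block
languagejava
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;

// Illustrative sketch: intersect every configured assignor's supported protocols and
// pick the most advanced (highest id) common one.
public class ProtocolSelectionSketch {

    static RebalanceProtocol selectProtocol(List<ConsumerPartitionAssignor> assignors) {
        Set<RebalanceProtocol> common = EnumSet.allOf(RebalanceProtocol.class);
        for (ConsumerPartitionAssignor assignor : assignors) {
            common.retainAll(assignor.supportedProtocols());
        }
        if (common.isEmpty()) {
            throw new IllegalArgumentException("Configured assignors share no common rebalance protocol");
        }
        // EAGER has id 0 and COOPERATIVE has id 1, so the highest id is the most advanced protocol.
        RebalanceProtocol selected = null;
        for (RebalanceProtocol protocol : common) {
            if (selected == null || protocol.id() > selected.id()) {
                selected = protocol;
            }
        }
        return selected;
    }
}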

The specific upgrade path is described below. Note that this will be different depending on whether you have a plain consumer app or a Streams app, so make sure to follow the appropriate one.

Consumer

From the user's perspective, the upgrade path for leveraging the new protocol is similar to switching to a new assignor. For example, assume the current version of the Kafka consumer is 2.2 and the "range" assignor is specified in the config (or no assignor is configured, which is identical since RangeAssignor is the default below 3.0). The upgrade path would be:

  1. The first rolling bounce is to replace the byte code (i.e. swap the jars) and introduce the cooperative assignor: set the assignors to "cooperative-sticky, range" (or round-robin/sticky/etc. if you are using a different assignor already); see the config sketch after this list. At this stage, the new-versioned byte code sends both assignors in its join-group request, but will still choose EAGER as the protocol since it is still configured with the "range" assignor, and the selected rebalancing protocol must be supported by all assignors in the list. The "range" assignor will be selected to assign partitions while everyone is following the EAGER protocol. This rolling bounce is safe.
  2. The second rolling bounce is to remove the "range" (or round-robin/sticky/etc.) assignor, i.e. only leave the "cooperative-sticky" assignor in the config. At this stage, whoever has been bounced will choose the COOPERATIVE protocol and not revoke partitions, while those not yet bounced will still go with EAGER and revoke everything. However, the "cooperative-sticky" assignor will be chosen since at least one member that has already been bounced no longer has "range". The "cooperative-sticky" assignor works even when some members are on EAGER and some on COOPERATIVE: it is fine as long as the leader can recognize them and make assignment choices accordingly, and since the EAGER members have revoked everything they no longer have any pre-assigned partitions in their subscription information, so it is safe to move those partitions to other members immediately based on the assignor's output.
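For concreteness, here is a hedged sketch of the assignor configuration across the two bounces, assuming an app upgrading from the default RangeAssignor (adjust to whichever assignor you currently use):

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class UpgradeConfigSketch {

    // First rolling bounce: new byte code, both assignors configured; EAGER is still selected.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        return props;
    }

    // Second rolling bounce: old assignor removed; COOPERATIVE can now be selected.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}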

The key point behind these two rolling bounces is that we want to avoid the situation where the leader is on old byte code and only recognizes "eager", but due to compatibility can still deserialize the new protocol data from newer-versioned members, and hence just goes ahead and does the assignment even though the new-versioned members did not revoke their partitions before joining the group. Note the difference with KIP-415 here: since on the consumer we do not have the luxury of leveraging a list of built-in assignors (the assignor is user-customizable and hence a black box to the consumer coordinator), we need two rolling bounces instead of one to complete the upgrade, whereas Connect only needs one rolling bounce.

Streams

Streams embeds its own assignor, which determines the supported protocol. In 2.4 it enables cooperative rebalancing by default, so a specific upgrade path must be followed in order to safely upgrade to 2.4+, since assignors on earlier versions do not know how to handle a cooperative rebalance safely (or even what it is). To do so you must perform two rolling bounces as follows:

  1. The first rolling bounce is to replace the byte code (i.e. swap the jars): set the UPGRADE_FROM config to 2.3 (or whatever version you are upgrading from) and then bounce each instance to upgrade it to 2.4. The UPGRADE_FROM config will turn off cooperative rebalancing in the cluster until everyone is on the new byte code, and we can be sure that the leader will be able to safely complete a rebalance. 
  2. The second rolling bounce is to remove the UPGRADE_FROM config: simply remove this and bounce each instance for it to begin using the cooperative protocol. Note that unlike plain consumer apps, this means you will have some members on COOPERATIVE while others may still be on EAGER – as long as everyone is on version 2.4 or later, this is safe as the Streams assignor knows how to handle the assignment with either protocol in use. 

Note that as long as the UPGRADE_FROM parameter is set to 2.3 or lower, Streams will stay on EAGER. If for some reason you decide you would like to stay on eager, or return to it after switching to cooperative, you can simply set/leave the UPGRADE_FROM parameter in place. If you intend to use this config to stay on eager even after upgrading, it is recommended to set it to a version in the range 2.0 - 2.3, as setting it to earlier than 2.0 will force Streams to remain stuck on an earlier metadata version.
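A hedged sketch of the corresponding Streams configuration across the two bounces, assuming an upgrade from 2.3:

Code Block
languagejava
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsUpgradeConfigSketch {

    // First rolling bounce: new byte code, but stay on EAGER until every instance runs 2.4+.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
        return props;
    }

    // Second rolling bounce: remove upgrade.from so the group switches to COOPERATIVE.
    static Properties secondBounce() {
        return new Properties(); // no UPGRADE_FROM set
    }
}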

These changes should all be fairly transparent to Streams apps, as there are no semantic changes, only improved rebalancing performance. However, users using Interactive Queries (IQ) or implementing a StateListener will notice that Streams spends less time in the REBALANCING state, as we will not transition to that state until the end of the rebalance. This means all owned stores will remain open for IQ while the rebalance is in progress, and Streams will continue to restore active tasks if there are any that are not yet running, and will process standbys otherwise.

Allow Consumer to Return Records in Rebalance

As summarized in KAFKA-8421, a further optimization would be to allow consumers to still return messages belonging to their owned partitions even while a rebalance is in progress.

In order to do this, we need to allow the consumer#commit API to throw RebalanceInProgressException if offsets are being committed while a rebalance is in progress.

Code Block
languagejava
    /**
     * ...
     *
     * @throws org.apache.kafka.common.errors.RebalanceInProgressException if the consumer instance is in the middle of a rebalance
     *            so it is not yet determined which partitions would be assigned to the consumer. In such cases you can first
     *            complete the rebalance by calling {@link #poll(Duration)} and commit can be reconsidered afterwards.
     *            NOTE when you reconsider committing after the rebalance, the assigned partitions may have changed,
     *            and also for those partitions that are still assigned their fetch positions may have changed too
     *            if more records are returned from the {@link #poll(Duration)} call.
     * ...
     */
    @Override
    public void commitSync() {
        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
    }

With this optimization (implemented in 2.5.0) consumer groups can continue to process some records even while a rebalance is in progress. This means that in addition to processing standby and restoring tasks during a rebalance, Streams apps will be able to make progress on running active tasks.
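A hedged sketch of how an application might react to the new exception; the process() step is a hypothetical placeholder for application logic:

Code Block
languagejava
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RebalanceInProgressException;

public class CommitDuringRebalanceSketch {

    static void pollAndCommit(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        process(records);
        try {
            consumer.commitSync();
        } catch (RebalanceInProgressException e) {
            // A rebalance is in flight: complete it with another poll() and reconsider the
            // commit afterwards, keeping in mind the assignment may have changed by then.
            consumer.poll(Duration.ZERO);
        }
    }

    static void process(ConsumerRecords<String, String> records) { /* application logic */ }
}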

Looking into the Future: Heartbeat Communicated Protocol

Note that the above upgrade path relies on the fact that COOPERATIVE and EAGER members can work together within the same generation as long as the leader recognizes both; this however may not hold moving forward if we add a third rebalance protocol. One idea to resolve this in the future is that, instead of letting the members decide which protocol to use "locally" before sending the join-group request, we use the Heartbeat request / response to piggy-back the communication of the group's supported protocols and let members rely on that "global" information to make decisions. More specifically:

  • On the Heartbeat request, we will add an additional field listing the protocols this member supports.
  • On the Heartbeat response, we will add an additional field carrying a single protocol indicating which one to use if the error code suggests re-joining the group.

Upon receiving the heartbeat request, if the indicated supported protocols do not contain the one the broker has decided to use for the upcoming rebalance, the broker replies with a fatal error.


Looking into the Future: Assignor Version

One semi-orthogonal issue with the current assignor implementation is that the same assignor may evolve its encoded user metadata byte format; when that happens, if the leader does not recognize the new format it will crash. For example, today the StreamsAssignor has evolved to version 4 of its user metadata bytes, and to cope with an old-versioned leader we have introduced the "version probing" feature, which requires additional rebalances.

To resolve this issue, we propose to also modify the JoinGroup protocol in this KIP to read the `protocol version` from the PartitionAssignor.

Code Block
JoinGroupRequest (v5) => GroupId SessionTimeout RebalanceTimeout MemberId ProtocolType Protocols
   GroupId                 => String
   SessionTimeout          => Int32
   RebalanceTimeout        => Int32
   MemberId                => String
   ProtocolType            => String
   Protocols               => List<Protocol>

Protocol (v0) => ProtocolName ProtocolMetadata ProtocolVersion
   ProtocolName            => String
   ProtocolMetadata        => Bytes
   ProtocolVersion         => Int32                   // new field


Then, on the broker side, when choosing the leader it will pick the member with the highest protocol version instead of picking on a "first come, first served" basis.

Although this change will not benefit the upgrade path at this time, in the future if we need to upgrade the assignor again, as long as the rebalance semantics do not change (unlike in this KIP, where we go from "eager" to "cooperative") we can use a single rolling bounce, since as long as there is one member on the newer version, that member will be picked as leader.

For example, this can also help save the "version probing" cost in Streams: suppose we augment the join-group schema with `protocol version` in Kafka version 2.3; then with both brokers and clients on version 2.3+, on the first rolling bounce where the subscription and assignment schema and / or user metadata have changed, this protocol version will be bumped. On the broker side, when receiving all members' join-group requests, it will choose the one with the highest protocol version (it is assumed that a higher-versioned protocol is always backward compatible, i.e. the coordinator can recognize lower-versioned protocols as well) and select it as the leader. The leader can then decide, based on its received and deserialized subscription information, how to assign partitions and how to encode the assignment so that everyone can understand it. With this, in Streams for example, no version probing would be needed since we are guaranteed the leader knows everyone's version (again assuming that the higher-versioned protocol is always backward compatible) and hence can successfully do the assignment in that round.


Edge Cases Discussion

This proposal depends on users behaving correctly: when upgrading to the new version, everyone should still be using the "eager" protocol. But if users make mistakes, it still does not fall into undefined behavior, as the assignor mechanism guarantees that we must form a consensus on the protocol names, and whoever does not support the chosen one will be kicked out of the group; hence users will be notified about the mis-configuration.

There's a few edge cases to illustrate this:

Non-active partition assignor

In some assignors like the StreamsPartitionAssignor, secondary types of resources (e.g. standby tasks) are assigned in addition to the active partition assignment. These may not obey the cooperative manner, since the assignor may not know the "currently owned" list of such resources from the subscription, and therefore it could immediately reassign such resources from one owner to another even though they have not been revoked from the current owner yet.

In this case, as long as the secondary resources do not require any synchronization between their owners, unlike the active partitions (e.g. reading previously committed offsets), it should be fine. Take the StreamsPartitionAssignor for example: its standby tasks are owned separately and independently between instances and depend only on local state directories (including the data as well as the checkpoint file corresponding to the offset). When a standby task is assigned to a host:

  • If the host does not contain any state directory for this task, it will bootstrap the standby task from scratch; this is okay.
  • If the host does contain a state directory for this task, AND it also had this standby task in the previous generation (and it was not revoked at all), we should just proceed normally; this is okay.
  • If the host does contain a state directory for this task, AND it did not have this standby task in the previous generation, then we should find the checkpointed offset that was committed gracefully when the task migrated out of this instance more than one generation ago. It will resume from that checkpointed offset; this is okay.

More generally, an assignor that supports the COOPERATIVE protocol and also assigns other types of resources than partitions needs to make sure that either those resource assignments also respect the COOPERATIVE rules, or their assignment does not require any synchronization between instances.
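
To make the three standby cases above easier to follow, here is a minimal decision sketch; the booleans are assumptions about what the instance can observe locally, not actual Streams APIs:

Code Block
languagejava
titleStandby startup decision (sketch)
// Mirrors the three standby-task cases above; illustrative only.
enum StandbyStartMode {
    BOOTSTRAP_FROM_SCRATCH,     // case 1: no local state directory at all
    CONTINUE_AS_BEFORE,         // case 2: owned in the previous generation and never revoked
    RESUME_FROM_CHECKPOINT      // case 3: stale state directory from an earlier generation
}

class StandbyStartupDecision {
    static StandbyStartMode decide(boolean stateDirExists, boolean ownedInPreviousGeneration) {
        if (!stateDirExists) {
            return StandbyStartMode.BOOTSTRAP_FROM_SCRATCH;
        }
        if (ownedInPreviousGeneration) {
            return StandbyStartMode.CONTINUE_AS_BEFORE;
        }
        // Resume from the offset that was gracefully checkpointed when the task last migrated away.
        return StandbyStartMode.RESUME_FROM_CHECKPOINT;
    }
}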

Downgrading and Old-Versioned New Member

If a consumer is downgraded incorrectly after the above upgrade path is complete, i.e. it is simply replaced with the old jar without changing any configs, it is treated as first leaving the group and then rejoining the group as a new member with "eager". The same situation arises when a new member with "eager" joins a group (probably mistakenly) in which everyone else has already been switched to "cooperative".

At that point, no protocol consensus can be reached with this member joining, and hence either this member or everyone else will be kicked out of the group with a fatal error.

The right way to downgrade is to perform a rolling bounce to first add back the RangeAssignor (or whichever assignor you wish to use), and then perform a second rolling bounce in which you remove the CooperativeStickyAssignor and also downgrade the consumers to the old byte code. It's essentially the same as the upgrade path, but in reverse.
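
As a rough illustration of the two bounces for a plain consumer application (using the assignor classes shipped with the Java client; the exact ordering of the listed assignors is a choice, not a requirement):

Code Block
languagejava
titleDowngrade rolling bounces (sketch)
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

class DowngradeConfigs {
    // First rolling bounce: add the eager assignor back while keeping the cooperative one,
    // so that the group can still reach consensus on a common protocol during the bounce.
    static Properties firstBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName() + "," + RangeAssignor.class.getName());
        return props;
    }

    // Second rolling bounce: drop the cooperative assignor and roll back to the old byte code.
    static Properties secondBounce() {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
        return props;
    }
}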


Public Interface

This section quickly summarizes the changes to the public interfaces exposed to the user.

  1. Introducing the new ConsumerPartitionAssignor interface and deprecating the old PartitionAssignor interface (for details see ConsumerProtocol).
  2. Augmenting the existing ConsumerRebalanceListener interface with the new onPartitionsLost callback (for details see ConsumerProtocol); a minimal listener sketch follows this list.
  3. Adding a list of new metrics to the consumer instance reflecting rebalance events (for details see ConsumerMetrics).
  4. Augmenting the JoinGroupRequest protocol with the protocol version (for details see Looking into the Future: Assignor Version).
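
For item 2, a minimal listener sketch is shown below; the callback bodies are placeholders, and the exact cooperative semantics are described in the ConsumerProtocol section:

Code Block
languagejava
titleonPartitionsLost listener (sketch)
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Partitions being revoked through a normal rebalance; safe to commit offsets here.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Newly assigned partitions for this member.
    }

    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        // New callback: these partitions were lost (e.g. the member fell out of the group),
        // so they may already be owned by someone else; do not commit offsets for them.
    }
}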


Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer changes its encoded metadata. Client applications need to update to the earliest version that includes the KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As we have mentioned above, a new protocol type shall be created. To ensure smooth upgrade, we need to make sure the existing job won't fail. The procedure is described above.

Rejected Alternatives

Another solution we discussed is to make each assignor support only one protocol, i.e.:

Code Block
languagejava
interface PartitionAssignor {

    // existing interfaces

    short version();             // new API: the version of the assignor, indicating user metadata / algorithmic differences.

    String name();

    PartitionAssignorProtocol supportedProtocol();    // new API: tells the ConsumerCoordinator which rebalance strategy this assignor works with.
}


The existing built-in Assignor implementations will then be updated to:


Assignor                 Highest Version    Supported Strategy    Notes
RangeAssignor            0                  Eager                 Current default.
RoundRobinAssignor       0                  Eager
StickyAssignor (old)     0                  Eager
StickyAssignor (new)     0                  Cooperative           Will be new default in 3.0
StreamsAssignor (old)    4                  Eager
StreamsAssignor (new)    4                  Cooperative


Although this makes the upgrade path simpler, since we would no longer need the "rebalance.protocol" config on the consumer and could just encode multiple assignors during the first rolling bounce of the upgrade path, it requires a duplicated assignor class (of course, the new class could just extend the old one, so not much code is actually duplicated), which is a bit cumbersome.

...

Code Block
languagetext
titleScale-down stream applications
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]
Scaling down the application, S2 will be leaving.

#First Rebalance 
Member S2 joins the group and claims that it is leaving
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [T4])

#Second Rebalance 
S3 finishes replaying first and triggers another rebalance
Members S1~S3 join with the following status: (S1 is not ready yet)
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [T5], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [T3], revoked: [T4], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])

#Third Rebalance 
S1 finishes replaying and triggers a rebalance.
Members S1~S3 join with the following status:
	S1(assigned: [T1, T2], revoked: [], learning: [T3])
	S2(assigned: [], revoked: [T3], learning: []) 
	S3(assigned: [T4, T5], revoked: [], learning: [])
S1 performs task assignments:
	S1(assigned: [T1, T2, T3], revoked: [], learning: [])
	S2(assigned: [], revoked: [T3], learning: [])
	S3(assigned: [T4, T5], revoked: [], learning: [])
S2 will shut itself down upon the new assignment since it has no assigned tasks left.

Online Host Swapping

This is a typical use case where the user wants to replace the entire application's host type. Normally an administrator would swap hosts one by one, which could cause endless KStream resource shuffling. The recommended approach under cooperative rebalancing is:

  • Increase the capacity of the current stream job to 2x by booting up instances of the new type
  • Mark the existing stream instances as leaving
  • Once the learner tasks finish on the new hosts, shut down the old ones.
Code Block
languagetext
titleOnline Swapping
Group stable state: S1[T1, T2], S2[T3, T4]
Swapping application instances, adding S3, S4 with new instance type.

#First Rebalance 
Member S3, S4 join the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])

Use scaling tool to indicate S1 & S2 are leaving 
#Second Rebalance 
Members S1, S2 initiate a rebalance to indicate their state change (leaving)
Members S1~S4 join with the following status:
	S1(assigned: [T1], revoked: [T2], learning: [])
	S2(assigned: [T3], revoked: [T4], learning: []) 
	S3(assigned: [], revoked: [], learning: [T2])
	S4(assigned: [], revoked: [], learning: [T4])
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])

#Third Rebalance 
S3 and S4 finish replaying T1 ~ T4 and trigger a rebalance.
Members S1~S4 join with the following status:
	S1(assigned: [], revoked: [T1, T2], learning: [])
	S2(assigned: [], revoked: [T3, T4], learning: [])
	S3(assigned: [], revoked: [], learning: [T1, T2])
	S4(assigned: [], revoked: [], learning: [T3, T4])
S1 performs task assignments:
	S1(assigned: [], revoked: [], learning: [])
	S2(assigned: [], revoked: [], learning: [])
	S3(assigned: [T1, T2], revoked: [], learning: [])
	S4(assigned: [T3, T4], revoked: [], learning: [])
S1~S2 will shut themselves down upon the new assignment since they have no assigned tasks left.

Edge Scenarios

Leader Transfer During Scaling 

A leader crash could cause the loss of historical assignment information. For the learners already assigned, however, each worker maintains its own assignment status, so when a learner task's id has no corresponding active task running, the transfer can happen immediately. A leader switch in this case is therefore not a big concern.

Code Block
languagetext
titleLeader crash during scaling
Cluster has 3 stream workers S1(leader), S2, S3, and they own tasks T1 ~ T5
Group stable state: S1[T1, T2], S2[T3, T4], S3[T5]

#First Rebalance 
New member S4 joins the group
S1 performs task assignments:
	S1(assigned: [T1, T2], revoked: [], learning: [])
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [T1])

#Second Rebalance
S1 crashes/gets killed before S4 is ready, S2 takes over the leader.
Members S2~S4 join with the following status:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [T1])
Note that T2 is unassigned, and S4 is learning T1 which has no corresponding active task. We
could assign T1 and T2 immediately.
S2 performs task assignments:
	S2(assigned: [T3, T4], revoked: [], learning: [])
	S3(assigned: [T5, T2], revoked: [], learning: [])
	S4(assigned: [T1], revoked: [], learning: [])
Now the group reaches balance.
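
A minimal sketch of that promotion check a newly elected leader could run, with tasks and members simplified to plain strings (not the actual assignor data structures):

Code Block
languagejava
titleOrphaned learner promotion (sketch)
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class LearnerPromotion {
    // Promote any learner whose task has no active owner reported anywhere in the group.
    static void promoteOrphanedLearners(Map<String, Set<String>> activeTasksByMember,
                                        Map<String, Set<String>> learnerTasksByMember,
                                        Map<String, Set<String>> newActiveAssignment) {
        Set<String> allActive = new HashSet<>();
        activeTasksByMember.values().forEach(allActive::addAll);

        learnerTasksByMember.forEach((member, learners) -> {
            for (String task : learners) {
                if (!allActive.contains(task)) {
                    // No running active task to wait for: hand the task over immediately.
                    newActiveAssignment.computeIfAbsent(member, m -> new HashSet<>()).add(task);
                }
            }
        });
    }
}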

Leader Transfer Before Scaling 

However, if the leader dies before the new instances join, the potential risk is that the leader cannot tell which stream instances are "new", because that relies on historical information. For version 1.0 the final assignment is probably not ideal in this case if we only attempt to assign learner tasks to newcomers. This also motivates us to figure out a better task coordination strategy for load balance in the long term.

Code Block
languagetext
titleLeader crash before
Cluster has 3 stream workers S1(leader), S2, S3 and they own tasks T1 ~ T5
Group stable state: S1[T1], S2[T2, T3, T4, T5]

#First Rebalance 
New member S4 joins the group, and at the same time S1 crashes.
S2 takes over as leader.
S2 ~ S4 join with the following status:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [], revoked: [], learning: []) 
	S4(assigned: [], revoked: [], learning: [])
S2 performs task assignments:
	S2(assigned: [T2, T3, T4, T5], revoked: [], learning: [])
	S3(assigned: [T1], revoked: [], learning: [])
	S4(assigned: [], revoked: [], learning: [])

Now the group reaches a stable state, although the eventual load is skewed.

Optimizations

Stateful vs Stateless Tasks

For stateless tasks the ownership transfer should happen immediately, without a learning stage, because there is nothing to restore. We should fall back to the KIP-415 behavior, where stateless tasks are only revoked during the second rebalance. This feature requires adding a new tag to each stream task, so that when we eventually consider load balance of stream applications we can separate tasks into two buckets and rebalance them independently.
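
A hypothetical shape of that tag, purely for illustration (the name and its placement in the assignment metadata are not decided by this KIP):

Code Block
languagejava
titleTask statefulness tag (sketch)
// Illustrative only: a statefulness tag carried with each stream task so that stateless
// tasks can skip the learner stage and be moved in a KIP-415 style two-step rebalance.
enum TaskStatefulness {
    STATEFUL,   // requires a learner / restore phase before ownership transfer
    STATELESS   // can be revoked in one rebalance and reassigned in the next
}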

Eager Rebalance 

The restoration times of learner tasks are not always equal. When a stream worker is assigned more than one task to replay, it could request an immediate rebalance as soon as a subset of its learner tasks finishes, in order to speed up load balancing and reduce the resource waste of double task processing, at the cost of global efficiency from introducing more rebalances. We could give the user a config to decide whether to take the eager approach or the conservative approach, together with follow-up benchmark tooling for rebalance efficiency. Example:

A stream worker S1 takes two learner tasks T1 and T2, whose restoration times satisfy time(T1) < time(T2). Under the eager approach, the worker calls for a rebalance immediately when T1 finishes replaying; under the conservative approach, the worker does not rejoin the group until it has finished replaying both T1 and T2.
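
In terms of the `learner.partial.rebalance` config proposed later in this document, the two behaviors would roughly look as follows (the config does not exist today; this is only a sketch of the intended usage):

Code Block
languagejava
titleEager vs conservative rebalance (sketch)
import java.util.Properties;

class LearnerRebalanceModes {
    // Eager: rejoin the group as soon as any single learner task (e.g. T1) finishes restoring.
    static Properties eager() {
        Properties props = new Properties();
        props.put("learner.partial.rebalance", "true");
        return props;
    }

    // Conservative: batch the "ready" signal and rejoin only after all learner tasks are restored.
    static Properties conservative() {
        Properties props = new Properties();
        props.put("learner.partial.rebalance", "false");
        return props;
    }
}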

Standby Task Utilization

Recall that the original purpose of standby tasks is to mitigate the cost of scaling down. When performing learner assignment, we shall prioritize workers that already have standby tasks matching the learner assignment. The group should then rebalance fairly soon and let the leaving members shut themselves down quickly.

Scale Down Timeout

Sometimes the end user wants a sweet spot between completing ongoing task transfers and freeing up streaming resources. We therefore take a similar approach to KIP-415 and introduce a client config to make sure the scale-down is time-bounded. If migrating the tasks takes longer than this config allows, the leaving member shuts itself down immediately instead of waiting for the final confirmation, and we simply promote the learner tasks to active because they are now the best candidates to own those tasks.
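
Expressed with the `scale.down.timeout.ms` config proposed under Public Interfaces (again a sketch of intended usage, not an existing config):

Code Block
languagejava
titleBounded scale-down (sketch)
import java.util.Properties;

class ScaleDownConfig {
    static Properties boundedScaleDown() {
        Properties props = new Properties();
        // Leaving members shut themselves down after 10 minutes even if the task migration has
        // not finished; the remaining learner tasks are then promoted to active.
        props.put("scale.down.timeout.ms", String.valueOf(10 * 60 * 1000L));
        return props;
    }
}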

Task Tagging

Note that to make sure the above resource shuffling happens as expected, we need the following task status indicators to be provided:

...

Assignment Algorithm

The above examples focus on demonstrating the expected behaviors, i.e. the "end picture" of KStream incremental rebalancing. However, we also want a holistic view of the new learner assignment algorithm during each actual rebalance.

We shall assign tasks in the order of: active, learner and standby. The assignment is broken down into the following steps:

Code Block
languagesql
Algorithm incremental-rebalancing

Input Set of Tasks,
	  Set of Instances,
      Set of Workers,

      Where each worker contains:
		Set of active Tasks,
		Set of standby Tasks,
		owned by which instance

Main Function
	
	Assign active tasks: (if any)
		To instances with learner tasks that indicate "ready"
		To previous owners
		To unready learner tasks owners
  	 	To instances with standby tasks
		To resource available instances

	Keep existing learner tasks' assignment unchanged

 	Pick new learner tasks out of heaviest loaded instances
 
	Assign learner tasks: (if any)
		To new-coming instances with abundant resource
		To instances with corresponding standby tasks

	Assign standby tasks: (if any)
		To instances without matching active tasks
		To previous active task owners after learner transfer in this round
		To resource available instances
		Based on num.standby.task config, this could take multiple rounds

Output Finalized Task Assignment
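
The same priority order for active tasks, expressed as a rough Java sketch; the types are toy structures, not the actual StreamsPartitionAssignor internals:

Code Block
languagejava
titleActive task assignment order (sketch)
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

class IncrementalAssignmentSketch {

    static class Instance {
        final String id;
        final Set<String> readyLearners = new HashSet<>();     // learner tasks that finished restoring
        final Set<String> unreadyLearners = new HashSet<>();   // learner tasks still restoring
        final Set<String> prevActive = new HashSet<>();        // active tasks owned in the last generation
        final Set<String> standby = new HashSet<>();           // standby tasks owned in the last generation
        final Set<String> assignedActive = new HashSet<>();    // output of this round

        Instance(String id) { this.id = id; }
    }

    // Assign each active task following the preference order from the pseudo-code above.
    static void assignActive(List<String> tasks, List<Instance> instances) {
        for (String task : tasks) {
            Instance target = firstMatch(instances, i -> i.readyLearners.contains(task));
            if (target == null) target = firstMatch(instances, i -> i.prevActive.contains(task));
            if (target == null) target = firstMatch(instances, i -> i.unreadyLearners.contains(task));
            if (target == null) target = firstMatch(instances, i -> i.standby.contains(task));
            if (target == null) target = leastLoaded(instances);   // fall back on available capacity
            target.assignedActive.add(task);
        }
    }

    private static Instance firstMatch(List<Instance> instances, Predicate<Instance> predicate) {
        for (Instance instance : instances) {
            if (predicate.test(instance)) return instance;
        }
        return null;
    }

    private static Instance leastLoaded(List<Instance> instances) {
        Instance best = instances.get(0);
        for (Instance instance : instances) {
            if (instance.assignedActive.size() < best.assignedActive.size()) best = instance;
        }
        return best;
    }
}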

Backing up Information on Leader 

Since incremental rebalancing requires certain historical information about the last round's assignment, the leader worker needs to maintain knowledge of:

  1. Who participated in the last round of rebalance. This is required to identify newcomers.
  2. Who will be leaving the consumer group. This is for scale-down support, as the replay could take longer than the scale-down timeout. Under static membership, since we don't send leave-group information, we can let the leader explicitly trigger a rebalance when the scale-down timeout is reached. Maintaining the set of leaving members is critical for making the right task shuffle decisions.

This is the essential group state the leader wants to memorize. To limit the impact of a leader crash during scaling, we avoid backing up more information than this on the leader for now.
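
The in-memory bookkeeping on the leader can therefore stay as small as two sets (field names are illustrative):

Code Block
languagejava
titleLeader bookkeeping (sketch)
import java.util.HashSet;
import java.util.Set;

// Minimal state the leader keeps between rebalances; kept intentionally small so that
// losing the leader only costs this bookkeeping, not correctness.
class LeaderAssignmentState {
    // Members that participated in the previous rebalance round, used to detect newcomers.
    final Set<String> previousRoundMembers = new HashSet<>();

    // Members that have been marked as leaving, used to drive scale-down and its timeout.
    final Set<String> leavingMembers = new HashSet<>();
}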

...

Delivery goal: Scale up support, conservative rebalance

The goal of the first version is to build the foundation of the learner algorithm for the scale-up scenario. The leader worker uses the previous round's assignment to figure out which instances are new, and learner tasks shall only be assigned to those. The reason we only target new instances is a potential edge case that could break a naive learner assignment: when the number of tasks is much smaller than the total cluster capacity, we could fall into endless resource shuffling. We also care more about smooth transitions than resource balance for stage one. There has been some historical discussion on assigning weights to different types of tasks, but aiming for task balance too early risks over-optimization. In conclusion, we want to delay the finalized design for eventual balance until the last version.

We also don't want to take the eager rebalance optimization in version 1.0, due to the concerns explained above.

Version 2.0

Delivery goal: Scale down support

We will focus on delivering scale-down support once version 1.0 succeeds. We need to extend the v1 protocol so that existing instances can take the extra learning load, breaking the v1 assumption that "only new instances can take learner tasks". To make this happen, we need to deliver the following steps:

  1. Create new tooling for marking instances as ready to scale down
  2. Tag the leaving information for targeted members
  3. Scale down timeout implementation

Version 3.0

Delivery goal: Eager rebalance analysis

A detailed analysis and benchmark support need to be built before fully devoting effort to this feature. Intuitively, most applications should be able to tolerate minor discrepancies in task replay time, while the cost of extra rebalances and increased debugging complexity is definitely something we are not in favor of.

Version 3.0 builds upon the success of version 1.0, and could be done concurrently with version 2.0. We may choose to adopt or discard this change, depending on the benchmark result.

Version 4.0 (Stretch)

Delivery goal: Task labeling, eventual workload balance

Version 4.0, the final version, will take the application's eventual load balance into consideration. If we define a balancing factor x, the total number of tasks each instance owns should be within +-x% of the expected number of tasks, which buffers some capacity in order to avoid imbalance. A stream.imbalance.percentage config will be provided for the user. The smaller this number is set to, the more strict the assignment protocol behaves.
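
For example, with 20 tasks spread over 4 instances the expected load is 5 tasks per instance; a balancing factor of 20% then allows each instance to own between 4 and 6 tasks before the group is considered imbalanced. A small sketch of that bound check, where the percentage corresponds to the proposed stream.imbalance.percentage config:

Code Block
languagejava
titleImbalance check (sketch)
class ImbalanceCheck {
    // Returns true if every instance's task count stays within +-x% of the expected load.
    static boolean isBalanced(int[] tasksPerInstance, int totalTasks, double imbalancePercentage) {
        double expected = (double) totalTasks / tasksPerInstance.length;
        double slack = expected * imbalancePercentage;
        for (int owned : tasksPerInstance) {
            if (owned < expected - slack || owned > expected + slack) {
                return false;
            }
        }
        return true;
    }
}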

We also want to balance the load separately for stateful tasks and stateless tasks as discussed above. So far version 4.0 still has many unknowns and is slightly beyond the incremental rebalancing scope. A separate KIP could be initiated.

Trade-offs

More Rebalances

As one might expect, the new algorithm will invoke many more rebalances than the current protocol. As discussed in the overall incremental rebalancing design, multiple rebalances are not always bad when done wisely, and after KIP-345 we have a follow-up proposal to avoid scale-up rebalances for static members. The goal is to pre-register the members that are planned to be added; the broker coordinator will augment the member list and wait for all the new members to join the group before rebalancing, since by default a stream application's rebalance timeout is infinity. The conclusion is that it is the server's responsibility to avoid excessive rebalances, and the client's responsibility to make each rebalance more efficient.

Metadata Space vs Rebalance Efficiency

Since we are carrying more information during rebalance, we should be alert to the metadata size increase. So far the hard limit is 1MB per metadata response, which means that if we add too much information the new protocol could hit a hard failure. Finding a better encoding scheme for metadata is a common pain point when promoting incremental rebalancing KIPs like 415 and 429. Some thoughts from Guozhang have started in this JIRA, and we plan to have a separate KIP discussing different encoding technologies to see which one could work.

Public Interfaces

We are going to add a new protocol type called "stream".

Code Block
languagejava
titleProtocol Type
ProtocolTypes : {"consumer", "connect", "stream"}

We also add a number of new configs for the user to better apply this change.

...

Default: incremental

Version 1.0

...

The setting that helps ensure a no-downtime upgrade of an online application.

Options: upgrading, incremental

...

scale.down.timeout.ms

Default: infinity

Version 2.0

...

Time in milliseconds after which a stream worker that has been informed it is being scaled down will be force-terminated.

...

learner.partial.rebalance

Default: true

Version 3.0

...

If this config is set to true, a new member will proactively trigger a rebalance each time it finishes restoring the state of one learner task, until it eventually finishes all the replaying. Otherwise, the new worker will batch the ready calls and ask for a single round of rebalance.

...

Default: 0.2

Version 4.0

...

The tolerated task imbalance factor between hosts before triggering a rebalance.

Implementation Plan

We call out this portion because the algorithm we are going to design is fairly complicated. To make sure the delivery is smooth given the fundamental changes to KStream internals, a separate shareable Google Doc here outlines the steps of the changes. Feel free to give your feedback on this plan while reviewing the algorithm, because some of the changes are highly coupled with internal changes; without these details, the algorithm does not make sense.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer changes its encoded metadata. Client applications need to update to the earliest version that includes the KIP-429 version 1.0 change.

Switching Protocol Type

As mentioned above, a new protocol type shall be created. To ensure a smooth transition, we need to make sure the existing job does not fail. The workflow for upgrading is as follows (a config sketch is shown after the steps):

  • Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
  • Perform a rolling restart of the stream application; the change is automatically applied. This is safe because we are not changing the protocol type.
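
A sketch of the first step as a Streams config; `stream.rebalancing.mode` is the config name proposed in this KIP and does not exist today:

Code Block
languagejava
titleUpgrade mode config (sketch)
import java.util.Properties;

class UpgradeModeConfig {
    static Properties upgradingMode() {
        Properties props = new Properties();
        // Stay on protocol type "consumer" during the rolling restart; switch back to the default
        // ("incremental") once every instance runs the new byte code.
        props.put("stream.rebalancing.mode", "upgrading");
        return props;
    }
}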

In the long term we are proposing a smoother and more elegant upgrade approach than the current one; however, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, this workaround keeps the effort smaller.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-offs, please review the Google Doc.