Status
Current state: [Under Discussion]
...
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Recently the Kafka community has been promoting cooperative rebalancing to mitigate the pain points of the stop-the-world rebalancing protocol, and an initiative for Kafka Connect has already started as KIP-415. There are already exciting discussions around it, but for Kafka Streams the delayed rebalance alone is not a complete solution.
This KIP tries to customize the cooperative incremental rebalancing approach specifically for the KStream application context, based on the design for KConnect.
Currently Kafka Streams uses the consumer membership protocol to coordinate stream task assignment. When we scale up a streams application, the KStream group will attempt to revoke active tasks and let the newly spun-up hosts take them over. New hosts need to restore the assigned tasks' state before transitioning to "running". For state-heavy applications it is not ideal to give up tasks immediately once a new player joins the party; instead we should buffer some time to let the new player accept a fair amount of restoring tasks and finish state reconstruction first, before it officially takes over the active tasks. Ideally, we could achieve a no-downtime transition during cluster scaling.
In short, the goals of this KIP are:
- Reduce unnecessary downtime due to unnecessary partition migration (i.e. partitions being revoked and re-assigned), task restoration, and global application revocation.
- Better rebalance behavior for members falling out of the group.
- Better auto-scaling experience for KStream applications.
- Stretch goal: better workload balance across KStream instances.
These improvements are proposed at the level of the Kafka consumer client, which will be beneficial for heavy-stateful consumers such as Kafka Streams applications.
Background
Consumer Rebalance Protocol: Stop-The-World Effect
As mentioned in the motivation, we also want to mitigate the stop-the-world effect of the current global rebalance protocol. A quick recap of the current rebalance semantics on KStream: when a rebalance starts, all stream threads would
- Join the group with all currently assigned tasks revoked.
- Wait until the group assignment finishes to get the assigned tasks and resume working.
- Replay the state of the assigned tasks.
- Once all replay jobs finish, transition the stream thread to running mode.
The reason for revoking all ongoing tasks is that we need to guarantee each topic partition is assigned to exactly one consumer at any time. In this way, a topic partition can never be re-assigned before it is revoked.
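The eager semantics recapped above can be sketched as a toy simulation (plain Python with illustrative names, not the actual client code):

```python
# Toy simulation of the eager rebalance protocol: every member revokes
# everything before the new assignment is computed (names are illustrative).

def eager_rebalance(members, partitions):
    """Each member gives up all partitions, then receives a fresh assignment."""
    revoked = {m: sorted(owned) for m, owned in members.items()}  # step 1: revoke all
    for m in members:
        members[m] = set()                                        # nothing is owned during the barrier
    # step 2: leader computes a fresh (here: round-robin) assignment
    for i, p in enumerate(sorted(partitions)):
        target = sorted(revoked)[i % len(revoked)]
        members[target].add(p)
    return revoked, members

members = {"consumer-1": {"t-0", "t-1"}, "consumer-2": {"t-2"}}
revoked, assigned = eager_rebalance(members, {"t-0", "t-1", "t-2"})
# Every partition was revoked, even those that end up back on the same owner.
```

Note how every partition shows up in the revoked set, which is exactly the downtime this KIP wants to avoid.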
Example: Streams Assignor Rebalance Metadata
Today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is the StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
Code Block:
    KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor
StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically on subscription:
Code Block:
    KafkaConsumer:
    Subscription => TopicList SubscriptionInfo
       TopicList               => List<String>
       SubscriptionInfo        => Bytes
    ------------------
    StreamsPartitionAssignor:
    SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint
       VersionId               => Int32
       LatestSupportVersionId  => Int32
       ClientUUID              => 128bit
       PrevTasks               => Set<TaskId>
       StandbyTasks            => Set<TaskId>
       EndPoint                => HostInfo
And on assignment:
Code Block:
    KafkaConsumer:
    Assignment = AssignedPartitions AssignmentInfo
       AssignedPartitions      => List<String, List<Int32>>
       AssignmentInfo          => Bytes
    ------------------
    StreamsPartitionAssignor:
    AssignmentInfo (encoded in version 4) => VersionId LatestSupportVersionId ActiveTasks StandbyTasks PartitionsByHost ErrorCode
       VersionId               => Int32
       LatestSupportVersionId  => Int32
       ActiveTasks             => List<TaskId>
       StandbyTasks            => Map<TaskId, Set<TopicPartition>>
       PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
       ErrorCode               => Int32
Streams Sticky TaskAssignor: Stickiness over Balance
Streams' StickyTaskAssignor honors stickiness over workload balance. More specifically:
1) First we calculate the average number of tasks each host should get as its "capacity", by dividing the total number of tasks by the total number of consumers (i.e. num.threads) and then multiplying by the number of consumers that host has.
2) Then for each task:
- If there is a client that previously owned it as a PrevTask and that client still has capacity, assign the task to it;
- Otherwise, if there is a client that previously owned it as a StandbyTask and that client still has capacity, assign the task to it.
3) If there are still unassigned tasks after step 2), we loop over them at the per-sub-topology granularity (for workload balance), and again for each task:
- Find the client with the least load; if there are multiple, prefer the one that previously owned the task as an active task, over the one that previously owned it as a standby task, over the one that did not own it at all.
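A condensed sketch of this capacity-plus-stickiness rule (plain Python with illustrative data shapes; the real implementation lives in StickyTaskAssignor):

```python
# Illustrative sketch of sticky task assignment: honor previous (active, then
# standby) ownership while each client still has capacity, then fill by load.

def assign_tasks(tasks, clients, prev_active, prev_standby):
    """clients: {client: capacity}; prev_active/prev_standby: {client: set(tasks)}."""
    assignment = {c: [] for c in clients}

    def has_capacity(c):
        return len(assignment[c]) < clients[c]

    unassigned = []
    for t in tasks:  # stickiness pass: prefer the previous active, then standby owner
        owner = next((c for c in clients if t in prev_active.get(c, set()) and has_capacity(c)), None)
        if owner is None:
            owner = next((c for c in clients if t in prev_standby.get(c, set()) and has_capacity(c)), None)
        if owner is None:
            unassigned.append(t)
        else:
            assignment[owner].append(t)

    for t in unassigned:  # balance pass: least-loaded client, ties broken by prior ownership
        owner = min(clients, key=lambda c: (len(assignment[c]),
                                            t not in prev_active.get(c, set()),
                                            t not in prev_standby.get(c, set())))
        assignment[owner].append(t)
    return assignment

a = assign_tasks(["0_0", "0_1", "1_0"],
                 {"c1": 2, "c2": 1},
                 prev_active={"c1": {"0_0", "0_1"}},
                 prev_standby={"c2": {"1_0"}})
# c1 keeps its two previous active tasks; c2 picks up its former standby task.
```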
Example: Consumer Sticky Assignor
We also provide a StickyAssignor out of the box, which tries to mitigate the cost of unnecessary partition migrations. This assignor relies only on the subscription metadata and does not modify the assignment metadata, as follows:
Code Block:
    KafkaConsumer:
    Subscription => TopicList SubscriptionInfo
       TopicList               => List<String>
       UserData                => Bytes
    ------------------
    StickyAssignor:
    UserData (encoded in version 1) => AssignedPartitions
       AssignedPartitions      => List<String, List<Int32>>
As one can see, we honor stickiness (step 2) over workload balance (step 3). The goal of this incremental protocol is to fully leverage such sticky assignors, which try in a best-effort manner to reassign partitions to their previous owners, so that we revoke as few partitions as possible, since the revocation process is costly.
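The saving from a sticky assignment can be made concrete with a small count (plain Python, illustrative numbers): under the eager protocol every owned partition is revoked, while under the incremental protocol only the partitions that actually change owners are.

```python
# Sketch: with a sticky assignment, the incremental protocol only revokes the
# partitions that actually move, while the eager protocol revokes everything.

def revocations(before, after):
    eager = sum(len(ps) for ps in before.values())  # everything is given up
    incremental = sum(len(before[m] - after.get(m, set())) for m in before)
    return eager, incremental

before = {"c1": {"t-0", "t-1", "t-2"}, "c2": {"t-3"}}
after  = {"c1": {"t-0", "t-1"}, "c2": {"t-3"}, "c3": {"t-2"}}  # c3 joins, takes one partition
eager, incremental = revocations(before, after)
# eager == 4, incremental == 1: only t-2 needs to be revoked and migrated.
```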
Proposed Changes
We want to separate the protocol improvement into the consumer and streams layers, since Streams today embeds a full-fledged consumer instance that hard-codes the ConsumerCoordinator.
Part I: Incremental Consumer Rebalance Protocol
We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies, with some tweaks compared to KIP-415. The key idea is that, instead of relying on a single rebalance's synchronization barrier to rebalance the group (and hence forcing everyone to give up all assigned partitions before joining the new generation), we use consecutive rebalances, where the end of the first rebalance is actually used as the synchronization barrier.
Consumer Protocol
More specifically, we would first inject more metadata at the consumer-layer, as:
Code Block:
    KafkaConsumer:
    Subscription => TopicList AssignedPartitions SubscriptionInfo
       TopicList               => List<String>
       AssignedPartitions      => List<String, List<Int32>>   // new field
       SubscriptionInfo        => Bytes

    Assignment = AssignedPartitions RevokedPartitions AssignmentInfo ErrorCode
       AssignedPartitions      => List<String, List<Int32>>
       RevokedPartitions       => List<String, List<Int32>>   // new field
       AssignmentInfo          => Bytes
       ErrorCode               => Int16                       // new field
Note that it is compatible to inject additional fields around the assignor-specific SubscriptionInfo / AssignmentInfo bytes: on serialization we first call the assignor to encode the info bytes, and then re-allocate a larger buffer to add the consumer-specific fields, some before and some (i.e. the new fields) after the assignor-specific info bytes; deserialization works in the reverse order. So adding fields around the assignor bytes remains naturally compatible with plug-in assignors. However, there are indeed some compatibility challenges for the consumer protocol upgrade itself, which we tackle below.
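A minimal sketch of this framing scheme (plain Python with an illustrative, simplified wire layout; not the actual Kafka serialization code):

```python
# Sketch: consumer-level fields are framed around the opaque assignor bytes,
# so a plug-in assignor never needs to know about the new fields.
import struct

def serialize(version, assigned_partitions, assignor_bytes):
    """Consumer fields first, then the opaque assignor blob, length-prefixed."""
    out = struct.pack(">h", version)
    out += struct.pack(">i", len(assigned_partitions))
    for topic, partition in assigned_partitions:
        t = topic.encode()
        out += struct.pack(">h", len(t)) + t + struct.pack(">i", partition)
    out += struct.pack(">i", len(assignor_bytes)) + assignor_bytes  # opaque to the consumer layer
    return out

def deserialize(buf):
    version, = struct.unpack_from(">h", buf, 0); off = 2
    n, = struct.unpack_from(">i", buf, off); off += 4
    assigned = []
    for _ in range(n):
        tlen, = struct.unpack_from(">h", buf, off); off += 2
        topic = buf[off:off + tlen].decode(); off += tlen
        p, = struct.unpack_from(">i", buf, off); off += 4
        assigned.append((topic, p))
    blen, = struct.unpack_from(">i", buf, off); off += 4
    return version, assigned, buf[off:off + blen]

blob = b"assignor-specific-bytes"
rt = deserialize(serialize(2, [("topic-a", 0), ("topic-a", 1)], blob))
# Round-trips the consumer fields and the untouched assignor blob.
```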
Consumer Coordinator Algorithm
Rebalance behavior of the consumer (captured in the consumer coordinator layer) would be changed as follows.
- For every consumer: before sending the join-group request, change the behavior as follows based on the join-group triggering event:
- If received REBALANCE_IN_PROGRESS from the heartbeat response: do NOT revoke any partitions; instead just encode the currently assigned partitions as part of the Subscription.
- For the leader: after receiving the subscribed topics, as well as the reported assigned-partitions, do the following:
- Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
- Segment the total-partitions set inferred from the newly-assigned-partitions into two exclusive sub-sets: Intersection(total-partitions, assigned-partitions) and Minus(total-partitions, assigned-partitions).
Note that the latter is possibly non-empty, because a partition may have been revoked in a previous rebalance and hence is not in any member's assigned partitions, or it may be a newly created partition due to add-partitions. Let's call the former prev-assigned-partitions and the latter not-assigned-partitions. - For not-assigned-partitions, we can encode the owner from the newly-assigned-partitions directly, since we know no one owned them before, whether due to revocation or due to newly created partitions.
- For prev-assigned-partitions, check if the owner has changed; if yes, encode the partition to the old owner in revoked-partitions but do NOT encode it in the assigned-partitions of the new owner.
- For every consumer: after receiving the sync-group response, do the following:
- Check that the newly assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalancing, no partitions should be migrated directly before being revoked first.
- Check the error code as well; depending on it, either move forward to c) below, trigger a rebalance immediately (for incompatible members, see below), or fail immediately if it is fatal.
- Update the newly assigned-partitions, and for those newly added partitions, call the rebalance-listener; this is the same as the current logic.
- If revoked-partitions is not empty, remove those partitions by calling the rebalance-listener, and then immediately send another join-group request with the updated assigned partitions.
...
NOTE that for this new algorithm to be effective in reducing rebalance costs, it really expects the plug-in assignor to be "sticky" in some way, so that the diff between the newly-assigned-partitions and the existing assigned-partitions is small, and hence only a small subset of the total partitions needs to be revoked / migrated at each rebalance in practice; otherwise we are just paying more rebalances for little benefit. We will talk about how the sticky StreamsAssignor would be updated accordingly in Part II.
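The leader-side segmentation described earlier can be sketched as follows (plain Python; the data shapes are illustrative, not the actual coordinator code):

```python
# Sketch: the leader defers migrating a partition until its old owner has
# revoked it; only partitions that are safe to move are assigned right away.

def plan_assignment(owned, newly_assigned):
    """owned / newly_assigned: {member: set(partitions)} before and after the assignor runs."""
    old_owner = {p: m for m, ps in owned.items() for p in ps}
    to_assign = {m: set() for m in newly_assigned}
    to_revoke = {m: set() for m in owned}
    for member, partitions in newly_assigned.items():
        for p in partitions:
            prev = old_owner.get(p)
            if prev is None or prev == member:
                to_assign[member].add(p)   # unowned or staying put: safe to assign now
            else:
                to_revoke[prev].add(p)     # owner changed: revoke first, migrate in the next rebalance
    return to_assign, to_revoke

owned = {"c1": {"t-0", "t-1"}, "c2": set()}
newly = {"c1": {"t-0"}, "c2": {"t-1", "t-2"}}
assign, revoke = plan_assignment(owned, newly)
# t-2 is brand new so c2 gets it immediately; t-1 must be revoked from c1 first.
```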
Consumer StickyPartitioner
Since we've already encoded the assigned partitions
Compatibility and Upgrade Path
Since we are modifying the consumer protocol as above, we need to design the upgrade path to enable consumers upgrade to the new rebalance protocol in an online manner.
...
There are a few edge cases worth mentioning here:
Downgrading and Old-Versioned New Member
If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group and then rejoining as a new member on the old V0 protocol. The same situation arises when a new member on the old version V0 joins a group (probably mistakenly) that has been completely upgraded to V2. At this moment everyone else will still keep their existing assigned-partitions while the newcomer gets nothing. However, if another member leaves the group as well, then its partitions would not be assigned to anyone due to logic 3) above. We will rely on the above consumer-side metric so that users are notified in time.
Old-Versioned Member Become Leader
Since the group coordinator selects new leaders from the existing members, even if the current leader fails after the group has successfully upgraded, the new leader should still be V1-aware, and new members on the old V0 joining within the same generation should not be selected.
Compatibility, Deprecation, and Migration Plan
Minimum Version Requirement
This change requires Kafka broker version >= 0.9, where the broker will react with a rebalance when a normal consumer rejoins with the encoded metadata. Client applications need to update to the earliest version which includes the KIP-429 version 1.0 change.
Recommended Upgrade Procedure
As we have mentioned above, a new protocol type shall be created. To ensure a smooth upgrade, we need to make sure existing jobs won't fail. The procedure is as follows:
...
In the long term we are proposing a smoother and more elegant upgrade approach than the current one. However, it requires a broker upgrade, which may not be a trivial effort for the end user. For now, users could choose to take this much easier workaround.
Rejected Alternatives
N/A for the algorithm part. For implementation plan trade-offs, please review the doc in the implementation plan.