You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 106 Next »

Status

Current state[Under Discussion]

Discussion thread: TBD

JIRA:  

key summary type created updated due assignee reporter priority status resolution

JQL and issue key arguments for this macro require at least one Jira application link to be configured


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Recently Kafka community is promoting cooperative rebalancing to mitigate the pain points in the stop-the-world rebalancing protocol and an initiation for Kafka Connect already started as KIP-415. There are already exciting discussions around it, but for Kafka Streams, the delayed rebalance is not the complete solution. This KIP is trying to customize the cooperative rebalancing approach specifically for KStream application context,  based on the great design for KConnect.

Currently Kafka Streams uses consumer membership protocol to coordinate the stream task assignment. When we scale up the stream application, KStream group will attempt to revoke active tasks and let the newly spinned hosts take over them. New hosts need to restore assigned tasks' state before transiting to "running". For state heavy application, it is not ideal to give up the tasks immediately once the new player joins the party, instead we should buffer some time to let the new player accept a fair amount of restoring tasks, and finish state reconstruction first before officially taking over the active tasks. Ideally, we could realize no downtime transition during cluster scaling.

In short, the goals of this KIP are:

  • Reduce unnecessary downtime due to task restoration and global application revocation.
  • Better auto scaling experience for KStream applications.
  • Stretch goal: better workload balance across KStream instances.


Background

Consumer Rebalance Protocol: Stop-The-World Effect

As mentioned in motivation, we also want to mitigate the stop-the-world effect of current global rebalance protocol. A quick recap of current rebalance semantics on KStream: when rebalance starts, all stream threads would

  1. Join group with all currently assigned tasks revoked.

  2. Wait until group assignment finish to get assigned tasks and resume working.

  3. Replay the assigned tasks state.

  4. Once all replay jobs finish, stream thread transits to running mode.

The reason for revoking all ongoing tasks is because we need to guarantee each topic partition is assigned with exactly one consumer at any time. In this way, any topic partition could not be re-assigned before it is revoked.

Streams Rebalance Metadata: Remember the PrevTasks

Today Streams embed a full fledged Consumer client, which hard-code a ConsumerCoordinator inside. Streams then injects a StreamsPartitionAssignor to its plugable PartitionAssignor interface and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyPartitionAssignor. Streams partition assignor logic today sites in the latter two classes. Hence the hierarchy today is:

KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.


StreamsPartitionAssignor uses the subscription / assignment metadata byte array field to encode additional information for sticky partitions. More specifically on subscription:

KafkaConsumer:


Subscription => TopicList SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes

------------------


StreamsPartitionAssignor:

SubscriptionInfo (encoded in version 4) => VersionId LatestSupportVersionId ClientUUID PrevTasks StandbyTasks EndPoint

   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ClientUUID              => 128bit
   PrevTasks               => Set<TaskId>
   StandbyTasks            => Set<TaskId>
   EndPoint                => HostInfo


And on assignment: 

KafkaConsumer:

Assignment = AssignedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes

------------------

StreamsPartitionAssignor:

AssignmentInfo (encoded in version 4) => VersionId, LatestSupportedVersionId, ActiveTasks, StandbyTasks, PartitionsByHost, ErrorCode
   VersionId               => Int32
   LatestSupportVersionId  => Int32
   ActiveTasks             => List<TaskId>
   StandbyTasks            => Map<TaskId, Set<TopicPartition>>
   PartitionsByHost        => Map<HostInfo, Set<TopicPartition>>
   ErrorCode               => Int32


Streams Sticky TaskAssignor: Stickiness over Balance

Streams' StickyTaskAssignor will honor stickiness over workload balance. More specifically:

  1. First we calculate the average num.tasks each host should get on average as its "capacity", by dividing the total number of num.tasks to the total number of consumers (i.e. num.threads) and then multiple by the number of consumers that host has.
  2. Then for each task:
    1. If it has a client who owns it as its PrevTask, and that client still have capacity assign to it;
    2. Otherwise if it has a client who owns it as its StandbyTask, and that client still have capacity assign to it;
  3. If there are still unassigned tasks after step 2), then we loop over them at the per-sub-topology granularity (for workload balance), and again for each task:
    1. Find the client with the least load, and if there are multiple ones, prefer the one previously owns it, over the one previously owns it as standbyTask, over the one who does not own it at all.

As one can see, we honor stickiness (step 2) over workload balance (step 3).



Proposed Changes

We want to separate the protocol improvement into the consumer and streams layer, since streams today embeds a full-fledged consumer instance that hard-code ConsumerCoordinator.

Part I: Incremental Consumer Rebalance Protocol

We will augment the consumer's rebalance protocol as proposed in Incremental Cooperative Rebalancing: Support and Policies with some tweaks compared to KIP-415. The key idea is that, instead of relying on the single rebalance's synchronization barrier to rebalance the group and hence enforce everyone to give up all the assigned partitions before joining the group as the new generation, we use consecutive rebalances where the end of the first rebalance will actually be used as the synchronization barrier.

Consumer Protocol

More specifically, we would first inject more metadata at the consumer-layer, as:

KafkaConsumer:

Subscription => TopicList AssignedPartitions SubscriptionInfo
   TopicList               => List<String>
   SubscriptionInfo        => Bytes   
   AssignedPartitions      => List<TopicPartition>   // new field


Assignment = AssignedPartitions RevokedPartitions AssignmentInfo
   AssignedPartitions      => List<TopicPartition>
   AssignmentInfo          => Bytes
   RevokedPartitions       => List<TopicPartition>   // new field
   ErrorCode               => Int16                  // new field


Note that it is compatible to inject additional fields after the assignor-specific SubscriptionInfo / AssignmentInfo bytes, since on serialization we would first call assignor to encode the info bytes, and then re-allocate larger buffer to append consumer-specific bytes; with the new protocol, we just need to append some fields before, and some fields (a.k.a. those new fields) after the assignor-specific info bytes, and vice-versa on deserialization. So adding fields after the assignor-bytes is still naturally compatible with the plug-in assignor. However there are indeed some compatibility challenges for the consumer protocol upgrade itself, which we will tackle below.


Consumer Coordinator Algorithm

Rebalance behavior of the consumer (captured in the consumer coordinator layer) would be changed as follows.

  1. For every consumer: before sending the join-group request, do NOT revoke any partitions; instead just encode the current assigned partitions as part of the Subscription.
  2. For the leader: after getting the received subscription topics, as well as the assigned-partitions, do the following:
    1. Call the registered assignor of the selected protocol to generate the assignment; let's call it newly-assigned-partitions.
    2. Segment the total-partitions set of partitions inferred from the newly-assigned-partitions into two exclusive sub-sets: Intersection(total-partitions, assigned-partitions), and Minus(total-partitions, assigned-partitions).
      Note that the latter is possibly non-empty because a partition maybe revoked in previous rebalance and hence not in any assigned partitions, or it is a newly created partition due to add-partitions. Let's call the former prev-assigned-partitions and the latter not-assigned-partitions.
    3. For not-assigned-partitions, we can encode the owner from the newly-assigned-partitions directly since we know no one owns it before, either due to revocation or due to newly created partitions.
    4. For prev-assigned-partitions, check if the owner has changed, if yes, encode it to the old owner in revoked-partitions but NOT encode to the assigned-partitions to the new owner.
  3. For every consumer: after received the sync-group request, do the following:
    1. Check that the newly assigned-partitions is a superset of Minus(assigned-partitions, revoked-partitions). This is because under cooperative rebalance, no partitions should be migrated directly before being revoked first.
    2. Check the error code as well, and depending on it move forward to c) below, or trigger a rebalance immediately (for incompatible members, see below), or fail immediately if it is fatal.
    3. Update the newly assigned-partitions, and for those newly added  partitions, call the rebalance-listener — this is the same as the current logic.
    4. If revoked partitions is not empty, remove those partitions by calling the rebalance-listener. And then immediately send another join group request with the updated assigned partitions.


No changes required from the broker side, since this logic change is completely wrapped inside the consumer protocol / coordinator implementation itself, and to brokers it is just the same as previous version's rebalances.

Note that one minor difference compared with KIP-415 is that we do not introduce the scheduledDelay in the protocol, but instead the consumer will trigger rebalance immediately. This is because the consumer protocol would applies to all consumers (including streams) and hence should be kept simple, and also because KIP-345 is being developed in parallel which is aimed for tackling the scaling out / rolling bounce scenarios already.

We would omit the common scenarios description here since it is already covered in KIP-415, which is very similar to this KIP with the difference of the scheduledDelay above.


NOTE that for this new algorithm to be effective in reducing rebalance costs, it is really expecting the plug-in assignor to be "sticky" in some way, such that the diff of the newly-assigned-partitions and the existing-assigned-partitions can be small, and hence only a few subset of the total number of partitions need to be revoked / migrated at each rebalance in practice – otherwise, we are just paying more rebalance for little benefits. We will talk about how sticky StreamsAssignor would be updated accordingly in Part II.


Compatibility and Upgrade Path

Since we are modifying the consumer protocol as above, we need to design the upgrade path to enable consumers upgrade to the new rebalance protocol in an online manner.

Note that since we are injecting additional fields at the end of the consumer protocol, the new protocol would still be compatible with the old version. That means, an old-versioned consumer would still be able to deserialize a newer-versioned protocol data (as long as we only append new fields at the end, this would be the case).

However, when consumers with V1 is joining the group, there's a key behavioral difference that they would NOT revoke their partitions, and hence it is not safe to re-assign any of their partitions as we did in the current (V0) assignment logic. That means, the leader can only proceed the assignment when it knew that all the members are either on V0, or V1 versions

Another thing to keep in mind that, if the leader itself is still on older version, it would still be able to deserialize the V1 subscription protocol as V0, by ignoring the additional fields, and hence it may "think" everyone is still on V0, while some of them may actually be on the newer version.


Therefore when upgrading we need the new consumer byte-code to first still following the old versioned protocol for both metadata encoding, as well as the behavior (e.g. still revoking before send JoinGroup). And after everyone have upgraded to the new byte-code, we can allow them to start rebalancing with the new versioned protocol. Note that during the later rebalance, it is still possible that consumers will send join-group request with old version (but the key here is that they are all new-version aware), in which case consumer leader can freely adjust its logic based on the aggregated versions. More specifically, we introduce the following new config to Consumer:


Protocol Type
"rebalance.protocol":


type: Enum 
values: {eager, cooperative}
default: eager


When the config value is "eager", the consumer would still use V0 of the consumer protocol as well as the rebalance behavior; if the config value is "cooperative", the consumer will then use the new V1 protocol as well as the new algorithm. Note the difference with KIP-415 here: since on consumer we do not have the luxury to leverage on list of assignors to register multiple protocols and let leader to auto-switch to new versions, we need two rolling bounces instead of one rolling bounce to complete the upgrade, whereas Connect only need one rolling bounce (details below).


We'd update the above algorithm on leader (i.e. bullet point 3) as, such that we will first check the versions of subscription of all the members:

  1. If all members are on V1, then follow the new algorithm.
  2. If all members are on V0, then follow the old algorithm.
  3. If there's no consensus, it means we are in a second rebalance then do the following:
    1. For those members in V1, send assignment in V1 as well by just giving back their existing assigned-partitions from the subscription metadata and leaving the revoked-partitions empty.
    2. For those members in V0, which means they have revoked their partitions but we do not know what are those partitions, we can only given them V0 assignment back with an empty assigned-partitions.


The key point behind this two rolling bounce and the additional check is that, we want to avoid the situation where leader is on old byte-code and only recognize V0, but due to compatibility would still be able to deserialize V1 protocol data from newer versioned members, and hence just go ahead and do the assignment while new versioned members did not revoke their partitions before joining the group.


So under case 3), those members in V0 will have a small window where no partitions are assigned, and some partitions are not assigned to anyone as well. Hence, we will add a new ConsumerCoordinatorMetrics, which client will record upon receiving the assignment with the error code

Protocol Type
"num.incompatible.rebalance": "total number of rebalances that have failed due to incompatible members joining group at the same time."


group: "consumer-coordinator-metrics"
tags: client-id=([-.\w]+)
type: Count

The client, upon receiving the assignment associated with the error-code, would record the above metric, and backoff and re-send join-group request.


As for the upgrade path, we would require users to do two rolling bounces, where:

  1. In the first rolling bounce, keep the rebalance.protocol as "eager" (no need to manually change anything though since it is the default value).
  2. After the first rolling bounce is completely done. Then do a second rolling bounce in which rebalance.protocol is updated to "cooperative". The above logic will make sure that eventually when everyone's sending the join-group request with V1.


Note that this proposal depends on user's correct behavior that everyone should be on the same "rebalance protocol" eventually, otherwise the we would fall into the case 3.b) forever where some partitions would not be assigned to anyone. In addition, this approach assumes that the leader would be V1-aware whenever some V1 subscription is received: again, if users follow the upgrade path above, it should be the case, but if users did not follow the guidance then it may cause undefined behavior since the old versioned leader may just proceed with the V0 "eager" assignment while some of the members are actually on V1.


There's a few edge cases worth mentioning here:

Downgrading and Old-Versioned New Member

If a consumer is downgraded after the above upgrade path is complete, it is treated as first leaving the group, and then rejoining the group as an new member with old V0. This situation can also be reflected when a new member with old version V0 is joining a team (probably mistakenly) that has been completely upgraded to V2. At this moment everyone else will still get their existing assigned-partitions and the new comer would not get anything. However if another member left the group as well, then its partitions would not be assigned to anyone due to the logic 3) above. We will rely on the above consumer-side metric so that users would be notified in time.

Old-Versioned Member Become Leader

Since group coordinator would select new leaders within the existing member, even if the new leader has failed after the group has successfully upgraded the new leader should still be V1-aware, and new members of V0 joining within the same generation should not be selected.

Compatibility, Deprecation, and Migration Plan

Minimum Version Requirement

This change requires Kafka broker version >= 0.9, where broker will react with a rebalance when a normal consumer rejoin the encoded metadata. Client application needs to update to the earliest version which includes KIP-429 version 1.0 change.

Recommended Upgrade Procedure

As we have mentioned above, a new protocol type shall be created. To ensure smooth upgrade, we need to make sure the existing job won't fail. The procedure is like:

  • Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
  • Rolling restart the stream application and the change is automatically applied. This is safe because we are not changing protocol type.

In long term we are proposing a more smooth and elegant upgrade approach than the current one. However it requires broker upgrade which may not be trivial effort for the end user. So far, user could choose to take this much easier workaround.

Rejected Alternatives

N/A for the algorithm part. For implementation plan trade-off, please review the doc in implementation plan.

  • No labels