


Status

Current state: In review

Discussion thread: TBD

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load so that each consumer ideally handles M/N partitions. The broker also adjusts the workload dynamically by monitoring consumer health, so that it can kick dead consumers out of the group and handle join group requests from new consumers. The intuition behind this design is to avoid processing hot spots and to maintain fairness and liveness of the whole application. However, when the service state is heavy, rebalancing one topic partition from instance A to instance B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover because of that data transfer.

The idea of this KIP is to reduce the number of rebalances by introducing a new concept: static membership. The core argument is that heavy-state applications should reduce state shuffling as much as possible.

Public Interfaces

Right now the broker handles consumer state in a two-phase protocol. To explain consumer rebalance alone, we discuss only the three states involved: RUNNING, PREPARE_REBALANCE and SYNC.

  • When a member joins the consumer group, if it is a new member or the group leader, the broker moves the group state from RUNNING to PREPARE_REBALANCE. A leader join triggers a rebalance because the assignment protocol might have changed (for example, the consumer group uses regex subscription and new matching topics show up). If an old (non-leader) member rejoins the group, the state does not change.
  • Once in the PREPARE_REBALANCE state, the broker marks the first joined consumer as leader and waits for all the members to rejoin the group. Once enough consumers have been collected or the rebalance timeout is reached, the broker replies to the leader with the current member information and moves the state to SYNC. All current members are told to send a SyncGroupRequest to get the final assignment.
  • The leader consumer decides the assignment and sends it back to the broker. As the last step, the broker announces the new assignment by sending a SyncGroupResponse to all the followers. At this point one rebalance is finished and the group generation is incremented by 1.
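The three steps above can be pictured as a small state machine. The sketch below is a simplified illustration only: `GroupSketch`, its fields and its method names are hypothetical and are not Kafka's coordinator classes, and timeouts are reduced to an explicit `onJoinPhaseComplete()` call.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of the group states discussed above.
enum GroupState { RUNNING, PREPARE_REBALANCE, SYNC }

class GroupSketch {
    GroupState state = GroupState.RUNNING;
    int generation = 0;
    String leaderId = null;
    Set<String> members = new LinkedHashSet<>();      // members of the current generation
    final Set<String> joined = new LinkedHashSet<>(); // members that (re)joined this round

    // A join from a new member or from the leader starts a rebalance;
    // a rejoin from an existing follower while RUNNING changes nothing.
    void onJoin(String memberId) {
        boolean triggers = !members.contains(memberId) || memberId.equals(leaderId);
        if (state == GroupState.RUNNING && triggers) {
            state = GroupState.PREPARE_REBALANCE;
            joined.clear();
            leaderId = null;
        }
        if (state == GroupState.PREPARE_REBALANCE) {
            if (joined.isEmpty()) {
                leaderId = memberId; // the first joiner of the round becomes leader
            }
            joined.add(memberId);
        }
    }

    // Called when enough members have rejoined or the rebalance timeout fired.
    void onJoinPhaseComplete() {
        if (state != GroupState.PREPARE_REBALANCE) {
            return;
        }
        members = new LinkedHashSet<>(joined);
        state = GroupState.SYNC;
    }

    // Called when the leader has sent back the assignment.
    void onLeaderAssignment() {
        if (state != GroupState.SYNC) {
            return;
        }
        state = GroupState.RUNNING;
        generation++;
    }
}
```

In this model a follower rejoin in RUNNING is a no-op, while a leader rejoin moves the group back to PREPARE_REBALANCE, which is the behavior static membership later relies on.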

In the current architecture, during each rebalance the consumer group on the broker side assigns each new member an id built from a randomly generated UUID. This ensures a unique identity for each group member. On client restart, the consumer sends a JoinGroupRequest with a special UNKNOWN_MEMBER_ID, which signals that it does not expect to be treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts. This reduces the number of rebalances, since we can apply the same assignment based on member identities. The idea is summarized as static membership, which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness", since for many stateful consumer/stream applications state shuffling is more painful than a short period of partial unavailability.

Proposed Changes

We will be introducing a bunch of new terms:

  • Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless (1) a new member joins, (2) a leader rejoins, or (3) an existing member goes offline for longer than a certain timeout.
  • Member name: the unique identifier defined by the user to distinguish each client instance.
  • Member registration timeout: the max time we tolerate a static member being offline.
  • Member expansion timeout: the max time we wait, counted from the first new static member's join request, before triggering a rebalance.

Client behavior changes

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random id (dynamic membership). To distinguish this from previous versions of the protocol, we also bump the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle a duplicate member.name in case the client is misconfigured.

ConsumerConfig.java
public static final String MEMBER_NAME = "member_A"; // default empty String

Changes will be applied to join group request and offset commit request to include member name and other settings: 

JoinGroupRequest => 
 GroupId => String
 SESSION_TIMEOUT => INT32
 REBALANCE_TIMEOUT => INT32
 STATIC_REGISTRATION_TIMEOUT => INT32 // NEW
 STATIC_EXPANSION_TIMEOUT => INT32  // NEW
 MEMBER_NAME => STRING // NEW
 PROTOCOL_TYPE => STRING
 GROUP_PROTOCOLS => ARRAY
JoinGroupRequest.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}

If the broker does not support the latest version (< v4), the join group request shall be downgraded to v3 without setting the member name.
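A minimal sketch of this version choice follows. The class and method names are hypothetical, and it assumes the client already knows the highest JoinGroupRequest version the broker supports; it is not the actual client code.

```java
// Hypothetical client-side helper; class, method and constant names are illustrative.
class JoinGroupVersionSketch {
    static final short STATIC_MEMBERSHIP_VERSION = 4;

    // Static membership is used only when a member name is configured
    // and the broker understands v4.
    static boolean useStatic(String memberName, short brokerMaxVersion) {
        return memberName != null && !memberName.isEmpty()
                && brokerMaxVersion >= STATIC_MEMBERSHIP_VERSION;
    }

    // Version to send: v4 for static membership, otherwise at most v3.
    static short chooseVersion(String memberName, short brokerMaxVersion) {
        if (useStatic(memberName, brokerMaxVersion)) {
            return STATIC_MEMBERSHIP_VERSION;
        }
        return (short) Math.min(brokerMaxVersion, STATIC_MEMBERSHIP_VERSION - 1);
    }

    // On downgrade the member name is dropped (empty string means "not set").
    static String memberNameToSend(String memberName, short brokerMaxVersion) {
        return useStatic(memberName, brokerMaxVersion) ? memberName : "";
    }
}
```

With this shape, a consumer configured with a member name but talking to an older broker behaves exactly like a dynamic member.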

We shall also bump the join group response version to v4.

JoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}


On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is v4 and the member name is set, the broker will use the member name specified in the join group request and respond with a unique "member id". The broker will maintain an in-memory mapping of {member.name → member.id} to track member uniqueness.
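The mapping could look roughly like the sketch below. `StaticMemberRegistrySketch` is a hypothetical name, and two details are assumptions rather than part of the proposal: the id format (member name plus a random UUID) and the choice to generate a fresh id on every join.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory {member.name -> member.id} registry on the coordinator.
class StaticMemberRegistrySketch {
    private final Map<String, String> nameToId = new HashMap<>();

    // Handles a v4 join that carries a member name. The mapping is overwritten
    // on each join, so any earlier holder of the same name now has an outdated id.
    // Assumption: the id is "<member.name>-<random UUID>".
    String onStaticJoin(String memberName) {
        String memberId = memberName + "-" + UUID.randomUUID();
        nameToId.put(memberName, memberId);
        return memberId;
    }

    // Returns the id currently registered for the name, or null if unknown.
    String currentId(String memberName) {
        return nameToId.get(memberName);
    }
}
```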

For commit requests under static membership, we require:

  • Both member.name and member.id must be set. Otherwise reply NO_STATIC_MEMBER_INFO_SET.
  • The member.name to member.id mapping must match the coordinator cache. Otherwise reply DUPLICATE_STATIC_MEMBER.

so that when a member name has duplicates, we can refuse commit requests from members with an outdated member.id (since we update the mapping upon each join group request). Normally, hitting NO_STATIC_MEMBER_INFO_SET is likely due to a rolling restart of the consumers, where some members haven't yet picked up the code with the new member name. In the edge case where the client receives a DUPLICATE_STATIC_MEMBER error in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes corner cases easy to reproduce.
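The two commit checks above can be sketched as a single validation function. The enum and class names here are hypothetical, and the coordinator cache is represented by a plain `Map` for illustration.

```java
import java.util.Map;

// Hypothetical result codes mirroring the two errors named above.
enum CommitCheck { OK, NO_STATIC_MEMBER_INFO_SET, DUPLICATE_STATIC_MEMBER }

class CommitValidatorSketch {
    // nameToId stands in for the coordinator's {member.name -> member.id} cache.
    static CommitCheck validate(Map<String, String> nameToId,
                                String memberName, String memberId) {
        boolean nameMissing = memberName == null || memberName.isEmpty();
        boolean idMissing = memberId == null || memberId.isEmpty();
        if (nameMissing || idMissing) {
            return CommitCheck.NO_STATIC_MEMBER_INFO_SET;
        }
        // An id that differs from the cached one belongs to an outdated instance.
        if (!memberId.equals(nameToId.get(memberName))) {
            return CommitCheck.DUPLICATE_STATIC_MEMBER;
        }
        return CommitCheck.OK;
    }
}
```

A client that receives `DUPLICATE_STATIC_MEMBER` would then fail fast rather than retry, in line with the handling described above.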


Also note that there is a conflicting internal config called LEAVE_GROUP_ON_CLOSE_CONFIG, which decides whether a consumer sends a leave group request when it goes offline. This would reduce the effectiveness of this KIP, because once a consumer leaves the group the broker treats the same member as new when it comes back, which would still trigger many rebalances. We will default this internal config to false when member name is set.

ConsumerConfig.java
.defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG,
                Type.BOOLEAN,
                false, // if on static membership
                Importance.LOW)


When do we rebalance in static membership?

Rebalances happen rarely under static membership (only on scale up/down or when the leader rejoins). When receiving an existing member's rejoin request, the broker returns the cached assignment to the member without doing any rebalance.

There are two configs to control the case when we must do a rebalance: registration timeout and expansion timeout.

Scale down

ConsumerConfig.java
public static final String REGISTRATION_TIMEOUT_MS = "registration.timeout.ms";

Registration timeout is the time after which we trigger a rebalance when a member has been offline for too long. It should usually be set much larger than the session timeout, which is used to detect consumer health. It is monitored through heartbeats the same way as the session timeout, and replaces the session timeout under static membership. We define a separate config because we want an easy switch between dynamic membership and static membership (See details here). By setting it to 15 ~ 30 minutes, we loosen the tracking of static member progress and hand member management over to client-side tooling such as Kubernetes. Of course, we should not wait forever for the member to come back online just to reduce rebalances. Eventually the member is kicked out of the group and a final rebalance is triggered. Note that we track the earliest offline member and compare its offline time against the registration timeout. Example below with registration timeout 15 min:

Event                | Time  | Earliest time | Action
Member A dropped     | 00:00 | 00:00         | N/A
Member B dropped     | 00:10 | 00:00         | N/A
Member A back online | 00:14 | 00:10         | N/A
B gone for too long  | 00:25 | 00:10         | Rebalance
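The "earliest offline member" bookkeeping in this example can be sketched as follows. This is an illustration under stated assumptions: `RegistrationTimeoutSketch` and its methods are hypothetical names, and time is modelled as plain minutes passed in by the caller instead of heartbeat-driven timers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker for the registration timeout; times are in minutes.
class RegistrationTimeoutSketch {
    private final long registrationTimeout;
    private final Map<String, Long> offlineSince = new HashMap<>();

    RegistrationTimeoutSketch(long registrationTimeout) {
        this.registrationTimeout = registrationTimeout;
    }

    void onMemberDropped(String memberName, long now) {
        offlineSince.putIfAbsent(memberName, now); // keep the first drop time
    }

    void onMemberBack(String memberName) {
        offlineSince.remove(memberName);
    }

    // Earliest drop time among members that are still offline, or -1 if none.
    long earliestOffline() {
        long earliest = -1;
        for (long droppedAt : offlineSince.values()) {
            if (earliest < 0 || droppedAt < earliest) {
                earliest = droppedAt;
            }
        }
        return earliest;
    }

    // A rebalance is due once the earliest offline member exceeds the timeout.
    boolean shouldRebalance(long now) {
        long earliest = earliestOffline();
        return earliest >= 0 && now - earliest >= registrationTimeout;
    }
}
```

Replaying the table with a 15 minute timeout: A drops at 0, B drops at 10, A returns at 14 (the earliest offline time moves to 10), and the check at 25 reports that a rebalance is due.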

When scaling down the application, it is advised to do it quickly, so that when the registration timeout is reached for the first departed member we trigger one single rebalance and get progress back on track. Note that here we sacrifice liveness for up to the registration timeout (x min) in order to minimize state shuffling. This is a reasonable trade-off for large-state applications.

A corner case is that A and B could drop out of the group at nearly the same time. In static membership, we still need to sync the group to keep track of how many existing members are still alive; otherwise an unnecessary rebalance will be triggered later.

Removing members is tricky in nature. For the broker, the "target scale down" is very hard to know. For example, if we have 16 members and want to cut the number in half, the broker coordinator does not know when to trigger the rebalance while the group shrinks 16 → 8. An admin API to force a rebalance could be helpful here, but we will make that call once we have finished the major implementation.

Scale up

Adding new static members should be straightforward. Because this operation should happen fast enough (so that capacity can catch up quickly), we define another config called expansion timeout.

ConsumerConfig.java
public static final String EXPANSION_TIMEOUT_MS = "expansion.timeout.ms";

This is the countdown, started by the first new member's join request, after which we trigger exactly one rebalance (i.e., the estimated time to spin up the new hosts). It is advised to set it roughly equal to the session timeout, to make sure the workload becomes balanced when you 2X or 3X your stream job. Example with expansion timeout 5 min:

Event             | Time  | Count time | Action
New member A join | 00:00 | 00:00      | N/A
New member B join | 00:03 | 00:00      | N/A
Expansion timeout | 00:05 | N/A        | Rebalance
New member C join | 00:06 | 00:06      | N/A
Expansion timeout | 00:11 | N/A        | Rebalance

Unfortunately, in this example we trigger two rebalances, because C arrives too late to catch the first round's expansion timeout. When C finally joins, it starts a new expansion timeout countdown. After 5 min, another rebalance kicks in and assigns new tasks to C.

In KStream, scale up is difficult since we need to shuffle state to the new hosts. Ideally, we could introduce a new status called "learner", where newly started hosts first try to catch up with the progress of their assigned tasks before the rebalance is triggered, so that we don't see a sudden dip in progress. However, this builds on the success of KIP-345 and could be addressed in a separate KIP specifically for KStream.

Effectively, we are using expansion timeout to replace rebalance timeout, which is configured by max.poll.interval.ms on the client side, and using registration timeout to replace session timeout.
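The countdown behavior in the scale-up example can be sketched as below. `ExpansionTimeoutSketch` is a hypothetical name, and time is again modelled as caller-supplied minutes; this illustrates the counting rule only, not the broker implementation.

```java
// Hypothetical countdown for the expansion timeout; times are in minutes.
class ExpansionTimeoutSketch {
    private final long expansionTimeout;
    private long countdownStart = -1; // -1 means no expansion is pending

    ExpansionTimeoutSketch(long expansionTimeout) {
        this.expansionTimeout = expansionTimeout;
    }

    // Only the first new member of a round starts the countdown;
    // later joiners in the same round ride along.
    void onNewMemberJoin(long now) {
        if (countdownStart < 0) {
            countdownStart = now;
        }
    }

    // Returns true exactly once per countdown, when the timeout has elapsed.
    boolean pollRebalance(long now) {
        if (countdownStart >= 0 && now - countdownStart >= expansionTimeout) {
            countdownStart = -1;
            return true;
        }
        return false;
    }
}
```

With a 5 minute timeout, joins at 0 and 3 lead to one rebalance at 5, and a late join at 6 starts a second countdown that fires at 11, matching the two rebalances in the table.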

Fault-tolerance of static membership

To make sure we can recover from broker failure/leader transition, an in-memory member name map is not enough. We will create another topic called `static_member_map` in the cluster, and each time the group reaches a stable state we write the complete mapping information into it as a single message. Alternatively, we could reuse the `__consumer_offsets` topic to store this information, which could however be unfavorable if it requires too many changes to the existing offset message schema. Either way, when another broker takes over the leadership, the mapping is transferred with it.
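To illustrate "the complete mapping as a single message", here is a minimal sketch that packs the map into one byte array and restores it. The wire format (an entry count followed by name/id string pairs) and the class name `MemberMapCodecSketch` are assumptions made for this example; the KIP does not define the message schema.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical codec: the whole {member.name -> member.id} map becomes one message value.
class MemberMapCodecSketch {
    static byte[] encode(Map<String, String> nameToId) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(nameToId.size());
            // Sorted iteration makes the encoding deterministic for equal maps.
            for (Map.Entry<String, String> entry : new TreeMap<>(nameToId).entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Used by the broker that takes over leadership to rebuild the mapping.
    static Map<String, String> decode(byte[] value) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(value));
            int count = in.readInt();
            Map<String, String> nameToId = new TreeMap<>();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String id = in.readUTF();
                nameToId.put(name, id);
            }
            return nameToId;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```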

Switch between static and dynamic membership

For the very first version, we hope to keep the switching logic simple. As mentioned, the goal of static membership is to avoid multiple rebalances caused by intermittent failures. There should be an easy way to fall back to dynamic membership when the user prefers to let the broker handle task assignment, for example when liveness becomes more important after state tuning has made big progress. The fallback is simple: once the membership becomes stable, the first join group request decides the membership protocol for the next round. For example, when we are running stably with static membership, deploying a client change without member.name set invalidates the current hashmap on the broker, and all v4 requests containing member.name are then treated as coming from normal dynamic members, with sticky assignment performed in the dynamic context as usual. Note that we intentionally separate session.timeout and registration.timeout so that the user doesn't have to change any setting except member name, although they serve the same purpose in the two protocols.




Compatibility, Deprecation, and Migration Plan

  • The fallback logic has been discussed previously. A broker on a lower version would simply downgrade static membership to dynamic membership.

Non-goal

We have had some offline discussions on handling the leader rejoin case. For example, since the broker could also do the subscription monitoring work, we don't actually need to trigger a rebalance blindly whenever the leader rejoins. However, this is a separate topic and we will address it in another KIP.

Rejected Alternatives

In this pull request, we took an experimental approach that materializes the member id on the instance's local disk. This approach reduces rebalances as expected and is the experimental foundation of KIP-345. However, KIP-345 has a few advantages over it:

  1. It gives users more control of their member id string; this would help for debugging purposes.
  2. It is more cloud-/k8s-and-alike-friendly: when we move an instance from one container to another, we can copy the member id to the config files.
  3. It does not require the consumer to be able to access another dir on the local disks (think of consumers deployed on AWS with remote disks mounted).
  4. By allowing consumers to optionally specify a member id, this rebalance benefit can easily be carried over to Connect and Streams as well, which rely on consumers, even in a cloud environment.


