Current state: In review
Discussion thread: TBD
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load between consumers, ideally having each consumer handle M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health, so that dead consumers are kicked out of the group and new consumers' join group requests are handled. The intuition of this design is to avoid processing hot spots and to maintain fairness plus liveness of the whole application. However, when the service state is heavy, rebalancing one topic partition from instance A to B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover due to data transfer.
The idea of this KIP is to reduce the number of rebalances by introducing a new concept: static membership. The core argument is: heavy-state applications should reduce state shuffling as much as possible.
Public Interfaces
Right now the broker handles consumer state in a two-phase protocol. To explain consumer rebalance in isolation, we only discuss the 3 states involved here: RUNNING, PREPARE_REBALANCE and SYNC.
In the current architecture, during each rebalance the consumer group on the broker side will assign each member a new member id with a randomly generated UUID. This is to make sure we have a unique identity for each group member. During a client restart, the consumer will send a JoinGroupRequest with a special UNKNOWN_MEMBER_ID, which has no intention of being treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic to make sure we persist member identity throughout restarts, which means we can reduce the number of rebalances since we are able to apply the same assignment based on member identities. The idea is summarized as static membership, which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness", since for many stateful consumer/streams applications, state shuffling is more painful than a short period of partial unavailability.
We will be introducing several new terms:
On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). To distinguish from previous versions of the protocol, we will also bump the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic for handling duplicate member.name values in case a client configures it wrong.
public static final String MEMBER_NAME = "member_A"; // default empty String
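As a hypothetical usage sketch (the exact config key and constant name may differ in the final implementation), a consumer opts into static membership simply by setting the new config alongside the usual ones; an empty value keeps dynamic membership:

```java
import java.util.Properties;

public class StaticMemberConfigExample {
    // Returns true when the (hypothetical) member.name config is non-empty,
    // i.e. the consumer opts into static membership.
    public static boolean isStaticMember(Properties props) {
        return !props.getProperty("member.name", "").isEmpty();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "heavy-state-app");
        // Hypothetical key for the new MEMBER_NAME config; the value must be
        // unique per consumer instance (hostname, pod name, etc.).
        props.put("member.name", "consumer-host-1");
        System.out.println(isStaticMember(props)); // prints "true"
    }
}
```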
Changes will be applied to join group request, sync group request and offset commit request to include member name and other settings:
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  MemberName => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

SyncGroupRequest => GroupId GroupGenerationId MemberId MemberName
  GroupId => String
  GroupGenerationId => int32
  GroupState => [MemberId MemberState]
    MemberId => String
    MemberName => String // new
    MemberState => bytes

OffsetCommitRequest => GroupId GenerationId MemberId MemberName Topics
  GroupId => String
  GenerationId => int32
  MemberId => String
  MemberName => String // new
  Topics => partition data
If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member name.
We shall also bump the join group request/response version to v4.
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}
For the effectiveness of the KIP, consumers with member.name set will not send a leave group request when they go offline, because the current rebalance protocol would trigger a rebalance on this intermittent in-and-out, which is not ideal.
On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and the member name is set, the broker will use the member name specified in the join group request and respond with a unique "member id". The broker will maintain an in-memory mapping of {member.name → member.id} to track member uniqueness.
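As a minimal sketch of this bookkeeping (class and method names are illustrative, not the actual GroupCoordinator code): each static join registers or refreshes the {member.name → member.id} entry, and the most recent join always owns the latest id:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class StaticMembershipSketch {
    // In-memory mapping kept per group: member.name -> member.id
    private final Map<String, String> staticMembers = new HashMap<>();

    // Called when a v4 JoinGroupRequest with a non-empty member name arrives.
    // Generating a fresh UUID on each join means a stale instance that
    // reconnects later holds an outdated member.id and can be fenced.
    public String handleStaticJoin(String memberName) {
        String memberId = memberName + "-" + UUID.randomUUID();
        staticMembers.put(memberName, memberId);
        return memberId;
    }

    // The latest known member.id for a given member name, or null if unknown.
    public String currentMemberId(String memberName) {
        return staticMembers.get(memberName);
    }
}
```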
For commit requests under static membership, we are requiring:
so that when a member name has duplicates, we can refuse commit requests from members with an outdated member.id (since we update the mapping upon each join group request). Normally, hitting NO_STATIC_MEMBER_INFO_SET is potentially due to the consumer doing a rolling restart, where some members haven't updated their code with the new member name. In an edge case where the client hits a DUPLICATE_STATIC_MEMBER exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug which is generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes corner cases easy to reproduce.
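The fencing rule above can be illustrated with a sketch (the error names follow the text; the check itself and the class are hypothetical, not broker code): a commit is accepted only when the (member.name, member.id) pair matches the latest mapping:

```java
import java.util.Map;

public class CommitFencingSketch {
    enum CommitResult { OK, NO_STATIC_MEMBER_INFO_SET, DUPLICATE_STATIC_MEMBER }

    // mapping: the broker's current {member.name -> member.id} table.
    static CommitResult checkCommit(Map<String, String> mapping,
                                    String memberName, String memberId) {
        if (memberName == null || memberName.isEmpty()) {
            // Member has not supplied static info, e.g. mid rolling restart.
            return CommitResult.NO_STATIC_MEMBER_INFO_SET;
        }
        String latestId = mapping.get(memberName);
        if (latestId != null && !latestId.equals(memberId)) {
            // Another instance with the same name joined later and now owns
            // the latest member.id; this caller should fail fast.
            return CommitResult.DUPLICATE_STATIC_MEMBER;
        }
        // Unknown names pass here; the real broker would have registered
        // them at join time.
        return CommitResult.OK;
    }
}
```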
Rebalances happen rarely under static membership (only on scale up/down or when the leader rejoins). When receiving an existing member's rejoin request, the broker will return the cached assignment back to the member without triggering a rebalance.
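The cached-assignment path could look roughly like this (a sketch with hypothetical names; the real assignment bytes are the serialized partition list the assignor produced):

```java
import java.util.HashMap;
import java.util.Map;

public class CachedAssignmentSketch {
    // Cached assignment per member name, stored after the last rebalance.
    private final Map<String, byte[]> assignments = new HashMap<>();

    public void storeAssignment(String memberName, byte[] assignment) {
        assignments.put(memberName, assignment);
    }

    // On a known static member's rejoin, hand back the cached assignment
    // instead of triggering a full rebalance.
    public byte[] onRejoin(String memberName) {
        byte[] cached = assignments.get(memberName);
        if (cached == null) {
            throw new IllegalStateException("unknown member, rebalance needed");
        }
        return cached;
    }
}
```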
Scale down
The session timeout is the timeout after which we trigger a rebalance when a member has gone offline for too long (not sending heartbeat requests). To make static membership effective, we should increase the default max session timeout to 30 min so that end users can configure it freely. Although with static membership we are leveraging client applications' self-management (for example, Kubernetes), the session timeout can serve as the last line of defense for liveness.
val GroupMaxSessionTimeoutMs = 1800000 // 30 min for max cap
Scale up
We do not plan to solve the scale-up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level instead of at the protocol level.
Adding new static memberships should be straightforward. Since this operation should happen fast enough (to make sure capacity can catch up quickly), we define another config called expansion timeout. In the ideal case, we could actually introduce a new status called "learner", where newly started hosts first try to catch up with the assigned task progress before triggering the rebalance, so that we don't see a sudden dip in progress.
With the introduction of static membership, we plan to deprecate
val ExpansionTimeoutMs = 300000 // default 5 min
This is the timeout during which we count down to trigger exactly one rebalance (i.e., the estimated time to spin up the desired number of hosts), starting from the first joined member's request. It is advised to set it roughly the same as the session timeout to make sure the workload becomes balanced when you 2X or 3X your stream job. Example with an expansion timeout of 5 min:
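The countdown described above can be sketched as follows (illustrative only; the broker would drive this off its own clock): the first new member's join starts the clock, later joins within the window do not reset it, and exactly one rebalance fires when the window expires:

```java
public class ExpansionTimerSketch {
    private final long expansionTimeoutMs;
    private long firstJoinAtMs = -1; // -1 means no pending expansion

    public ExpansionTimerSketch(long expansionTimeoutMs) {
        this.expansionTimeoutMs = expansionTimeoutMs;
    }

    // Record the first new member's join; later joins within the window
    // do not reset the countdown, so exactly one rebalance fires.
    public void onNewMemberJoin(long nowMs) {
        if (firstJoinAtMs < 0) {
            firstJoinAtMs = nowMs;
        }
    }

    // True once the expansion window since the first join has elapsed.
    public boolean shouldRebalance(long nowMs) {
        return firstJoinAtMs >= 0 && nowMs - firstJoinAtMs >= expansionTimeoutMs;
    }
}
```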
Effectively, we are using the expansion timeout to replace the rebalance timeout, which is configured by max.poll.interval.ms on the client side, and using the registration timeout to replace the session timeout. We are also replacing the group.initial.rebalance.delay.ms config on the broker side to make sure the behavior is consistent for static membership.
To make sure we can recover from broker failure/leader transition, an in-memory member name map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over the leadership, we can transfer the mapping along with it. We shall also introduce a new member metadata format:
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(MEMBER_NAME_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))
For the very first version, we hope to keep the membership-transfer logic and human operations simple. We will define three command line APIs to help us better manage the groups:
public static MembershipChangeResult enableStaticMembership(String groupId, int registrationTimeout, int expansionTimeout)
public static MembershipChangeResult enableDynamicMembership(String groupId)
public static MembershipChangeResult forceStaticRebalance(String groupId)
enableStaticMembership will change the consumer group to static membership, along with changing the registration timeout and expansion timeout. After that, all joining members are required to set the member name field. An error will be returned if
Note that the client should already include the member name field at this point. Users can also use this API to change the timeout configs as they want, or leave them blank to use the default values.
enableDynamicMembership will, on the contrary, change the membership back to dynamic mode. An error will be returned if
forceStaticRebalance will trigger one rebalance immediately on static membership, which is mainly used for fast scale up/down cases. An error will be returned if
We need to enforce special access to these APIs for end users who may not be in an administrative role for the Kafka cluster. This involves defining a customized Kafka principal.
We have had some offline discussions on handling the leader rejoin case; for example, since the broker could also do the subscription monitoring work, we don't actually need to blindly trigger a rebalance based on the leader's rejoin request. However, this is a separate topic and we will address it in another KIP.
In this pull request, we took an experimental approach that materializes the member id on the instance's local disk. This approach reduces rebalances as expected, and serves as the experimental foundation of KIP-345. However, KIP-345 has a few advantages over it: