Current state: In review
Discussion thread: TBD
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
For stateful applications, one of the biggest performance bottlenecks is state shuffling. The Kafka consumer has a concept called "rebalance": given M partitions and N consumers in one consumer group, Kafka tries to balance the load so that each consumer ideally handles M/N partitions. The broker also adjusts the workload dynamically by monitoring consumers' health, kicking dead consumers out of the group and handling join group requests from new consumers. The intuition behind this design is to avoid processing hot spots and to maintain the fairness and liveness of the whole application. However, when the service state is heavy, rebalancing one topic partition from instance A to instance B means a huge amount of data transfer. If multiple rebalances are triggered, the whole service could take a very long time to recover because of that data transfer.
The idea of this KIP is to reduce the number of rebalances by introducing a new concept: static membership. It would help with the following example use cases.
Background of consumer rebalance
Right now the broker handles consumer state in a two-phase protocol. To explain consumer rebalance alone, we discuss only the three states involved: RUNNING, PREPARE_REBALANCE and SYNC.
In the current architecture, during each rebalance the consumer group on the broker side assigns each member a new member id containing a randomly generated UUID. This ensures a unique identity for each group member. On client restart, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER id, which signals that it should not be treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts. We can then reduce the number of rebalances, because the same assignment can be reapplied based on member identities. The idea is summarized as static membership which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness": for many stateful consumer/stream applications, state shuffling is more painful than a short period of partial unavailability.
We will be introducing two new terms:
New Configurations
Consumer configs
member.name | The unique identifier of the consumer provided by end user. Default value: empty string. |
The new `member.name` config will be added to the join group request.
    JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
      GroupId => String
      SessionTimeout => int32
      RebalanceTimeout => int32
      MemberId => String
      MemberName => String // new
      ProtocolType => String
      GroupProtocols => [Protocol MemberMetadata]
        Protocol => String
        MemberMetadata => bytes
In the meantime, we bump the join group request/response version to v4.
    // Request schemas
    public static Schema[] schemaVersions() {
        return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2,
            JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
    }

    // Response schemas
    public static Schema[] schemaVersions() {
        return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2,
            JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
    }
We are also introducing two new error types in JoinGroupResponse V4. Their handling is explained in the next section.
    MEMBER_ID_MISMATCH(78,
        "The join group contains member name which is already in the consumer group, however the member id was not matching the record on coordinator",
        MemberIdMisMatchException::new),
    DUPLICATE_STATIC_MEMBER(79,
        "The join group contains member name which is already in the consumer group, however the member id was missing",
        DuplicateStaticMemberException::new),
We shall raise the session timeout cap to 30 minutes to relax liveness tracking under static membership.
    val GroupMaxSessionTimeoutMs = 1800000 // 30 min for max cap
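As a quick check of the constant above, 30 minutes is 30 × 60 × 1000 = 1,800,000 ms. The sketch below recomputes it and shows how a coordinator might validate a requested session timeout against the cap. The class name, the method name and the 6-second lower bound are assumptions made for illustration; they are not part of this proposal.

```java
import java.time.Duration;

final class SessionTimeoutCapSketch {
    // Proposed cap from this KIP: 30 minutes expressed in milliseconds.
    static final long GROUP_MAX_SESSION_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();
    // Assumed lower bound, used only to make the range check concrete.
    static final long ASSUMED_MIN_SESSION_TIMEOUT_MS = Duration.ofSeconds(6).toMillis();

    // Returns true when the requested timeout lies inside [min, max].
    static boolean isAcceptable(long requestedSessionTimeoutMs) {
        return requestedSessionTimeoutMs >= ASSUMED_MIN_SESSION_TIMEOUT_MS
                && requestedSessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        System.out.println(GROUP_MAX_SESSION_TIMEOUT_MS);                    // 1800000
        System.out.println(isAcceptable(Duration.ofMinutes(10).toMillis())); // true
        System.out.println(isAcceptable(Duration.ofMinutes(45).toMillis())); // false
    }
}
```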
For fault tolerance, we also include the member name in the member metadata that is backed up in the offset topic.
    private val MEMBER_METADATA_V3 = new Schema(
      new Field(MEMBER_ID_KEY, STRING),
      new Field(MEMBER_NAME_KEY, STRING), // new
      new Field(CLIENT_ID_KEY, STRING),
      new Field(CLIENT_HOST_KEY, STRING),
      new Field(REBALANCE_TIMEOUT_KEY, INT32),
      new Field(SESSION_TIMEOUT_KEY, INT32),
      new Field(SUBSCRIPTION_KEY, BYTES),
      new Field(ASSIGNMENT_KEY, BYTES))
We will define one command line API to help us better manage the static groups:
    public static MembershipChangeResult invokeConsumerRebalance(String groupId);
    public static MembershipChangeResult invokeConsumerRebalance(String groupId, InvokeConsumerRebalanceOptions options);
In short, the proposed feature is enabled if
On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). Note that it is the user's responsibility to assign a unique `member.name` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `member.name` values in case the client is misconfigured.
For the KIP to be effective, consumers with `member.name` set will not send a leave group request when they go offline, which means we rely solely on session.timeout to trigger a group rebalance. Otherwise, under the proposed rebalance protocol, this intermittent in-and-out would itself trigger rebalances, which is not ideal. With static membership we delegate consumer group health management to the tooling around the client application, such as Kubernetes. It is therefore also advised to make the session timeout large enough that the broker does not trigger rebalances too frequently as members come and go.
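The client-side setup described above could look roughly like the following. This is a minimal sketch that only builds the configuration: `member.name` is the key proposed by this KIP, while the bootstrap address, the group id, the host-derived member name and the 10-minute timeout are placeholder assumptions.

```java
import java.util.Properties;

final class StaticMemberConfigSketch {
    // Builds consumer properties for a static member.
    static Properties staticMemberProps(String memberName, long sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group
        // Proposed by this KIP: a stable, user-supplied identity for this consumer.
        props.setProperty("member.name", memberName);
        // A generous session timeout so short restarts do not trigger a rebalance.
        props.setProperty("session.timeout.ms", Long.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical identity taken from a service-discovery hostname.
        Properties props = staticMemberProps("orders-consumer-0", 10 * 60 * 1000L);
        System.out.println(props.getProperty("member.name"));
        System.out.println(props.getProperty("session.timeout.ms"));
    }
}
```

Leaving `member.name` unset (the empty-string default) would keep the consumer on dynamic membership.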
On the server side, the broker will keep handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and the member name is set, the broker will use the member name specified in the join group request and respond with a unique "member id". The broker will maintain an in-memory mapping of {member.name → member.id} to track member uniqueness. When it receives a rejoin request from an existing member, the broker will return the cached assignment to that member without doing any rebalance.
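The {member.name → member.id} mapping and the cached-assignment lookup can be pictured with the sketch below. It is an illustration only: the class, its method names and the id format (name plus a random UUID) are assumptions, assignments are reduced to partition numbers, and the handling of duplicates and of the two new error codes is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class StaticMemberRegistrySketch {
    // member.name -> member.id, as described in the text.
    private final Map<String, String> memberIdByName = new HashMap<>();
    // member.id -> last assignment handed out (partition numbers, for illustration).
    private final Map<String, List<Integer>> assignmentById = new HashMap<>();

    // The first join for a name mints a member id; later joins reuse it.
    String memberIdFor(String memberName) {
        return memberIdByName.computeIfAbsent(
                memberName, name -> name + "-" + UUID.randomUUID());
    }

    void recordAssignment(String memberId, List<Integer> partitions) {
        assignmentById.put(memberId, new ArrayList<>(partitions));
    }

    // Cached assignment for a rejoining static member; empty if none is known.
    List<Integer> cachedAssignment(String memberName) {
        String memberId = memberIdByName.get(memberName);
        if (memberId == null) {
            return Collections.emptyList();
        }
        return assignmentById.getOrDefault(memberId, Collections.emptyList());
    }

    public static void main(String[] args) {
        StaticMemberRegistrySketch registry = new StaticMemberRegistrySketch();
        String firstId = registry.memberIdFor("host-1");
        registry.recordAssignment(firstId, Arrays.asList(0, 1));
        // A restart with the same member.name maps back to the same member id.
        System.out.println(firstId.equals(registry.memberIdFor("host-1"))); // true
        System.out.println(registry.cachedAssignment("host-1"));            // [0, 1]
    }
}
```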
For join group requests under static membership (with member name set), we are requiring:
For MEMBER_ID_MISMATCH, we have not yet been able to identify an edge case that could cause this issue. So for the first version, we shall just fail the consumer immediately when it hits this exception.
For DUPLICATE_STATIC_MEMBER, when a member name has duplicates, we can refuse join requests from members with an outdated member.id (since we update the mapping upon each join group request). In the edge case where a client receives this exception in the response, it suggests that some other consumer has taken its spot. The client should fail immediately to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.
For join group requests under dynamic membership (without member name set), the handling logic will remain unchanged.
If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id.
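The downgrade rule can be sketched as a small version negotiation. The helper names are hypothetical, and the sketch assumes that the field left unset on a downgraded request is the static identity taken from `member.name`.

```java
final class JoinGroupVersionSketch {
    // Static membership needs JoinGroup v4 or later, per this proposal.
    static final short STATIC_MEMBERSHIP_MIN_VERSION = 4;

    // Use the highest version that both sides support.
    static short negotiatedVersion(short clientMaxVersion, short brokerMaxVersion) {
        return (short) Math.min(clientMaxVersion, brokerMaxVersion);
    }

    // Below v4 the request carries no static identity, so the member stays dynamic.
    static String memberNameToSend(short version, String configuredMemberName) {
        return version >= STATIC_MEMBERSHIP_MIN_VERSION ? configuredMemberName : "";
    }

    public static void main(String[] args) {
        short againstOldBroker = negotiatedVersion((short) 4, (short) 3);
        short againstNewBroker = negotiatedVersion((short) 4, (short) 4);
        System.out.println(againstOldBroker + " '" + memberNameToSend(againstOldBroker, "host-1") + "'");
        System.out.println(againstNewBroker + " '" + memberNameToSend(againstNewBroker, "host-1") + "'");
    }
}
```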
Scale up
We do not plan to solve the scale-up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level instead of at the protocol level.
We also plan to deprecate group.initial.rebalance.delay.ms, since we no longer need it once static membership is delivered and the incremental rebalancing work is done.
Rolling bounce
Currently there is a config called rebalance timeout, which is set from the consumer's max.poll.interval.ms. It is tied to the poll interval because the consumer can only send requests within a call to poll(), and we want to wait long enough for the join group request. When the rebalance timeout is reached, the group moves to the completingRebalance stage and removes unjoined members. This conflicts with the design of static membership, because those temporarily unavailable members will potentially retry the join group request and trigger extra rebalances. Internally we would optimize this logic by making the rebalance timeout responsible only for ending the prepare rebalance stage, without removing non-responsive members immediately. There would be no full rebalance if the lagging consumer sends a JoinGroup request within the session timeout.
So in summary, a member will only be removed due to session timeout. We shall remove it from both the in-memory static member name mapping and the member list.
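The removal rule summarized above can be sketched as follows: a member is dropped only when its session timeout elapses, and it is then removed from both the name mapping and the member list. The class and method names are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and the rebalance timeout is intentionally absent because, under this proposal, it no longer evicts members.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class SessionExpirySketch {
    private final long sessionTimeoutMs;
    // Static member name mapping: member.name -> member.id.
    private final Map<String, String> memberIdByName = new HashMap<>();
    // Member list, keyed by member.id, with the last heartbeat time.
    private final Map<String, Long> lastHeartbeatMsById = new HashMap<>();

    SessionExpirySketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void register(String memberName, String memberId, long nowMs) {
        memberIdByName.put(memberName, memberId);
        lastHeartbeatMsById.put(memberId, nowMs);
    }

    void heartbeat(String memberId, long nowMs) {
        if (lastHeartbeatMsById.containsKey(memberId)) {
            lastHeartbeatMsById.put(memberId, nowMs);
        }
    }

    boolean isMember(String memberName) {
        return memberIdByName.containsKey(memberName);
    }

    // Removes members whose session expired from both structures; returns their names.
    List<String> expire(long nowMs) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = memberIdByName.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            long lastHeartbeatMs = lastHeartbeatMsById.get(entry.getValue());
            if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
                lastHeartbeatMsById.remove(entry.getValue());
                it.remove();
                removed.add(entry.getKey());
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        SessionExpirySketch group = new SessionExpirySketch(30_000L);
        group.register("host-1", "id-1", 0L);
        group.register("host-2", "id-2", 0L);
        group.heartbeat("id-2", 20_000L);          // host-2 comes back within its session
        System.out.println(group.expire(31_000L)); // [host-1]
    }
}
```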
Fault-tolerance of static membership
To make sure we can recover from broker failure or leader transition, an in-memory member name map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over the leadership, the mapping is transferred along with it.
Command line API for membership management
forceStaticRebalance (introduced above) will trigger one rebalance immediately on static membership, which is mainly used for fast scale down/host replacement cases (we detect consumer failure faster than the session timeout). Error will be returned if
We need to grant access to these APIs to end users who may not have an administrative role on the Kafka cluster. We shall require an access level similar to that of the join group request, so that the consumer service owner can easily use this API.
The fallback logic has been discussed previously. A broker with a lower version would just downgrade static membership to dynamic membership.
The recommended upgrade process is as follows:
That's it! We believe that the static membership logic is compatible with the current dynamic membership, which means static and dynamic members are allowed to co-exist within the same consumer group. This assumption could be further verified by modeling the protocol (perhaps in TLA+) or through dev testing.
We did have some offline discussions on handling the leader rejoin case. For example, since the broker could also do the subscription monitoring work, we don't actually need to trigger a rebalance on the leader side blindly based on its rejoin request. However, this is a separate topic and we will address it in another KIP.
In this pull request, we took an experimental approach of materializing the member id (the identity given by the broker, equivalent to the `member.name` in this proposal) on the instance's local disk. This approach reduced rebalances as expected and is the experimental foundation of KIP-345. However, KIP-345 has a few advantages over it:
Beyond static membership we could unblock many interactive use cases between broker and consumer. We will initiate separate discussion threads once 345 is done. Examples are: