
  • Static Membership: the membership protocol under which the consumer group does not trigger a rebalance unless (1) a new member joins, (2) the leader rejoins, or (3) an existing member stays offline longer than the session timeout.
  • Member name: the unique identifier defined by the user to distinguish each client instance.


Public Interfaces

Client side changes

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). To distinguish this from previous versions of the protocol, we also increase the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic for handling a duplicate member.name in case the client is misconfigured.

Code Block
languagejava
titleConsumerConfig.java
public static final String MEMBER_NAME = "member_A"; // example value; the default is the empty String

New Configurations

Consumer configs

member.name

The unique identifier of the consumer provided by end user.

Default value: empty string.
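
As a minimal sketch of how an application might opt in, assuming the proposed member.name key is passed like any other consumer property: the helper class StaticMemberConfig and its methods are hypothetical and only illustrate the "set means static, empty means dynamic" rule described above.

```java
import java.util.Properties;

// Hypothetical helper (not part of the KIP): builds consumer properties
// for either static or dynamic membership.
final class StaticMemberConfig {
    // Config key proposed by this KIP.
    static final String MEMBER_NAME_CONFIG = "member.name";

    static Properties build(String groupId, String memberName) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // A null or empty name keeps the default, i.e. dynamic membership.
        if (memberName != null && !memberName.isEmpty()) {
            props.setProperty(MEMBER_NAME_CONFIG, memberName);
        }
        return props;
    }

    // True when the properties would make the consumer join as a static member.
    static boolean isStatic(Properties props) {
        return !props.getProperty(MEMBER_NAME_CONFIG, "").isEmpty();
    }
}
```

A service could pass its service discovery hostname as memberName, which keeps the name stable across restarts of the same instance.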

Client side changes

The new "member.name" config will be added to the join group request. The join group request schema changes to include the member name:

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  MemberName          => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes

If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id.
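
The downgrade rule can be sketched as follows. This is an illustration only: the class JoinGroupVersionChooser, its Choice type, and the decision to send at most v3 whenever no member name is configured are assumptions, not the KIP's implementation.

```java
// Hypothetical sketch of client-side version selection for the join group request.
final class JoinGroupVersionChooser {
    // First request version that carries the member name (per this KIP).
    static final short STATIC_MEMBERSHIP_MIN_VERSION = 4;

    // Outcome: the version to send and the member name to include ("" if omitted).
    static final class Choice {
        final short version;
        final String memberName;

        Choice(short version, String memberName) {
            this.version = version;
            this.memberName = memberName;
        }
    }

    static Choice choose(short brokerMaxVersion, String configuredMemberName) {
        boolean wantsStatic = configuredMemberName != null && !configuredMemberName.isEmpty();
        if (wantsStatic && brokerMaxVersion >= STATIC_MEMBERSHIP_MIN_VERSION) {
            return new Choice(STATIC_MEMBERSHIP_MIN_VERSION, configuredMemberName);
        }
        // Older broker, or no member name configured (assumption): send at most v3
        // and leave the member name out, which means dynamic membership.
        short version = (short) Math.min(brokerMaxVersion, STATIC_MEMBERSHIP_MIN_VERSION - 1);
        return new Choice(version, "");
    }
}
```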


In the meantime, we bump the join group request/response version to v4.

Code Block
languagejava
titleJoinGroupRequest.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}


Code Block
titleJoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}

Server side changes

We shall increase the cap of session timeout to 30 min for relaxing static membership liveness tracking.

Code Block
languagescala
titleKafkaConfig.scala
val GroupMaxSessionTimeoutMs = 1800000 // 30 min for max cap
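
To show what the raised cap means for clients, here is a small sketch of a range check on a requested session timeout. The class name and the 6 second lower bound are assumptions made for illustration; only the 30 min upper bound comes from this proposal.

```java
// Hypothetical sketch: checks a client's requested session timeout against group bounds.
final class SessionTimeoutValidator {
    // Proposed cap: 30 min * 60 s * 1000 ms = 1,800,000 ms.
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 30 * 60 * 1000;
    // Assumed lower bound, used here only for illustration.
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;

    static boolean isValid(int sessionTimeoutMs) {
        return sessionTimeoutMs >= GROUP_MIN_SESSION_TIMEOUT_MS
                && sessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }
}
```

With this cap, a static member could request, for example, a 10 minute session timeout (600000 ms), which stays inside the allowed range.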

For fault tolerance, we also include the member name in the member metadata so that it is backed up in the offset topic.

Code Block
languagescala
titleGroupMetadataManager
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(MEMBER_NAME_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))

We will define one command line API to help us better manage the static groups:

Code Block
titleAdminClient.java
public static MembershipChangeResult forceStaticRebalance(String groupId)


Proposed Changes

Client behavior changes

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic for handling a duplicate member.name in case the client is misconfigured.

For this KIP to be effective, consumers with member.name set will not send a leave group request when they go offline, which means we rely only on session.timeout.ms to trigger a group rebalance. This is because the rebalance protocol would otherwise trigger a rebalance on each such intermittent in-and-out, which is not ideal. In static membership we delegate consumer group health management to the client application's platform, such as Kubernetes (K8s). Therefore, it is also advised to make the session timeout large enough that the broker does not trigger rebalances too frequently as members come and go.

Server behavior changes

On the server side, the broker keeps handling join group requests <= v3 as before. If the protocol version is upgraded to v4 and the member name is set, the broker uses the member name specified in the join group request and responds with a unique "member id". The broker maintains an in-memory mapping of {member.name → member.id} to track member uniqueness. When receiving an existing member's rejoin request, the broker returns the cached assignment to the member without doing any rebalance.



For join group requests under static membership (with member name set), we are requiring:

  • Member.id must be set if the member.name is already within the map. Otherwise reply MISSING_MEMBER_ID.
  • Member.id must be left empty if the member.name is new. Otherwise reply DUPLICATE_STATIC_MEMBER.

so that when a member name has duplicates, we can refuse join requests from members with an outdated member.id (since we update the mapping upon each join group request). In an edge case where the client hits a DUPLICATE_STATIC_MEMBER exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we just want straightforward handling that exposes the error at an early stage and makes corner-case bugs easy to reproduce.
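
The two requirements above can be sketched as a small in-memory registry. This is a sketch under stated assumptions, not the broker implementation: the class StaticMemberRegistry, the id format, and the choice to answer a mismatched (outdated) member.id with DUPLICATE_STATIC_MEMBER are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the broker-side {member.name -> member.id} mapping.
final class StaticMemberRegistry {
    enum JoinError { NONE, MISSING_MEMBER_ID, DUPLICATE_STATIC_MEMBER }

    static final class JoinResult {
        final JoinError error;
        final String memberId;

        JoinResult(JoinError error, String memberId) {
            this.error = error;
            this.memberId = memberId;
        }
    }

    private final Map<String, String> nameToId = new HashMap<>();

    JoinResult handleJoin(String memberName, String memberId) {
        boolean idEmpty = memberId == null || memberId.isEmpty();
        String known = nameToId.get(memberName);
        if (known == null) {
            // New member.name: the member.id must be left empty.
            if (!idEmpty) {
                return new JoinResult(JoinError.DUPLICATE_STATIC_MEMBER, "");
            }
            String assigned = memberName + "-" + UUID.randomUUID(); // id format is an assumption
            nameToId.put(memberName, assigned);
            return new JoinResult(JoinError.NONE, assigned);
        }
        // Known member.name: the member.id must be set.
        if (idEmpty) {
            return new JoinResult(JoinError.MISSING_MEMBER_ID, "");
        }
        // Assumption: an id that does not match the mapping is treated as outdated.
        if (!known.equals(memberId)) {
            return new JoinResult(JoinError.DUPLICATE_STATIC_MEMBER, "");
        }
        return new JoinResult(JoinError.NONE, known);
    }
}
```

A client that receives DUPLICATE_STATIC_MEMBER from handleJoin would, per the text above, fail fast rather than retry.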

When do we rebalance in static membership?

Rebalance happens rarely in static membership (unless scale up/down or leader rejoins). When receiving an existing member's rejoin request, broker will return the cached assignment back to the member, without doing any rebalance.
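
The rebalance conditions can be summarized as a small decision function. The event names and the class StaticRebalancePolicy are hypothetical labels for the cases listed in this document; the sketch is not taken from the coordinator code.

```java
// Hypothetical sketch: which group events trigger a rebalance under static membership.
final class StaticRebalancePolicy {
    enum Event { NEW_MEMBER_JOIN, LEADER_REJOIN, FOLLOWER_REJOIN, SESSION_TIMEOUT }

    static boolean triggersRebalance(Event event) {
        switch (event) {
            case NEW_MEMBER_JOIN:   // scale up
            case LEADER_REJOIN:     // the leader rejoins
            case SESSION_TIMEOUT:   // a member stayed offline past the session timeout
                return true;
            case FOLLOWER_REJOIN:   // existing member: the cached assignment is returned
            default:
                return false;
        }
    }
}
```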

Scale down

Session timeout is the timeout after which we trigger a rebalance when a member has been offline for too long (not sending heartbeat requests). To make static membership effective, we should increase the default max session timeout to 30 min so that end users can configure it freely. Although in static membership we rely on the client applications' self-management (for example Kubernetes), the session timeout serves as the last line of defense for liveness.
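
As a rough illustration of the liveness rule, the check below treats a member as expired once the time since its last heartbeat exceeds its session timeout. The class and method names are hypothetical.

```java
// Hypothetical sketch of the session-timeout liveness check.
final class SessionLiveness {
    // Returns true when the member has been silent for longer than its session timeout.
    static boolean isExpired(long lastHeartbeatMs, long nowMs, int sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs > sessionTimeoutMs;
    }
}
```

With a 30 min session timeout, a static member that restarts within a few minutes would not be considered expired, so no scale-down rebalance is triggered.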

Scale up

We do not plan to solve the scale up issue holistically within this KIP, since there is a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level instead of at the protocol level. Adding new static members should be straightforward. Because this operation should happen fast enough (to make sure capacity catches up quickly), we are defining another config called expansion timeout. In the ideal case, we could introduce a new status called "learner", in which newly started hosts first try to catch up with the progress of their assigned tasks before triggering the rebalance, so that we don't see a sudden dip in progress.

For scaling up from the empty stage, we plan to deprecate group.initial.rebalance.delay.ms since we no longer need it once the incremental rebalancing work is done.

...

Currently there is a config called rebalance timeout, which is set from the consumer's max.poll.interval.ms. The reason we set it to the poll interval is that the consumer can only send requests within a call to poll(), and we want to wait a sufficient time for the join group request. When the rebalance timeout is reached, the group moves to the completingRebalance stage and removes members that have not rejoined. This conflicts with the design of static membership, because those temporarily unavailable members will potentially reattempt the join group and trigger extra rebalances. Internally we would optimize this logic by having the rebalance timeout only be in charge of ending the prepare rebalance stage, without removing non-responsive members immediately.


Fault-tolerance of static membership

To make sure we can recover from broker failure/leader transition, an in-memory member name map is not enough. We would reuse the `__consumer_offsets` topic to store the static member map information. When another broker takes over leadership, the mapping is transferred with it. We shall also introduce a new member metadata format:

Code Block
languagescala
titleGroupMetadataManager
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(MEMBER_NAME_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))

...

Command line API for membership management

For the very first version, we will define one command line API to help us better manage the groups:

Code Block
titleAdminClient.java
public static MembershipChangeResult forceStaticRebalance(String groupId)

forceStaticRebalance (introduced above) will trigger one rebalance immediately on static membership, which is mainly used for fast scale down/host replacement cases (where we detect consumer failure faster than the session timeout). An error will be returned if

  1. the broker is on an old version.
  2. the group is preparing rebalance/completing rebalance.
  3. the group has dynamic members (without member name).
  4. other potential failure cases.
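
The first three error conditions can be sketched as precondition checks. Everything here is hypothetical (the class ForceRebalanceChecks, the GroupState names, and the reason strings); the KIP only specifies the forceStaticRebalance(String groupId) signature.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the checks that could run before a forced static rebalance.
final class ForceRebalanceChecks {
    enum GroupState { EMPTY, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, DEAD }

    // Returns a rejection reason, or Optional.empty() when the rebalance may proceed.
    // memberNames holds one entry per group member; "" or null marks a dynamic member.
    static Optional<String> rejectReason(boolean brokerSupportsStaticMembership,
                                         GroupState state,
                                         List<String> memberNames) {
        if (!brokerSupportsStaticMembership) {
            return Optional.of("broker is on an old version");
        }
        if (state == GroupState.PREPARING_REBALANCE || state == GroupState.COMPLETING_REBALANCE) {
            return Optional.of("group is already rebalancing");
        }
        for (String name : memberNames) {
            if (name == null || name.isEmpty()) {
                return Optional.of("group has dynamic members");
            }
        }
        return Optional.empty();
    }
}
```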

We need to grant access to these APIs to end users who may not hold an administrative role on the Kafka cluster. We shall allow an access level similar to that of the join group request, so that the consumer service owner can easily use this API.

Upgrade from dynamic membership to static membership

The recommended upgrade process is as follows:

  1. Upgrade your broker to include this KIP change.
  2. Do a rolling bounce of your consumer group to set the member name and to set the session timeout to a reasonable number.

That's it! We believe that the static membership logic is compatible with the current dynamic membership, which means static members and dynamic members are allowed to co-exist within the same consumer group. This assumption could be further verified when we do some modeling of the protocol (through TLA+, maybe) or dev testing.

Compatibility, Deprecation, and Migration Plan

...