...
Proposed Changes
We will introduce several new terms:
- Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless (1) a new member joins, (2) a leader rejoins, or (3) an existing member goes offline for longer than a certain session timeout.
- Member name: the unique identifier defined by the user to distinguish each client instance.
- Member registration timeout: the maximum time we tolerate a static member being offline.
- Member expansion timeout: the maximum time we will wait after receiving a new static member join request.
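The three rebalance triggers in the Static Membership definition can be sketched as a small predicate. This is only an illustration of the rule as stated above, not broker code: the event names, the `Event` type, and `should_rebalance` are hypothetical, and the timeout is passed in as a plain number.

```python
from dataclasses import dataclass
from enum import Enum, auto


class GroupEvent(Enum):
    # Hypothetical event names, chosen for illustration only.
    NEW_MEMBER_JOIN = auto()
    LEADER_REJOIN = auto()
    FOLLOWER_REJOIN = auto()
    MEMBER_OFFLINE = auto()


@dataclass
class Event:
    kind: GroupEvent
    offline_ms: int = 0  # how long the member has been unreachable


def should_rebalance(event: Event, session_timeout_ms: int) -> bool:
    """Return True only for the three triggers listed for static membership."""
    if event.kind in (GroupEvent.NEW_MEMBER_JOIN, GroupEvent.LEADER_REJOIN):
        return True
    if event.kind is GroupEvent.MEMBER_OFFLINE:
        # A member that comes back within the timeout causes no rebalance.
        return event.offline_ms > session_timeout_ms
    # Any other event, such as a known follower rejoining, is ignored.
    return False
```

Under this reading, a follower that bounces and rejoins within the timeout leaves the assignment untouched.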
Client behavior changes
On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). To distinguish this from the previous version of the protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle a duplicate member.name in case the client is misconfigured.
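A minimal sketch of the client-side choice described above, written in Python for brevity rather than as actual consumer code. The `JoinGroupRequest` dataclass and `initial_join_request` function are hypothetical. Two assumptions are baked in: a static member still sends UNKNOWN_MEMBER_ID as its member id on first join (the text does not say), and a dynamic member stays on v3.

```python
from dataclasses import dataclass
from typing import Optional

UNKNOWN_MEMBER_ID = ""  # Kafka represents the unknown member id as an empty string


@dataclass
class JoinGroupRequest:
    version: int
    member_id: str
    member_name: Optional[str]


def initial_join_request(consumer_config: dict) -> JoinGroupRequest:
    """Build the first join group request from the consumer config."""
    member_name = consumer_config.get("member.name")
    if member_name:
        # Static membership: carry the user-supplied name and use v4.
        return JoinGroupRequest(4, UNKNOWN_MEMBER_ID, member_name)
    # Dynamic membership: no name, the broker allocates a random id.
    return JoinGroupRequest(3, UNKNOWN_MEMBER_ID, None)
```

For example, `initial_join_request({"member.name": "consumer-1"})` yields a v4 request carrying the name, while an empty config yields a v3 request without one.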
...
Changes will be applied to the join group request and the sync group request to include the member name and other settings:
```
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
  GroupId => String
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => String
  MemberName => String // new
  ProtocolType => String
  GroupProtocols => [Protocol MemberMetadata]
    Protocol => String
    MemberMetadata => bytes

SyncGroupRequest => GroupId GroupGenerationId MemberId MemberName
  GroupId => String
  GroupGenerationId => int32
  GroupState => [MemberId MemberState]
    MemberId => String
    MemberName => String // new
    MemberState => bytes
```
If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id.
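The downgrade rule can be sketched as a small version negotiation. This is a hedged illustration, not the client's real version-selection code: `negotiate_join_request` and the `broker_max_version` parameter are hypothetical, and the sketch assumes the member name is simply dropped when the request falls back to v3.

```python
from typing import Optional, Tuple

STATIC_MEMBERSHIP_VERSION = 4  # first join group version that carries MemberName


def negotiate_join_request(
    broker_max_version: int, member_name: Optional[str]
) -> Tuple[int, Optional[str]]:
    """Return (request version, member name to send) for a given broker."""
    if member_name is not None and broker_max_version >= STATIC_MEMBERSHIP_VERSION:
        return STATIC_MEMBERSHIP_VERSION, member_name
    # Older broker, or no member name configured: behave as a dynamic member.
    return min(broker_max_version, STATIC_MEMBERSHIP_VERSION - 1), None
```

A consumer configured with a member name therefore degrades to dynamic membership against an older broker instead of failing to join.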
...
For scaling up from the empty stage, we plan to deprecate group.initial.rebalance.delay.ms, since we will no longer need it once the incremental rebalancing work is done.
Rolling bounce
Currently there is a config called rebalance timeout, which is set from the consumer's max.poll.interval.ms. The reason we set it to the poll interval is that the consumer can only send requests within a call to poll(), and we want to wait long enough for the join group request. When the rebalance timeout is reached, the group moves to the completingRebalance stage and removes unjoined members. This conflicts with the design of static membership, because those temporarily unavailable members may reattempt to join the group and trigger extra rebalances. Internally we would optimize this logic by making the rebalance timeout responsible only for ending the prepare rebalance stage, without immediately removing non-responsive members.
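One possible reading of the proposed rebalance timeout behavior is sketched below. It is not the coordinator implementation: the `Group` dataclass and `on_rebalance_timeout` are hypothetical, and the sketch assumes that members who have not rejoined are left in the group and expired later by a separate timeout.

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class Group:
    state: str = "PreparingRebalance"
    members: Set[str] = field(default_factory=set)
    rejoined: Set[str] = field(default_factory=set)


def on_rebalance_timeout(group: Group) -> Set[str]:
    """End the prepare stage but keep members that have not rejoined yet.

    Returns the members that are still awaited.
    """
    pending = group.members - group.rejoined
    group.state = "CompletingRebalance"
    # The current behavior would also remove them here:
    #   group.members -= pending
    # The proposal skips that step, so a member that is briefly
    # unavailable does not trigger another rebalance when it returns.
    return pending
```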
Fault-tolerance of static membership
...
That's it! We believe that the static membership logic is compatible with the current dynamic membership, which means static members and dynamic members are allowed to co-exist within the same consumer group. This assumption could be further verified when we model the protocol (perhaps through TLA) or run dev tests.
...
forceStaticRebalance will trigger one rebalance immediately on static membership, which is mainly used for fast scale-down cases (we have already detected a consumer failure and want to start a rebalance immediately, without waiting for the session timeout). An error will be returned if:
- the broker is on an old version.
- the group is preparing rebalance/completing rebalance.
- the group has dynamic members (without member name).
- other potential failure cases occur.
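The error conditions above can be sketched as a precondition check. This is illustrative only: `check_force_static_rebalance`, the `Member` and `GroupView` types, and the error strings are hypothetical and do not correspond to actual error codes. The catch-all "other potential failure cases" is not modeled.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Member:
    member_id: str
    member_name: Optional[str] = None  # None marks a dynamic member


@dataclass
class GroupView:
    state: str
    members: List[Member]


def check_force_static_rebalance(
    broker_supports_static: bool, group: GroupView
) -> Optional[str]:
    """Return an error description, or None if the rebalance may proceed."""
    if not broker_supports_static:
        return "broker is on an old version"
    if group.state in ("PreparingRebalance", "CompletingRebalance"):
        return "group is already rebalancing"
    if any(m.member_name is None for m in group.members):
        return "group has dynamic members"
    return None
```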
...