...

  1. Improve performance of heavy-state applications. We have seen that rebalancing is the major performance killer when scaling applications with large state, due to the time wasted on state shuffling.
  2. Improve general rolling bounce performance. For example, MirrorMaker processes take a long time to rolling-bounce the entire cluster, because each process restart triggers one rebalance. With the proposed change, only a constant number of rebalances (e.g. for the leader restart) is needed for the entire rolling bounce, which will significantly improve the availability of the MirrorMaker pipeline as long as the processes can restart within the specified timeout.

Background of consumer rebalance

Right now the broker handles consumer state in a two-phase protocol. To explain consumer rebalance alone, we discuss only the three states involved: RUNNING, PREPARE_REBALANCE and SYNC.

  • When a member joins the consumer group, if it is a new member or the group leader, the broker will move the group state from RUNNING to PREPARE_REBALANCE. A leader join triggers a rebalance because the assignment protocol might have changed (for example, if the consumer group is using regex subscription and new matching topics show up). If an old member rejoins the group, the state will not change.
  • Once in the PREPARE_REBALANCE state, the broker will mark the first joined consumer as leader and wait for all the members to rejoin the group. Once enough consumers have been collected or the rebalance timeout has been reached, the broker will reply to the leader with the current member information and move the state to SYNC. All current members are informed to send a SyncGroupRequest to get the final assignment.
  • The leader consumer will decide the assignment and send it back to the broker. As the last step, the broker will announce the new assignment by sending a SyncGroupResponse to all the followers. At this point one rebalance is finished and the group generation is incremented by 1.
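The three steps above can be read as a small state machine. The following is a minimal sketch only, not the broker's actual code: the class name, the method names, and the return to RUNNING after the assignment is distributed are assumptions made for illustration.

```java
/** Hypothetical model of the three group states discussed above. */
final class GroupStateSketch {
    enum State { RUNNING, PREPARE_REBALANCE, SYNC }

    private State state = State.RUNNING;
    private int generation = 0;

    State state() { return state; }

    int generation() { return generation; }

    /** A join from a new member or the leader starts a rebalance; an old follower rejoining does not. */
    void onJoin(boolean isNewMember, boolean isLeader) {
        if (state == State.RUNNING && (isNewMember || isLeader)) {
            state = State.PREPARE_REBALANCE;
        }
    }

    /** Called when enough members have rejoined or the rebalance timeout has expired. */
    void onJoinPhaseComplete() {
        if (state == State.PREPARE_REBALANCE) {
            state = State.SYNC;
        }
    }

    /** Called when the leader's assignment has been sent to the followers. */
    void onAssignmentDistributed() {
        if (state == State.SYNC) {
            state = State.RUNNING; // assumed: the group goes back to RUNNING
            generation++;          // one finished rebalance bumps the generation by 1
        }
    }
}
```

In this model every new-member or leader join costs a full pass through PREPARE_REBALANCE and SYNC, which is the cost the proposal tries to avoid for restarts.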

In the current architecture, during each rebalance the consumer group on the broker side assigns each new member a member id with a randomly generated UUID. This ensures a unique identity for each group member. During a client restart, the consumer sends a JoinGroupRequest with the special UNKNOWN_MEMBER_ID, which means it does not ask to be treated as an existing member. To make the KIP work, we need to change both client-side and server-side logic so that member identity persists across restarts, which lets us reduce the number of rebalances because the same assignment can be applied based on member identities. The idea is summarized as static membership, which, in contrast to dynamic membership (the one our system currently uses), prioritizes "state persistence" over "liveness", since for many stateful consumer/stream applications state shuffling is more painful than a short period of partial unavailability.
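To make the contrast concrete, here is a minimal sketch of how identity could behave under the two modes. The class, the map from member name to member id, and both method names are assumptions for illustration; the text above does not specify how the broker stores identities.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical broker-side bookkeeping; names and structure are illustrative only. */
final class MemberIdRegistrySketch {
    // Assumed mapping from a user-defined member name to the member id issued for it.
    private final Map<String, String> idByMemberName = new HashMap<>();

    /** Dynamic membership: every join without a known id yields a fresh random id. */
    String joinDynamic() {
        return UUID.randomUUID().toString();
    }

    /** Static membership: the same member name maps to the same id across restarts. */
    String joinStatic(String memberName) {
        return idByMemberName.computeIfAbsent(memberName, name -> UUID.randomUUID().toString());
    }
}
```

Because a restarted static member comes back with the identity it had before, its previous assignment can be reused instead of being reshuffled.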

...

We will be introducing two new terms:

...

  • Static Membership: the membership protocol where the consumer group will not trigger a rebalance unless (1) a new member joins, (2) a leader rejoins, or (3) an existing member stays offline longer than the session timeout.
  • Member name: the unique identifier defined by the user to distinguish each client instance.
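The three rebalance conditions of static membership can be written as a single predicate. This is a sketch under stated assumptions: the event names and the method signature are hypothetical, and only the three conditions themselves come from the definition above.

```java
/** Hypothetical predicate for the static membership rebalance conditions. */
final class StaticRebalanceTriggerSketch {
    enum Event { NEW_MEMBER_JOIN, LEADER_REJOIN, FOLLOWER_REJOIN, MEMBER_OFFLINE }

    /**
     * Returns true when the event should trigger a rebalance.
     * offlineMs is only meaningful for MEMBER_OFFLINE.
     */
    static boolean triggersRebalance(Event event, long offlineMs, long sessionTimeoutMs) {
        switch (event) {
            case NEW_MEMBER_JOIN:
            case LEADER_REJOIN:
                return true;
            case MEMBER_OFFLINE:
                // A short absence is tolerated; only exceeding the session timeout counts.
                return offlineMs > sessionTimeoutMs;
            default:
                // A known follower rejoining keeps its place in the group.
                return false;
        }
    }
}
```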

Public Interfaces

Client

...

side changes

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify the consumer as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). To distinguish this from the previous version of the protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set. Note that it is the user's responsibility to assign a unique member name to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate member.name values in case a client is misconfigured.

Code Block
languagejava
titleConsumerConfig.java
public static final String MEMBER_NAME = "member_A"; // default empty String
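As a rough illustration of the client-side decision described above, the sketch below derives the membership mode and the join group request version from the consumer properties. The class and method names are hypothetical, and treating v3 as the version used when no member name is set is an assumption based on it being the version before v4.

```java
import java.util.Properties;

/** Hypothetical client-side helper; not part of the actual consumer code. */
final class JoinIdentitySketch {
    static final String MEMBER_NAME_KEY = "member.name"; // key spelling taken from the text
    static final short DYNAMIC_JOIN_VERSION = 3;         // assumed pre-KIP request version
    static final short STATIC_JOIN_VERSION = 4;

    /** A consumer is a static member only when a non-empty member name is configured. */
    static boolean isStaticMember(Properties config) {
        return !config.getProperty(MEMBER_NAME_KEY, "").isEmpty();
    }

    /** The join group request version is raised to v4 only for static members. */
    static short joinGroupVersion(Properties config) {
        return isStaticMember(config) ? STATIC_JOIN_VERSION : DYNAMIC_JOIN_VERSION;
    }
}
```

A consumer would opt in with something like `props.setProperty("member.name", "mirror-maker-host-1")`, where the value is whatever unique name the deployment already provides.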


Changes will be applied to join group request to include member name: 

Code Block
JoinGroupRequest => GroupId SessionTimeout RebalanceTimeout MemberId MemberName ProtocolType GroupProtocols
  GroupId             => String
  SessionTimeout      => int32
  RebalanceTimeout    => int32
  MemberId            => String
  MemberName          => String // new
  ProtocolType        => String
  GroupProtocols      => [Protocol MemberMetadata]
  Protocol            => String
  MemberMetadata      => bytes


If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id.
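The downgrade rule can be sketched as follows. This is a minimal illustration with hypothetical names, and it assumes that the field left unset after the downgrade is the new static identity carried by v4 (the member name); the sentence above does not spell this out.

```java
/** Hypothetical helper showing the v4 to v3 downgrade for older brokers. */
final class JoinVersionDowngradeSketch {
    static final short STATIC_MEMBERSHIP_VERSION = 4;
    static final short FALLBACK_VERSION = 3;

    /** Uses v4 only when the broker supports it; otherwise falls back to v3. */
    static short versionToSend(short brokerMaxVersion) {
        return brokerMaxVersion >= STATIC_MEMBERSHIP_VERSION
                ? STATIC_MEMBERSHIP_VERSION
                : FALLBACK_VERSION;
    }

    /** Assumption: the static identity is dropped (sent empty) when the request is downgraded. */
    static String memberNameToSend(String configuredName, short brokerMaxVersion) {
        return versionToSend(brokerMaxVersion) == STATIC_MEMBERSHIP_VERSION ? configuredName : "";
    }
}
```

Under this reading, a consumer talking to an older broker behaves like a dynamic member even though a member name is configured.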

...

We shall also bump the join group request/response version to v4.

Code Block
languagejava
titleJoinGroupRequest.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2, JOIN_GROUP_REQUEST_V3, JOIN_GROUP_REQUEST_V4};
}


Code Block
titleJoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}



Proposed Changes


For the KIP to be effective, consumers with member.name set will not send a leave group request when they go offline. This is because a leave group request would cause every such intermittent exit and re-entry to trigger a rebalance, which is not ideal.

...