...

Code Block
titleJoinGroupResponse.java
public static Schema[] schemaVersions() {
    return new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2, JOIN_GROUP_RESPONSE_V3, JOIN_GROUP_RESPONSE_V4};
}

We are also introducing two new types of return error in JoinGroupResponse V4. Their functionality is explained in the next section.

Code Block
languagejava
titleErrors.java
MEMBER_ID_MISMATCH(78, "The join group contains member name which is already in the consumer group, however the member id was not matching the record on coordinator",
        MemberIdMisMatchException::new),
DUPLICATE_STATIC_MEMBER(79, "The join group contains member name which is already in the consumer group, however the member id was missing",
        DuplicateStaticMemberException::new),

...

Server side changes

We shall increase the cap of session timeout to 30 min to relax static membership liveness tracking.

...

Code Block
languagescala
titleGroupMetadataManager
private val MEMBER_METADATA_V3 = new Schema(
  new Field(MEMBER_ID_KEY, STRING),
  new Field(MEMBER_NAME_KEY, STRING), // new
  new Field(CLIENT_ID_KEY, STRING),
  new Field(CLIENT_HOST_KEY, STRING),
  new Field(REBALANCE_TIMEOUT_KEY, INT32),
  new Field(SESSION_TIMEOUT_KEY, INT32),
  new Field(SUBSCRIPTION_KEY, BYTES),
  new Field(ASSIGNMENT_KEY, BYTES))

Command line API and Scripts

We will define one command line API to help us better manage the static consumer groups:

Code Block
titleAdminClient.java
public static MembershipChangeResult invokeConsumerRebalance(String groupId);
public static MembershipChangeResult invokeConsumerRebalance(String groupId, InvokeConsumerRebalanceOptions options);

...

In the meantime we shall introduce a type of request/response to make the API effective:


Code Block
ConsumerRebalanceRequest => GroupId
  GroupId             => String
ConsumerRebalanceResponse => GroupId, ErrorCode, ErrorMessage
  GroupId             => String
  ErrorCode           => Int16
  ErrorMessage        => String


A script called kafka-invoke-consumer-rebalance.sh will be added so that end users can easily manipulate the consumer group.

Running ./bin/kafka-invoke-consumer-rebalance.sh --zookeeper localhost:2181 --broker 1 --group-id group-1 will immediately trigger a consumer group rebalance by transitioning the group state to PREPARE_REBALANCE (explained in the next section).

Proposed Changes

In short, the proposed feature is enabled if 

  1. JoinGroupRequest V4 is supported on both client and broker
  2. `member.name` is configured with a non-empty string.

Client behavior changes

On the client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, the consumer puts it in the initial join group request to identify itself as a static member (static membership); otherwise, it still sends UNKNOWN_MEMBER_ID to ask the broker to allocate a new random ID (dynamic membership). Note that it is the user's responsibility to assign a unique `member.name` to each consumer. This could be a service discovery hostname, a unique IP address, etc. We also have logic to handle duplicate `member.name` values in case the client is misconfigured.
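As a rough illustration of the choice described above, the sketch below builds the fields of an initial join request from the `member.name` config value. The class `InitialJoinSketch`, its fields, and the use of an empty string for UNKNOWN_MEMBER_ID are assumptions made for illustration only; they are not the actual consumer implementation.

```java
import java.util.Optional;

// Hypothetical stand-in for the identity fields of an initial JoinGroupRequest.
final class InitialJoinSketch {
    // Assumed sentinel for "no member id assigned yet".
    static final String UNKNOWN_MEMBER_ID = "";

    final String memberId;
    final Optional<String> memberName;

    private InitialJoinSketch(String memberId, Optional<String> memberName) {
        this.memberId = memberId;
        this.memberName = memberName;
    }

    // Build the first join request from the configured member.name (may be null or empty).
    static InitialJoinSketch fromConfig(String configuredMemberName) {
        boolean hasName = configuredMemberName != null && !configuredMemberName.isEmpty();
        Optional<String> name = hasName ? Optional.of(configuredMemberName) : Optional.empty();
        // In this sketch the member id is unknown on the first join in both modes;
        // only the presence of a member name distinguishes static from dynamic.
        return new InitialJoinSketch(UNKNOWN_MEMBER_ID, name);
    }

    boolean isStaticMember() {
        return memberName.isPresent();
    }
}
```

With this shape, `InitialJoinSketch.fromConfig("host-1")` would describe a static member, while `fromConfig("")` or `fromConfig(null)` would fall back to dynamic membership.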

...

For join group requests under static membership (with member name set), we require:

  • If the `member.id` is UNKNOWN_MEMBER_ID and the `member.name` is known, we shall return the member id stored in the current map. Also, once KIP-394 is done, all join group requests will require a `member.id` to physically enter the consumer group.
  • If the `member.name` is already within the map, the `member.id` (if not unknown) has to match the stored value. Otherwise reply MEMBER_ID_MISMATCH.
  • `member.id` must be left empty if the `member.name` is new. Otherwise reply DUPLICATE_STATIC_MEMBER.
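One possible reading of the rules above can be sketched as a small coordinator-side check. Everything here is hypothetical: the class `StaticJoinSketch`, the `Outcome` enum, the in-memory map from `member.name` to `member.id`, and the way a fresh id is generated are illustrative assumptions, not the broker's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side handling of a static-membership join.
final class StaticJoinSketch {
    // Assumed sentinel for an unset member id.
    static final String UNKNOWN_MEMBER_ID = "";

    enum Outcome { ACCEPTED, MEMBER_ID_MISMATCH, DUPLICATE_STATIC_MEMBER }

    static final class JoinResult {
        final Outcome outcome;
        final String memberId;

        JoinResult(Outcome outcome, String memberId) {
            this.outcome = outcome;
            this.memberId = memberId;
        }
    }

    // member.name -> member.id, standing in for the coordinator's mapping.
    private final Map<String, String> idByName = new HashMap<>();
    private int nextSuffix = 0;

    JoinResult handleStaticJoin(String memberName, String memberId) {
        boolean idUnknown = memberId == null || memberId.equals(UNKNOWN_MEMBER_ID);
        String storedId = idByName.get(memberName);

        if (storedId == null) {
            // New member.name: the member id must be left empty.
            if (!idUnknown) {
                return new JoinResult(Outcome.DUPLICATE_STATIC_MEMBER, memberId);
            }
            // Illustrative id generation only; the real format is not specified here.
            String assigned = memberName + "-" + (nextSuffix++);
            idByName.put(memberName, assigned);
            return new JoinResult(Outcome.ACCEPTED, assigned);
        }
        if (idUnknown) {
            // Known member.name with unknown id: hand back the stored id.
            return new JoinResult(Outcome.ACCEPTED, storedId);
        }
        if (!storedId.equals(memberId)) {
            // Known member.name but the supplied id differs from the stored one.
            return new JoinResult(Outcome.MEMBER_ID_MISMATCH, memberId);
        }
        return new JoinResult(Outcome.ACCEPTED, storedId);
    }
}
```

In this sketch a restarted instance that rejoins with the same `member.name` and no `member.id` gets its previous id back, which is the behavior that lets the group avoid a rebalance.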

For MEMBER_ID_MISMATCH, we haven't been able to define the possible edge case which could cause this issue. So for the first version, we shall just fail the consumer immediately when it hits this exception.

...

  • `member.id` (if not unknown) must match the value stored in cache, otherwise reply MEMBER_ID_MISMATCH. This covers the edge case of members sharing the same `member.name` (for example, mis-configured instances). When a member name has duplicates, we can refuse join requests from members with an outdated `member.id` (since we update the mapping upon each join group request). When a client hits this exception in the response, it suggests that some other consumer has taken its spot. The client should immediately fail itself to inform the end user that there is a configuration bug generating duplicate consumers with the same identity. For the first version of this KIP, we want straightforward handling that exposes the error at an early stage and makes bug cases easy to reproduce.

For join group requests under dynamic membership (without member name set), the handling logic will remain unchanged. If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id.
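The enablement conditions and the downgrade path can be pictured as a simple version check. This is a sketch under assumptions: the class `JoinVersionSketch`, the constant name, and the idea that the negotiated version is the minimum of what client and broker support are illustrative, not taken from the proposal.

```java
// Hypothetical helper deciding whether a join request may carry static-membership fields.
final class JoinVersionSketch {
    // JoinGroupRequest V4 is the first version that supports the feature.
    static final short STATIC_MEMBERSHIP_MIN_VERSION = 4;

    // Assumed negotiation rule: use the highest version both sides support.
    static short negotiate(short clientMax, short brokerMax) {
        return (short) Math.min(clientMax, brokerMax);
    }

    // Static membership needs both a V4-capable pair and a non-empty member.name.
    static boolean staticMembershipEnabled(short clientMax, short brokerMax, String memberName) {
        boolean named = memberName != null && !memberName.isEmpty();
        return named && negotiate(clientMax, brokerMax) >= STATIC_MEMBERSHIP_MIN_VERSION;
    }
}
```

Under these assumptions, a V4 client talking to a V3 broker would negotiate V3 and behave as a dynamic member even when `member.name` is configured.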

...

Command line API for membership management

invokeConsumerRebalance (introduced above) triggers one rebalance immediately under static membership. It is mainly intended for fast scale-down and host replacement cases, where the operator detects a consumer failure faster than the session timeout would. This API will send a request to the group coordinator, and the coordinator will decide whether to accept this metadata change request based on its status at that time.

An error will be returned if:

  1. The broker is on an old version.
  2. The consumer group does not exist.
  3. The operator is not authorized (neither admin nor consumer group creator).
  4. The group is not in a valid state to transition to rebalance (use the `canRebalance` function defined in GroupMetadata.scala to check).
  5. Other potential failure cases occur.
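The checks listed above could be applied in sequence before the coordinator accepts the request. The sketch below is only illustrative: the class `RebalanceValidationSketch`, the `Result` names, and the order of the checks are assumptions, and the four booleans stand in for lookups the coordinator would actually perform (the last one representing the outcome of `canRebalance`).

```java
// Hypothetical pre-checks for an invokeConsumerRebalance request.
final class RebalanceValidationSketch {
    // Illustrative result names; the proposal does not define these codes.
    enum Result {
        OK,
        UNSUPPORTED_BROKER_VERSION,
        GROUP_NOT_FOUND,
        NOT_AUTHORIZED,
        INVALID_GROUP_STATE
    }

    static Result validate(boolean brokerSupportsRequest,
                           boolean groupExists,
                           boolean operatorAuthorized,
                           boolean canRebalance) {
        if (!brokerSupportsRequest) {
            return Result.UNSUPPORTED_BROKER_VERSION;
        }
        if (!groupExists) {
            return Result.GROUP_NOT_FOUND;
        }
        if (!operatorAuthorized) {
            return Result.NOT_AUTHORIZED;
        }
        if (!canRebalance) {
            // Group state does not allow a transition to rebalance right now.
            return Result.INVALID_GROUP_STATE;
        }
        return Result.OK;
    }
}
```

Returning the first failing check keeps the error reported to the operator unambiguous; a real implementation might order the checks differently, for example authorizing before revealing whether the group exists.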

These APIs must be accessible to end users who may not hold an administrative role in the Kafka cluster. We shall allow an access level similar to that of the join group request, so that the consumer service owner can easily use this API. Another detail to take care of is that we need to automatically take the hash of the group id so that we know which broker to send this request to.
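The hashing step mentioned above can be sketched as mapping the group id onto a partition of the internal offsets topic, whose leader would then act as the coordinator. This is a minimal sketch based on how coordinator lookup is generally described for Kafka; the class `CoordinatorLookupSketch` and its method are hypothetical, and the partition count is a parameter because its configured value is deployment specific.

```java
// Hypothetical helper mapping a group id to an offsets-topic partition.
final class CoordinatorLookupSketch {
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitionCount;
    }
}
```

The tool would then need to find the broker leading that partition (for example via a coordinator lookup request) before sending the rebalance request to it.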

Compatibility, Deprecation, and Migration Plan

...