...

When a consumer leaves the consumer group, it logs the reason. However, there is no information on the broker side, which makes troubleshooting rebalances triggered by a LeaveGroupRequest hard. Propagating the reason to the broker via the LeaveGroupRequest would be really helpful in such situations.

Public Interfaces

Protocol

A new field called Reason is added to the JoinGroupRequest.

Code Block
languagejs
{
  "apiKey": 11,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "JoinGroupRequest",
  // Version 1 adds RebalanceTimeoutMs.
  //
  // Version 2 and 3 are the same as version 1.
  //
  // Starting from version 4, the client needs to issue a second request to join group
  // with assigned id.
  //
  // Starting from version 5, we add a new field called groupInstanceId to indicate member identity across restarts.
  //
  // Version 6 is the first flexible version.
  //
  // Version 7 is the same as version 6.
  //
  // Version 8 adds the Reason field (KIP-800).
  "validVersions": "0-8",
  "flexibleVersions": "6+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "SessionTimeoutMs", "type": "int32", "versions": "0+",
      "about": "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds." },
    // Note: if RebalanceTimeoutMs is not present, SessionTimeoutMs should be
    // used instead.  The default of -1 here is just intended as a placeholder.
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
      "about": "The maximum time in milliseconds that the coordinator will wait for each member to rejoin when rebalancing the group." },
    { "name": "MemberId", "type": "string", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "GroupInstanceId", "type": "string", "versions": "5+", 
      "nullableVersions": "5+", "default": "null",
      "about": "The unique identifier of the consumer instance provided by end user." },
    { "name": "ProtocolType", "type": "string", "versions": "0+",
      "about": "The unique name for the class of protocols implemented by the group we want to join." },
    { "name": "Protocols", "type": "[]JoinGroupRequestProtocol", "versions": "0+",
      "about": "The list of protocols that the member supports.", "fields": [
      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
        "about": "The protocol name." },
      { "name": "Metadata", "type": "bytes", "versions": "0+",
        "about": "The protocol metadata." }
    ]},
    // New Field //
    { "name": "Reason", "type": "string",
      "versions": "8+", "nullableVersions": "8+", "default": "null",
      "about": "The reason why the member (re-)joins the group." }
  ]
}


A new field called Reason is added to each member in the LeaveGroupRequest.

Code Block
languagejs
{
  "apiKey": 13,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "LeaveGroupRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 defines batch processing scheme with group.instance.id + member.id for identity
  //
  // Version 4 is the first flexible version.
  //
  // Version 5 adds the Reason field (KIP-800).
  "validVersions": "0-5",
  "flexibleVersions": "4+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The ID of the group to leave." },
    { "name": "MemberId", "type": "string", "versions": "0-2",
      "about": "The member ID to remove from the group." },
    { "name": "Members", "type": "[]MemberIdentity", "versions": "3+",
      "about": "List of leaving member identities.", "fields": [
      { "name": "MemberId", "type": "string", "versions": "3+",
        "about": "The member ID to remove from the group." },
      { "name": "GroupInstanceId", "type": "string",
        "versions": "3+", "nullableVersions": "3+", "default": "null",
        "about": "The group instance ID to remove from the group." },
      // New Field //
      { "name": "Reason", "type": "string",
        "versions": "5+", "nullableVersions": "5+", "default": "null",
        "about": "The reason why the member left the group." }
    ]}
  ]
}
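
The version comments in the two schemas above imply a simple rule for which request versions can carry the Reason field. The following is a minimal sketch of that rule only; the class and method names are hypothetical and are not part of Kafka, and the version boundaries are taken from the schema comments ("Version 8 adds the Reason field" for JoinGroupRequest, "Version 5 adds the Reason field" for LeaveGroupRequest).

```java
// Hypothetical helper, not part of Kafka: encodes the version boundaries
// stated in the schema comments above.
final class ReasonFieldVersions {
    static final short JOIN_GROUP_API_KEY = 11;   // "apiKey" of JoinGroupRequest
    static final short LEAVE_GROUP_API_KEY = 13;  // "apiKey" of LeaveGroupRequest

    static final short JOIN_GROUP_REASON_MIN_VERSION = 8;
    static final short LEAVE_GROUP_REASON_MIN_VERSION = 5;

    private ReasonFieldVersions() { }

    // Returns true if a request with this API key and version has a Reason field.
    static boolean carriesReason(short apiKey, short version) {
        switch (apiKey) {
            case JOIN_GROUP_API_KEY:
                return version >= JOIN_GROUP_REASON_MIN_VERSION;
            case LEAVE_GROUP_API_KEY:
                return version >= LEAVE_GROUP_REASON_MIN_VERSION;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(carriesReason(JOIN_GROUP_API_KEY, (short) 7));
        System.out.println(carriesReason(JOIN_GROUP_API_KEY, (short) 8));
        System.out.println(carriesReason(LEAVE_GROUP_API_KEY, (short) 5));
    }
}
```

A request sent at an older version simply has no place for the reason, so the broker would see nothing for such clients.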

Admin API

The reason is added as an argument to the removeMembersFromConsumerGroup method.

Code Block
languagejava
/**
 * Remove members from the consumer group by given member identities.
 * <p>
 * For possible error codes, refer to {@link LeaveGroupResponse}.
 *
 * @param groupId The ID of the group to remove members from.
 * @param reason The reason why the member left the group.
 * @param options The options to carry removing members' information.
 * @return The MembershipChangeResult.
 */
RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId, String reason, RemoveMembersFromConsumerGroupOptions options);

Consumer API

The reason is added as an argument to the enforceRebalance method.

Code Block
languagejava
/**
 * @see KafkaConsumer#enforceRebalance()
 */
void enforceRebalance(String reason);

Proposed Changes

The KIP adds a new per-member field called Reason to the LeaveGroupRequest. The Consumer populates the field with its reason for leaving when it wants to leave the group. The consumer already logs the reason, so we will reuse the same message. The AdminClient populates the field as well when removeMembersFromConsumerGroup is called. In this case, the reason will be "member was removed by an admin" by default, or the provided reason prefixed by "member was removed by an admin".

The KIP also adds a field called Reason to the JoinGroupRequest. The Consumer populates the field with its reason for (re-)joining the group, which helps in understanding why a rebalance was triggered. The Consumer also populates the field when enforceRebalance is called. In this case, the reason will be "rebalance enforced by user" by default, or the provided reason prefixed by "rebalance enforced by user:".
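
The default-or-prefixed rule described above can be sketched as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, the ": " separator between the prefix and the provided reason is assumed (the text only shows a trailing colon for the enforceRebalance case), and treating an empty reason like a missing one is also an assumption.

```java
// Hypothetical sketch, not Kafka code: composes the reason string sent in
// LeaveGroupRequest (admin removal) and JoinGroupRequest (enforced rebalance).
final class ReasonSketch {
    static final String ADMIN_REMOVAL = "member was removed by an admin";
    static final String ENFORCED_REBALANCE = "rebalance enforced by user";

    private ReasonSketch() { }

    // Returns the default reason alone, or the default reason used as a prefix
    // for the caller-provided reason. The ": " separator is an assumption.
    static String compose(String defaultReason, String providedReason) {
        if (providedReason == null || providedReason.trim().isEmpty()) {
            return defaultReason;
        }
        return defaultReason + ": " + providedReason.trim();
    }

    // Reason for a member removed via removeMembersFromConsumerGroup.
    static String adminRemovalReason(String providedReason) {
        return compose(ADMIN_REMOVAL, providedReason);
    }

    // Reason for a rejoin triggered via enforceRebalance.
    static String enforcedRebalanceReason(String providedReason) {
        return compose(ENFORCED_REBALANCE, providedReason);
    }

    public static void main(String[] args) {
        System.out.println(adminRemovalReason(null));
        System.out.println(enforcedRebalanceReason("rolling upgrade"));
    }
}
```

Keeping the fixed prefix in front of any user-provided text means the source of the request stays identifiable on the broker side even when the caller supplies an arbitrary message.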

Compatibility, Deprecation, and Migration Plan

...