Status

Current state: Under Discussion

...

Note that this is joint work proposed by David Jacot, Guozhang Wang and Jason Gustafson.

Motivation

It has been about 8 years since we introduced the so-called new consumer, which does group membership and rebalancing through Kafka. Although it was a huge improvement over the old ZooKeeper-based consumer, it has remained a major pain point from an operational perspective. There are multiple reasons for this; let’s dive into them:

  • The protocol relies on thick clients. Thick clients are annoying in many ways:
    • We have had many bugs in the rebalance protocol over the last years and the majority of them required client side bug fixes. In the cloud era, this is really annoying because we effectively depend on the adoption of new client versions in order to fix the issues in production. Unfortunately, adoption is usually rather slow in the Kafka community;
    • It is almost impossible to debug issues in the protocol without having access to the client logs. In the cloud era, it is a bit annoying to have to request client logs to troubleshoot your system;
    • The clients specify the so-called embedded protocol in the rebalance protocol. While this allows the core protocol to be reused for different purposes, for instance it is used by both the consumer and connect in Apache Kafka, this makes inspecting the state on the broker side hard because the brokers get a bunch of raw bytes. The compatibility of the embedded protocols has also been a challenge; and
    • The clients are responsible for monitoring the metadata and for triggering rebalances. This has caused all sorts of issues in the past because clients of a given group might have a different view of the metadata at a given point in time.
  • The group protocol has been used for general state propagation between members. This is especially the case for power users such as Kafka Streams. While the state propagation is not an issue in itself, the protocol only propagates the state during a rebalance, so we have introduced fake or dummy rebalances with the sole goal of propagating some new state to the leader of the group or to all the members of the group. This has been very confusing for our users both on the client side and the broker side. This also makes the interpretation of rebalances through metrics or logs more difficult.
  • The protocol has gotten too complex over the years. We started with a rather simple protocol and we have extended it a few times over the years. For instance, we introduced KIP-429: Kafka Consumer Incremental Rebalance Protocol, KIP-345: Introduce static membership protocol to reduce consumer rebalances, and a few others. All the incremental changes that we have made have increased the complexity of the protocol.
  • The protocol relies on a group-wide synchronization barrier. This means that a single misbehaving consumer can take down or disturb the whole group because a rebalance of the whole group is required whenever a consumer joins, leaves or fails.
    This also limits its scalability as the cost of a rebalance increases with the number of members in the group. Even the cooperative rebalancing protocols depend on the barrier. Specifically, one of the deficiencies of the cooperative protocol is that offsets cannot be committed while the consumer is waiting on the rebalance to complete. So even though a consumer can keep fetching while the rebalance is in progress, it still tends to get stuck behind the barrier.

Design Goals

We propose to introduce a new group membership and rebalance protocol for the Kafka Consumer and, by extension, Kafka Streams. The proposed protocol is built on top of the following design goals.

  • The protocol should be truly incremental and cooperative and should not rely on a global synchronization barrier anymore. Ideally, a consumer should not be impacted at all by a rebalance if its assignment is not changed.
  • The complexity should move away from the consumer to the group coordinator. We want to be able to troubleshoot issues without requiring client logs and we want to fix issues without having to wait on consumer adoption.
  • The protocol should still allow power users such as Kafka Streams to run assignment logic on the client. This is important for Kafka Streams to remain independent from the broker. However, we want this process to be driven and controlled by the group coordinator.
  • The protocol should provide the same guarantees as the current protocol, that is, at-least-once in the worst case scenario and exactly-once when the hand off between members is clean.
  • The protocol should support upgrading the consumers without downtime. Rolling the consumers with the correct configuration must be enough to upgrade to the new protocol.

Note that Kafka Connect is not supported by this new protocol. We discuss in the future work section how Kafka Connect could evolve to use a similar protocol.

Proposed Changes

Rebalance Protocol in a Nutshell

The proposed rebalance protocol is based on the concept of a declarative assignment for the group and the use of reconciliation loops to drive members toward their desired assignment. Members can independently converge and the group coordinator takes care of resolving the dependencies - e.g. revoking a partition before it can be assigned - between the members if any.

The desired (or target) assignment is either directly computed by the group coordinator using a server side assignor or computed by one of the group members if a client side assignor is used. The former is the new default for consumers while the latter allows power users such as Kafka Streams to continue using purpose-built assignors. It is important to note that the entire rebalance process is driven by the group coordinator with this new protocol.

Unlike the current protocol which keeps the heartbeat mechanism as lightweight as possible, the new protocol piggybacks on it to let the group coordinator assign/revoke partitions to/from group members while allowing group members to propagate their current state to the group coordinator. The Heartbeat API is introduced for this purpose.

When a client side assignor is used, the group coordinator requests the assignment from one group member by notifying it via the heartbeat protocol. The chosen member uses the PrepareAssignment API and the InstallAssignment API to respectively get the current state of the group and to install the computed assignment. Thanks to this, the input of the client side assignor is entirely driven by the group coordinator. The consumer is no longer responsible for maintaining any state besides its assigned partitions.

The new protocol’s RPCs are specified in detail in the public interfaces section of this document, while the details of the rebalance logic are described in the next chapter.

Group Coordinator

We propose to extend the current group coordinator to implement the new consumer group rebalance protocol. This chapter covers the various aspects related to the group coordinator.

Consumer Groups

The group coordinator already supports the so-called consumer groups. Those are groups which implement the “consumer” embedded protocol type. With the introduction of the new consumer rebalance protocol, we need a way to differentiate the existing groups from the new consumer groups. This is important because the existing groups rely on a specific set of APIs whereas the new consumer groups will use a different set of APIs.

Therefore, we propose to introduce the notion of types within the group coordinator. This will allow us to support different types of groups in the future. We propose to call the current group “generic” as it represents a generic implementation of the membership protocol which is specialized by a protocol type and to call the new consumer group “consumer”.

Effectively, both kinds of group would implement a consumer protocol: generic groups implement the old one and consumer groups implement the new one proposed in this document.

The ListGroups API will be extended to support both filtering on the group types and returning the group types of the queried groups.

Data Model

Before diving into the details of the new rebalance process, let’s define the data model of the group as the group coordinator will bookkeep it. Note that this data model is a logical one. The detailed records are described in the Public Interfaces section of this document.

Consumer Group & Member

The group and the members represent the current state of a group.

Consumer Group
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Group Epoch | int32 | The current epoch of the group. The epoch is incremented by the group coordinator when a new assignment is required for the group.
Members | []Member | The set of members in the group.
Partitions Metadata | []PartitionMetadata | The metadata of the partitions that the group is subscribed to. This is used to trigger a rebalance when the partition metadata changes.

Member
Name | Type | Description
Member ID | uuid | The unique identifier of the member. The ID is similar to an incarnation ID. It is generated by the client once and must be used during its lifetime.
Instance ID | string | The instance ID configured by the consumer.
Client ID | string | The client ID configured by the consumer.
Client Host | string | The host of the consumer.
Subscribed Topic Names | []string | The current set of subscribed topic names configured by the consumer.
Subscribed Topic Regex | string | The current subscription regular expression configured by the consumer.
Server Assignor | string | The server side assignor used by the group.
Client Assignors | []Assignor | The list of client-side assignors supported by the member. The order of this list defines the priority.

Assignor
Name | Type | Description
Name | string | The unique name of the assignor.
Minimum Version | int32 | The minimum version of the metadata schema supported by this assignor.
Maximum Version | int32 | The maximum version of the metadata schema supported by this assignor.
Reason | int8 | The reason why the metadata was updated.
Metadata | bytes | The metadata provided by the consumer for this assignor.

Target Assignment

The target (or desired) assignment of the group. This represents the assignment that all the members will eventually converge to. It is a declarative assignment which is generated by the assignor based on the group state.

Target Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Assignment Epoch | int32 | The epoch of the assignment. It represents the epoch of the group used to generate the assignment. It will eventually match the group epoch.
Assignment Error | int8 | The error reported by the assignor.
Members | []Member | The assignment for each member.

Member
Name | Type | Description
Member ID | uuid | The unique identifier of the member.
Partitions | Partitions | The set of partitions assigned to this member.
Metadata | bytes | The metadata assigned to this member.

Current Assignment

The Current Assignment represents the current epoch and assignment of a member. Note that members of a given group could be at a different epoch but they will all eventually converge to the target assignment.

Current Assignment
Name | Type | Description
Group ID | string | The group ID as configured by the consumer. The ID uniquely identifies the group.
Member ID | uuid | The member ID of this member.
Epoch | int32 | The current epoch of this member. The epoch is the assignment epoch of the assignment currently used by this member. This epoch is the one used to fence the member (e.g. offsets commit).
Error | int8 | The error reported by the assignor.
Partitions | Partitions | The current partitions used by the member.
Metadata | bytes | The current metadata used by the member.

Rebalance Process

The rebalance process is entirely driven by the group coordinator and revolves around three kinds of epoch: the group epoch, the assignment epoch and the member epoch. The process and the epochs are explained in the following chapters.

Group Epoch - Trigger a rebalance

The group coordinator is responsible for triggering a rebalance of the group when the metadata of the group changes. The metadata of the group is used as the input of the assignment function. For tracking this, we introduce the group epoch which represents the generation (or the version) of the group metadata. The group epoch is incremented whenever the group metadata is updated. There are a couple of cases to consider:

  • A member joins or leaves the group.
  • A member updates its subscriptions.
  • A member updates its assignors.
  • A member updates its assignors' metadata.
  • A member is fenced or removed from the group by the group coordinator.
  • The partition metadata is updated. For instance when a new partition is added or a new topic matching the subscribed topics is created.

In all these cases, a new version of the group metadata is persisted by the group coordinator with an incremented group epoch. This also signals that a new assignment is required for the group.
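As a rough illustration of the rule above, the following Python sketch bumps a group epoch only when the group metadata actually changes. It covers just one of the listed triggers (a subscription update), and `GroupMetadata` and `update_subscription` are hypothetical names chosen for this example, not part of the proposal.

```python
from dataclasses import dataclass, field


@dataclass
class GroupMetadata:
    """Hypothetical, simplified view of the group metadata kept by the coordinator."""
    epoch: int = 0
    # member id -> tuple of subscribed topic names
    subscriptions: dict = field(default_factory=dict)


def update_subscription(group, member_id, topics):
    """Record a member's subscription; bump the group epoch only on a real change."""
    new_value = tuple(sorted(topics))
    if group.subscriptions.get(member_id) == new_value:
        return False  # nothing changed, so no new assignment is needed
    group.subscriptions[member_id] = new_value
    group.epoch += 1  # signals that a new target assignment is required
    return True


group = GroupMetadata()
update_subscription(group, "member-a", ["orders"])           # join: epoch 0 -> 1
update_subscription(group, "member-a", ["orders"])           # no change: stays at 1
update_subscription(group, "member-a", ["orders", "users"])  # change: epoch 1 -> 2
print(group.epoch)
```

A real coordinator would also persist the new metadata before acting on it; that step is left out here.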

Assignment Epoch - Compute the group assignment

Whenever the group epoch is larger than the target assignment epoch, the group coordinator will trigger the computation of a new target assignment based on the latest group metadata. When the new assignment is computed, the group coordinator persists it. The assignment epoch becomes the group epoch of the group metadata used to compute the assignment.

The group coordinator either directly computes the new target assignment for the group based on its default server-side assignor or requests a new assignment from one of the members in the group. The entire delegation logic for the latter is detailed later in the document.

Member Epoch - Reconciliation of the group

Once a new target assignment is installed, each member will independently reconcile their current assignment with their new target assignment. Ultimately, each member will converge to their target epoch and assignment. The reconciliation process requires two phases:

  • Revocation Phase: The first phase revokes the partitions which are no longer in the target assignment of the member. The group coordinator does so by providing the Current Partitions - Target Partitions in the heartbeat response until the member acknowledges the revocation in a heartbeat request. When the group coordinator receives the acknowledgement, it updates the member's current assignment to its target assignment (and target epoch) and durably persists it. The member has up to the rebalance timeout to complete the revocation process; otherwise the group coordinator kicks it out of the group.
  • Assignment Phase: The second phase assigns the new partitions to the member. The group coordinator does so by providing the Target Partitions to the member while ensuring that partitions which are not revoked by other members yet are removed from this set. In other words, new partitions are incrementally assigned to the member when they are revoked by the other members.

The rebalance timeout is provided by the member when it joins the group. It is basically the max poll interval configured on the client side. The timer starts ticking when the heartbeat response is sent out by the group coordinator.
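The two phases can be sketched with plain set arithmetic. This is only a minimal Python illustration of the description above; the function names and the partition labels are made up for the example, and timeouts and persistence are ignored.

```python
def partitions_to_revoke(current, target):
    """Revocation phase: partitions the member owns but should no longer own."""
    return current - target


def partitions_to_assign(target, owned_by_others):
    """Assignment phase: target partitions that no other member still owns."""
    return target - owned_by_others


# Member A currently owns p0, p1, p2; the new target moves p2 to member B.
a_current, a_target = {"p0", "p1", "p2"}, {"p0", "p1"}
b_target = {"p2", "p3"}

revoke_from_a = partitions_to_revoke(a_current, a_target)

# While A still owns p2, B cannot receive it yet.
b_before = partitions_to_assign(b_target, owned_by_others=a_current)

# Once A acknowledges the revocation, its current assignment becomes its target.
a_current = a_target
b_after = partitions_to_assign(b_target, owned_by_others=a_current)
```

In this example member B keeps processing p3 throughout and only picks up p2 once member A has released it, which is the incremental behavior the assignment phase describes.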

Assignment Process

Whenever the group epoch is larger than the assignment epoch, the group coordinator must compute a new target assignment for the group. The group coordinator will either directly compute a new assignment with its server side assignor or delegate the assignment to a member of the group if a client-side assignor must be used.

The new target assignment for the group is basically a function of the current group metadata and the current target assignment. One important aspect to note here is that the assignment is now declarative instead of being incremental like it is in the current implementation. In other words, the assignor defines the desired state for the group and lets the group coordinator converge to it.

Assignor Selection

The group coordinator has to determine which assignment strategy must be used for the group. The group's members may not have exactly the same assignors at any given point in time. For instance, they may be migrating from one assignor to another. The group coordinator will choose the assignor to run as follows:

  • The server side assignor is used if any member specified one. If multiple server side assignors are specified in the group, the group coordinator uses the most common one.
  • The client side assignor is used otherwise. The group coordinator will use the assignor which is supported by all the members in the group and, if multiple are, it will respect the precedence defined by the members when they advertise their supported assignors.
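The selection rules above could look roughly like the following Python sketch. The member representation is invented for the example, and two points the text leaves open are filled in by assumption and flagged in the comments: how ties between equally common server side assignors are broken, and whose precedence wins when members order their client side assignors differently.

```python
from collections import Counter


def select_assignor(members):
    """Pick the assignor for a group.

    members: list of dicts with an optional "server_assignor" name and an
    optional "client_assignors" list ordered by priority (hypothetical shape).
    Returns ("server" | "client", name), or None if nothing can be selected.
    """
    if not members:
        return None
    server_side = [m["server_assignor"] for m in members if m.get("server_assignor")]
    if server_side:
        # Most common server side assignor wins. Assumption: ties fall back
        # to whichever name was seen first.
        return ("server", Counter(server_side).most_common(1)[0][0])
    # Otherwise use a client side assignor supported by every member.
    common = set.intersection(*(set(m.get("client_assignors", [])) for m in members))
    # Assumption: the first member's ordering decides the precedence.
    for name in members[0].get("client_assignors", []):
        if name in common:
            return ("client", name)
    return None


server_group = [
    {"server_assignor": "range"},
    {"server_assignor": "uniform"},
    {"server_assignor": "uniform"},
]
client_group = [
    {"client_assignors": ["streams", "custom"]},
    {"client_assignors": ["custom", "streams"]},
]
```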

Server Side Mode

The server side assignor is pluggable and the client can choose the one that it wants to use by providing its name in the heartbeat request. If the selected assignor does not exist, the group coordinator will reject the heartbeat with an UNSUPPORTED_ASSIGNOR error. The list of supported assignors will be configured in the broker config.

We will support two assignors out of the box for Apache Kafka:

  • range - An assignor which co-partitions topics.
  • uniform - An assignor which uniformly assigns partitions amongst the members. This is somewhat similar to the existing "sticky" assignor.

Note that in both cases, assignors are sticky. The goal is to minimise partition movements.

Client Side Mode

The client side assignment is executed by the consumer. The overall process has the following phases:

  • The group coordinator selects a member to run the assignment logic. The selection is explained later in this chapter.
  • The group coordinator notifies the member to compute the new assignment by returning the COMPUTE_ASSIGNMENT error in its next heartbeat response.
  • When the member receives this error, it is expected to call the PrepareAssignment API to get the current group metadata and the current target assignment.
  • The member computes the new assignment with the relevant assignor.
  • The member calls the InstallAssignment API to install the new assignment. The group coordinator validates it and persists it.

Note that the group coordinator always installs any new valid assignment, even if the group epoch has changed in the meantime, to ensure that the group can always make progress. We want to avoid the situation where a faulty member could prevent the whole group from moving forward. The group coordinator only allows one inflight assignment at a time.

The chosen member is expected to complete the assignment process within the rebalance timeout. The timer on the coordinator side starts ticking when the member is notified.

Metadata Version Handling (KIP-268)

Managing the compatibility of the metadata used by the client side assignors has been a challenge for our power users such as Kafka Streams. The metadata is usually versioned but there is no guarantee in the current protocol that the elected leader is able to handle all the versions used in the group. Kafka Streams introduced the so-called version probing (KIP-268) to mitigate this issue. This mechanism basically allows the leader to downgrade the version used by the other members in the group.

We propose to make the version a first class citizen in the new protocol. Every member will advertise the version used to encode its metadata, usually the highest that it supports, and the minimum and the maximum version that it can handle. This allows the group coordinator to reason about the versions and to pick the member to run the assignment wisely.

The group coordinator will also ensure that any member joining with a non-overlapping version range is rejected with the UNSUPPORTED_ASSIGNOR error.

Member Selection

The group coordinator can generally pick any member to run the assignment. However, when the members support different version ranges, the group coordinator must select a member which is able to handle all the supported versions. For instance, if we have three members A [1-5], B [3-4] and C [2-4], member A must be selected because it supports all the other versions in the group.
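One way to read this rule is that the chosen member's version range must contain every other member's range. The short Python sketch below implements that interpretation for the A/B/C example; `select_member` is a hypothetical helper, and the coordinator may well apply a different criterion (for instance, looking only at the versions actually used to encode metadata).

```python
def select_member(version_ranges):
    """Return a member whose [min, max] range covers every range in the group.

    version_ranges: dict of member id -> (minimum_version, maximum_version).
    Returns None when no single member can handle all advertised versions.
    """
    if not version_ranges:
        return None
    lowest = min(lo for lo, _ in version_ranges.values())
    highest = max(hi for _, hi in version_ranges.values())
    for member_id, (lo, hi) in version_ranges.items():
        if lo <= lowest and hi >= highest:
            return member_id
    return None


chosen = select_member({"A": (1, 5), "B": (3, 4), "C": (2, 4)})
print(chosen)
```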

Assignment Validation

Before installing any new assignment, the group coordinator will ensure that the following invariants are met:

  • All partitions are assigned.
  • A partition is assigned only once.
  • All members exist.

Note that this validation is made with regard to the metadata used to compute the assignment. The group may have already advanced to a newer group epoch - e.g. a member could have left during the assignment computation.

The installation will be rejected with an INVALID_ASSIGNMENT error if the invariants do not hold.
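The three invariants translate directly into set checks. The Python sketch below is only an illustration: `validate_assignment` and its return shape are hypothetical, and the `members` and `partitions` arguments stand for the metadata the assignor was given as input, not the latest group state.

```python
def validate_assignment(assignment, members, partitions):
    """Return the list of violated invariants (empty when the assignment is valid).

    assignment: dict of member id -> set of assigned partitions.
    members / partitions: the inputs that were handed to the assignor.
    """
    violations = []
    seen = set()
    duplicated = set()
    for assigned in assignment.values():
        duplicated |= seen & assigned  # partitions already given to someone else
        seen |= assigned
    if set(partitions) - seen:
        violations.append("unassigned partitions")
    if duplicated:
        violations.append("partition assigned more than once")
    if set(assignment) - set(members):
        violations.append("unknown member")
    return violations


members = {"A", "B"}
partitions = {"t-0", "t-1", "t-2"}
ok = validate_assignment({"A": {"t-0", "t-1"}, "B": {"t-2"}}, members, partitions)
bad = validate_assignment({"A": {"t-0"}, "C": {"t-0"}}, members, partitions)
```

A coordinator following this sketch would answer with INVALID_ASSIGNMENT whenever the returned list is not empty.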

Assignment Error

There could be cases where the client side assignor cannot compute a new assignment. For instance, in the context of Kafka Streams, the members may have different topologies. In this case, the client side assignor can return an error to the group coordinator, and the group coordinator automatically keeps the current target assignment for the group.

Member ID

Every member is uniquely identified by a UUID. This is called the Member ID. This UUID is generated on the client side and used in all the communication with the group coordinator. The ID must be kept during the entire life span of the member (e.g. the consumer). In that sense, it is similar to an incarnation ID.

Heartbeat & Session

The member uses the ConsumerGroupHeartbeat API to establish a session with the group coordinator. The member is expected to heartbeat every group.consumer.heartbeat.interval.ms in order to keep its session open. If it does not heartbeat at least once within group.consumer.session.timeout.ms, the group coordinator will kick it out of the group. group.consumer.heartbeat.interval.ms is defined on the server side and the member is told about it in the heartbeat response. group.consumer.session.timeout.ms is also defined on the server side.

Joining & Leaving

The member joins the group by sending a heartbeat with its Member ID and a member epoch equal to 0. It can leave the group by sending a heartbeat with a member epoch equal to -1.

Fencing

The group coordinator ensures that requests come from a known Member ID. Any request is rejected with the UNKNOWN_MEMBER_ID error otherwise. It also ensures that the Member Epoch matches the expected member epoch. If not, the request is rejected with the FENCED_MEMBER_EPOCH error. Details for every API are given in the Public Interfaces section.
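Putting the join, leave and fencing rules together, a heartbeat check might look like the Python sketch below. It is a simplification under stated assumptions: `check_heartbeat` and the `"NONE"` success value are invented for the example, a leave from an unknown member is assumed to be rejected like any other request, and the exact per-API rules remain those of the Public Interfaces section.

```python
JOIN_EPOCH = 0    # member epoch sent when joining the group
LEAVE_EPOCH = -1  # member epoch sent when leaving the group


def check_heartbeat(known_members, member_id, member_epoch):
    """Return the error a coordinator might answer with, or "NONE".

    known_members: dict of member id -> member epoch expected by the coordinator.
    """
    if member_epoch == JOIN_EPOCH:
        return "NONE"  # a (re)joining member is always accepted in this sketch
    if member_id not in known_members:
        return "UNKNOWN_MEMBER_ID"
    if member_epoch == LEAVE_EPOCH:
        return "NONE"  # a known member may leave regardless of its epoch
    if member_epoch != known_members[member_id]:
        return "FENCED_MEMBER_EPOCH"
    return "NONE"


known = {"m1": 5}
results = [
    check_heartbeat(known, "m1", 5),   # up to date
    check_heartbeat(known, "m1", 4),   # stale epoch
    check_heartbeat(known, "m2", 3),   # never joined
    check_heartbeat(known, "m2", 0),   # joining
    check_heartbeat(known, "m1", -1),  # leaving
]
```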

Static Membership (KIP-345)

Static membership, introduced in KIP-345, is still supported by this new rebalance protocol. When a member rejoins with the same Instance ID, the group coordinator replaces the old member with the new member. The new member can continue from where it left off.

Consumer Group States

EMPTY

When a consumer group is created or when the last member leaves the group, the consumer group is EMPTY.

ASSIGNING

When the group epoch is larger than the assignment epoch, the consumer group is ASSIGNING.

Consumer groups relying on the server-side assignor (e.g. regular consumers) are not expected to be in this state because the assignment is computed directly by the Group Coordinator.

RECONCILING

Until all the members have converged to the group epoch, the consumer group is RECONCILING.

STABLE

Once the reconciliation process is completed, the consumer group moves to the STABLE state.

DEAD

Like today, when the group remains EMPTY for a configured period, the group coordinator transitions it to DEAD to delete it.
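The state definitions above can be read as a function of the three epochs. The following Python sketch is one possible reading, with a hypothetical `group_state` helper; DEAD is left out because it depends on a timer rather than on the epochs.

```python
def group_state(group_epoch, assignment_epoch, member_epochs):
    """Derive the consumer group state from the epochs.

    member_epochs: list with the current member epoch of every member.
    """
    if not member_epochs:
        return "EMPTY"
    if group_epoch > assignment_epoch:
        return "ASSIGNING"  # a new target assignment is still being computed
    if any(epoch < assignment_epoch for epoch in member_epochs):
        return "RECONCILING"  # some members have not converged yet
    return "STABLE"


states = [
    group_state(0, 0, []),
    group_state(3, 2, [2, 2]),
    group_state(3, 3, [3, 2]),
    group_state(3, 3, [3, 3]),
]
print(states)
```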

Dynamic Group Configuration

The new rebalance protocol relies on server side configurations such as group.consumer.heartbeat.interval.ms and group.consumer.session.timeout.ms. Our goal is to give administrators the ability to use and tweak those settings for their entire fleet of consumers. However, it may not always be possible to have values fitting all workloads. Therefore, we propose to extend the IncrementalAlterConfigs and the DescribeConfigs API to support a new resource type called GROUP. This allows users to override the defaults defined by the administrators. The dynamic group configurations are described in the Public Interfaces section.

The group coordinator is responsible for storing those group configurations in order to keep their lifecycle tied to their group. When a group is deleted, we want the configuration to be deleted as well. This assumes that the IncrementalAlterConfigs and the DescribeConfigs API will be routed to the group coordinator owning the group they are acting upon.

Feature Flag

We introduce a new feature flag named “group.version” instead of relying on the IBP to enable the new consumer group protocol. The idea is to follow a path similar to that of KIP-778. V1 will be the group coordinator as we know it today and V2 will support the new consumer group protocol.

Using a feature flag will enable operators to enable the new consumer group protocol without having to roll the cluster twice. In other words, once the new software is deployed on all the nodes, the operator can update the feature flag to enable the feature. Until the feature is enabled, the new APIs won’t be advertised by the ApiVersions API.

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

Consumer

The semantics of the consumer will remain unchanged after this proposal is implemented. The goal is to replace the implementation of the group membership/assignment protocol with the new one.

Feature Flag

A new configuration setting will be used to determine whether the new protocol should be used or not. The feature flag allows users to control when they start using or migrating to the new protocol for their application. This is also required by our migration path, as the software must be on a specific version which is compatible with the new protocol.

In the case where a consumer tries to use the new protocol against a cluster which does not support it, either because the software is too old or because the feature is not enabled, the consumer will fail to start with a fatal exception.

In the beginning, the new protocol will be disabled by default. We envision enabling it by default in a future major release of Kafka.

Rebalance Process

The rebalance process in the consumer is basically the opposite of the process that was described earlier in this document. The consumer will know at any point in time its current epoch and the list of partitions that it owns. There are a few cases to consider:

  • If the member is fenced by the group coordinator, it will immediately abandon all its partitions and call ConsumerRebalanceListener#onPartitionsLost. It will rejoin the group as a new member afterwards.
  • Otherwise, the member will compute the difference between its currently owned partitions and the assigned partitions, as defined in the heartbeat response.
    • If there are any revoked partitions, it will revoke them, commit their offsets and call ConsumerRebalanceListener#onPartitionsRevoked.
    • If there are any newly assigned partitions, it will start processing them and call ConsumerRebalanceListener#onPartitionsAssigned.

Note that the process is dependent on Consumer#poll being called like with the current protocol. There is a parallel effort to this design to redesign the threading model of the consumers. This will very likely change how/when those callbacks are called.
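The non-fenced case boils down to a set difference in each direction. The Python sketch below illustrates it with a hypothetical `RecordingListener` standing in for ConsumerRebalanceListener; it does not model offset commits, fencing, or the threading concerns mentioned above.

```python
class RecordingListener:
    """Hypothetical stand-in for ConsumerRebalanceListener that records callbacks."""

    def __init__(self):
        self.events = []

    def on_partitions_revoked(self, partitions):
        self.events.append(("revoked", sorted(partitions)))

    def on_partitions_assigned(self, partitions):
        self.events.append(("assigned", sorted(partitions)))


def apply_assignment(owned, assigned, listener):
    """Diff owned against assigned partitions and return the new owned set."""
    revoked = owned - assigned
    added = assigned - owned
    if revoked:
        # A real consumer would also commit the offsets of these partitions.
        listener.on_partitions_revoked(revoked)
    if added:
        listener.on_partitions_assigned(added)
    return (owned - revoked) | added


listener = RecordingListener()
owned = apply_assignment({"t-0", "t-1"}, {"t-1", "t-2"}, listener)
```

Partition t-1 appears in both sets, so no callback fires for it and the consumer keeps processing it without interruption.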

Client-Side Assignor

By default, the consumer will rely entirely on the group coordinator, but it will allow specifying a custom assignor on the client side as already explained in this document. For this purpose, we propose to introduce a new and optional assignor interface in the Consumer called PartitionAssignor. The interface is specified in the public interfaces section of this document. The current assignor interface is strongly tied to the current group membership/assignment protocol so reusing it is not appropriate for two reasons:

  • The new protocol does not really fit in the current interface and its semantics are different; and
  • It seems preferable to let us evolve the current protocol independently if the need arises.

Deprecate Enforcing Rebalances

Consumer#enforceRebalance will be deprecated and will throw an IllegalStateException if used when the new protocol is enabled. Enforcing a rebalance with the new protocol does not make any sense. Instead, power users will have the ability to trigger a reassignment by either providing a non-zero reason or by updating the assignor metadata.

Streams

Kafka Streams remains a power user of the consumer so it will continue extending the consumer by providing an implementation of the new assignor interface. Streams will also rely on a feature flag to enable the new rebalance protocol.

Assignor & Assignment Metadata

The Version, MinimumVersion, MaximumVersion, Reason and Error fields are now first class citizens in the rebalance protocol so Streams does not have to specify them in the metadata anymore. The schemas for the assignor metadata and the assignment metadata are detailed in the Public Interfaces section.

Note that Streams may take this opportunity to do further changes to its metadata. We may extend this KIP or do a follow-up KIP in the future for this.

Assignor Behavior

The assignor behavior remains similar to the existing assignor. The major difference is that the assignor must serialize the assignment metadata of each member with the correct version used by the member. Another difference is that the new assignor must be able to handle the old metadata format as well during the upgrade from the old to the new protocol. This upgrade path is detailed in the upgrade section of this document.

Member Behavior

Upon receiving the assignment, each member would respectively create, close, or recycle tasks as indicated and update the global assignment information, like today. We explained earlier that partitions are incrementally assigned to the member when they are revoked by the others. This means that the assignment metadata may already reference partitions which are not assigned to the member yet. The Streams assignor must consider the assigned partitions as the source of truth in this case.

Each member encodes the lag of its standby tasks in its metadata. We cannot update the lag in every heartbeat request because that would constantly trigger reassignment in the group. Instead, when a) the task lag has fallen within the acceptable.recovery.lag threshold or b) the task lag has been consistently increasing for some time, the member should consider triggering a rebalance by sending its next heartbeat with the appropriate encoded reason and the updated task lags.
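The two trigger conditions above can be sketched as follows. This is only an illustration in Python, not the Streams implementation: the function name, the sample list and the increasing_window parameter are assumptions made for the example, and "consistently increasing for some time" is interpreted here as a strictly increasing lag over the last few observations.

```python
from typing import Sequence


def should_trigger_rebalance(
    lag_samples: Sequence[int],
    acceptable_recovery_lag: int,
    increasing_window: int = 3,  # hypothetical knob, not part of the proposal
) -> bool:
    """Decide whether the next heartbeat should carry updated task lags.

    lag_samples holds recent lag observations for one standby task,
    oldest first.
    """
    if len(lag_samples) < 2:
        return False

    previous, current = lag_samples[-2], lag_samples[-1]

    # (a) The lag has just crossed into the acceptable.recovery.lag threshold.
    caught_up = previous > acceptable_recovery_lag >= current

    # (b) The lag grew strictly over the last `increasing_window` intervals.
    recent = lag_samples[-(increasing_window + 1):]
    falling_behind = len(recent) == increasing_window + 1 and all(
        earlier < later for earlier, later in zip(recent, recent[1:])
    )

    return caught_up or falling_behind
```

Firing only when the threshold is crossed, rather than whenever the lag is below it, keeps the member from sending a new reason on every heartbeat.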

Public Interfaces

This section lists the changes impacting the public interfaces.

KRPC

New Errors

  • FENCED_MEMBER_EPOCH - The member epoch does not correspond to the member epoch expected by the coordinator. The member must abandon its partitions and rejoin the group.
  • COMPUTE_ASSIGNMENT - The member has been selected by the coordinator to compute the new target assignment of the group.
  • UNSUPPORTED_ASSIGNOR - The assignor used by the member or its version range is not supported by the group.

ConsumerGroupHeartbeat API

The ConsumerGroupHeartbeat API is the new core API used by consumers to form a group. The API allows members to advertise their subscriptions, their state, their assignors, and their owned partitions. The group coordinator uses it to assign/revoke partitions to/from members. This API is also used as a liveness check.

Request Schema

The member must set all the (top level) fields when it joins for the first time or when an error occurs (e.g. request timed out). Otherwise, it is expected to only fill in the fields which have changed since the last heartbeat.

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "uuid", "versions": "0+",
      "about": "The member id generated by the client." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+", "default": "-1",
      "about": "The current member epoch; 0 to join the group; -1 to leave the group." },
    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": "-1",
      "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." }, 
    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.",
      "fields": [
        { "name": "Name", "type": "string", "versions": "0+",
          "about": "The name of the assignor." },
        { "name": "MinimumVersion", "type": "int32", "versions": "0+",
          "about": "The minimum supported version for the metadata." },
        { "name": "MaximumVersion", "type": "int32", "versions": "0+",
          "about": "The maximum supported version for the metadata." },
        { "name": "Version", "type": "int32", "versions": "0+",
          "about": "The version of the metadata." },
        { "name": "Reason", "type": "byte", "versions": "0+",
          "about": "The reason of the metadata update." }, 
        { "name": "Metadata", "type": "bytes", "versions": "0+",
          "about": "The metadata." }
      ]},
    { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if it didn't change since the last heartbeat; the topics owned by the member.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
          "about": "The topic ID." },
        { "name": "Partitions", "type": "[]int32", "versions": "0+",
          "about": "The partitions." }
      ]}
  ]
}
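As an illustration of the "only fill in what changed" rule, here is a minimal Python sketch of how a client might build the request. The helper name build_heartbeat and the dict-based representation are assumptions for the example; the sentinels (null or -1) are taken from the schema above. Passing no previous state models the first heartbeat or a retry after an error, where all fields are set.

```python
from typing import Any, Dict, Optional

# Delta-encoded top-level fields and the value that means "unchanged".
UNCHANGED: Dict[str, Any] = {
    "InstanceId": None,
    "RebalanceTimeoutMs": -1,
    "SubscribedTopicNames": None,
    "SubscribedTopicRegex": None,
    "ServerAssignor": None,
    "ClientAssignors": None,
    "TopicPartitions": None,
}


def build_heartbeat(
    group_id: str,
    member_id: str,
    member_epoch: int,
    current: Dict[str, Any],
    last_sent: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build a request dict; last_sent=None forces a full request."""
    request: Dict[str, Any] = {
        "GroupId": group_id,
        "MemberId": member_id,
        "MemberEpoch": member_epoch,
    }
    for name, sentinel in UNCHANGED.items():
        value = current.get(name, sentinel)
        unchanged = last_sent is not None and last_sent.get(name, sentinel) == value
        request[name] = sentinel if unchanged else value
    return request
```

For example, a member that has just received foo-0, foo-1 and foo-2 would resend only TopicPartitions, leaving its subscription and timeout at their sentinel values.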

Required ACL

  • Read Group

Request Validation

INVALID_REQUEST is returned if the request does not satisfy the following invariants:

  • GroupId must be non-empty.
  • MemberId must be non-zero.
  • MemberEpoch must be >= -1.
  • InstanceId, if provided, must be non-empty.
  • RebalanceTimeoutMs must be larger than zero in the first heartbeat request.
  • SubscribedTopicNames and SubscribedTopicRegex cannot be used together.
  • SubscribedTopicNames or SubscribedTopicRegex must be in the first heartbeat request.
  • SubscribedTopicRegex must be a valid regular expression.
  • ServerAssignor and ClientAssignors cannot be used together.
  • Assignor.Name must be non-empty.
  • Assignor.MinimumVersion must be >= 0.
  • Assignor.MaximumVersion must be >= 0 and >= Assignor.MinimumVersion.
  • Assignor.Version must be >= Assignor.MinimumVersion and <= Assignor.MaximumVersion.
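The invariants above could be checked along these lines. This is a hedged Python sketch, not the broker code: validate_heartbeat and the dict layout are assumptions for the example, and Python's re module stands in for whatever regular expression engine the coordinator ends up using, so its notion of a "valid" pattern may differ.

```python
import re
import uuid
from typing import Any, Dict, List

ZERO_UUID = uuid.UUID(int=0)


def validate_heartbeat(request: Dict[str, Any], first_heartbeat: bool) -> List[str]:
    """Return the violated invariants; any violation maps to INVALID_REQUEST."""
    violations: List[str] = []

    def check(condition: bool, message: str) -> None:
        if not condition:
            violations.append(message)

    names = request.get("SubscribedTopicNames")
    regex = request.get("SubscribedTopicRegex")
    instance_id = request.get("InstanceId")
    assignors = request.get("ClientAssignors")

    check(bool(request.get("GroupId")), "GroupId must be non-empty")
    check(request.get("MemberId", ZERO_UUID) != ZERO_UUID, "MemberId must be non-zero")
    check(request.get("MemberEpoch", -1) >= -1, "MemberEpoch must be >= -1")
    check(instance_id is None or instance_id != "", "InstanceId must be non-empty")
    if first_heartbeat:
        check(request.get("RebalanceTimeoutMs", -1) > 0, "RebalanceTimeoutMs must be > 0")
        check(names is not None or regex is not None, "a subscription is required")
    check(names is None or regex is None, "names and regex cannot be used together")
    if regex is not None:
        try:
            re.compile(regex)  # stand-in for the coordinator's regex engine
        except re.error:
            violations.append("SubscribedTopicRegex must be a valid regular expression")
    check(
        request.get("ServerAssignor") is None or assignors is None,
        "ServerAssignor and ClientAssignors cannot be used together",
    )
    for assignor in assignors or []:
        low, high = assignor["MinimumVersion"], assignor["MaximumVersion"]
        check(bool(assignor["Name"]), "Assignor.Name must be non-empty")
        check(low >= 0, "Assignor.MinimumVersion must be >= 0")
        check(high >= 0 and high >= low, "Assignor.MaximumVersion must be >= 0 and >= MinimumVersion")
        check(low <= assignor["Version"] <= high, "Assignor.Version must be within the version range")
    return violations
```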

UNSUPPORTED_ASSIGNOR is returned if the request does not satisfy the following invariants:

  • ServerAssignor must be supported by the server.
  • ClientAssignors' version range must overlap with the other members in the group.

Request Handling

When the group coordinator handles a ConsumerGroupHeartbeat request, it:

  1. Looks up the group or creates it.
  2. Creates the member if the member epoch is zero, or checks whether it exists otherwise. If it does not exist, UNKNOWN_MEMBER_ID is returned.
  3. Checks whether the member epoch matches the member epoch of its current assignment. FENCED_MEMBER_EPOCH is returned otherwise. The member is also removed from the group.
    • There is an edge case here. When the group coordinator transitions a member to its target epoch, the heartbeat response with the new member epoch may be lost. In this case, the member will retry with the member epoch that it knows about and its request will be rejected with FENCED_MEMBER_EPOCH. This is not optimal. Instead, the group coordinator could accept the request if the partitions owned by the member are a subset of the target partitions. If that is the case, it is safe to transition the member to its target epoch again.
  4. Updates the member's information, if any. The group epoch is incremented if there is any change.
  5. Reconciles the member assignments as explained earlier in this document.
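The epoch check in step 3, including the lost-response edge case, can be sketched as below. This is an illustrative Python model only: the function names and the string outcomes other than FENCED_MEMBER_EPOCH are hypothetical, and partitions are modeled as a map from topic to a set of partition ids.

```python
from typing import Dict, Set

Partitions = Dict[str, Set[int]]  # topic -> partition ids


def owns_only_target(owned: Partitions, target: Partitions) -> bool:
    """True if every owned partition is also part of the member's target."""
    return all(parts <= target.get(topic, set()) for topic, parts in owned.items())


def check_member_epoch(
    request_epoch: int,
    member_epoch: int,  # epoch recorded by the coordinator for this member
    owned: Partitions,
    target: Partitions,
) -> str:
    if request_epoch == member_epoch:
        return "OK"
    # Edge case: the response carrying the new epoch may have been lost.
    # If the member owns nothing outside its target assignment, it is safe
    # to transition it to the target epoch again instead of fencing it.
    if owns_only_target(owned, target):
        return "RESEND_TARGET_EPOCH"  # hypothetical outcome name
    return "FENCED_MEMBER_EPOCH"
```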

Response Schema 

The group coordinator will only set the Assignment field when the member epoch is smaller than the target assignment epoch. This is done to ensure that the members converge to the target assignment.

{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupHeartbeatResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  // Supported errors:
  // - GROUP_AUTHORIZATION_FAILED
  // - NOT_COORDINATOR
  // - COORDINATOR_NOT_AVAILABLE
  // - COORDINATOR_LOAD_IN_PROGRESS
  // - INVALID_REQUEST
  // - GROUP_ID_NOT_FOUND
  // - UNKNOWN_MEMBER_ID
  // - FENCED_MEMBER_EPOCH
  // - UNSUPPORTED_ASSIGNOR
  // - COMPUTE_ASSIGNMENT
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", 
      "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." },
    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
      "about": "The heartbeat interval in milliseconds." }, 
    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "null if not provided; the assignment otherwise.",
      "fields": [
        { "name": "TopicPartitions", "type": "[]TopicPartition", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The topic-partitions assigned to the member.",
          "fields": [
            { "name": "TopicId", "type": "uuid", "versions": "0+",
              "about": "The topic ID." },
            { "name": "Partitions", "type": "[]int32", "versions": "0+",
              "about": "The partitions." }
          ]},
        { "name": "Error", "type": "byte", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned error." },
        { "name": "Metadata", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": "null",
          "about": "The assigned metadata." }
      ]}
  ]
}

Response Handling

If the response contains no error, the member will reconcile its current assignment towards its new assignment. It does the following:

  1. It updates its member epoch.
  2. It computes the difference between the old and the new assignment to determine the revoked partitions and the newly assigned partitions. There should be either revoked partitions or newly assigned partitions. The protocol never does both together.
    1. It revokes the partitions, commits all the offsets, and calls ConsumerRebalanceListener#onPartitionsRevoked.
    2. It assigns the new partitions, calls PartitionAssignor#onAssignment if one is defined and calls ConsumerRebalanceListener#onPartitionsAssigned.
  3. After a revocation, it sends the next heartbeat immediately to acknowledge it.
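The reconciliation steps above amount to a set difference followed by one of two callbacks. The sketch below is written in Python for brevity and is not the consumer implementation: reconcile, on_revoked and on_assigned are hypothetical stand-ins for the listener calls named above, and the offset commit is omitted.

```python
from typing import Callable, Set, Tuple

TopicPartition = Tuple[str, int]


def diff_assignment(
    old: Set[TopicPartition], new: Set[TopicPartition]
) -> Tuple[Set[TopicPartition], Set[TopicPartition]]:
    """Return (revoked, newly assigned) partitions."""
    return old - new, new - old


def reconcile(
    old: Set[TopicPartition],
    new: Set[TopicPartition],
    on_revoked: Callable[[Set[TopicPartition]], None],
    on_assigned: Callable[[Set[TopicPartition]], None],
) -> bool:
    """Apply the new assignment; return True if the next heartbeat
    should be sent immediately to acknowledge a revocation."""
    revoked, assigned = diff_assignment(old, new)
    if revoked and assigned:
        # The protocol never revokes and assigns in the same response.
        raise ValueError("assignment both revokes and assigns partitions")
    if revoked:
        on_revoked(revoked)  # offsets would be committed before this call
        return True
    if assigned:
        on_assigned(assigned)
    return False
```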

Upon receiving the COMPUTE_ASSIGNMENT error, the consumer starts the assignment process.

Upon receiving the UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH error, the consumer abandons all its partitions and rejoins with the same member id and the epoch 0.

ConsumerGroupPrepareAssignment API

The ConsumerGroupPrepareAssignment API will be used by the consumer to get the information to feed its client-side assignor.

Request Schema

{
  "apiKey": TBD,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ConsumerGroupPrepareAssignmentRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
      "about": "The group identifier." },
    { "name": "MemberId", "type": "uuid", "versions": "0+",
      "about": "The member id assigned by the group coordinator." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The member epoch." }
  ]
}

Required ACL

  • Read Group

Request Validation


Request Handling


Response Schema

{
  "apiKey": TBD,
  "type": "response",
  "name": "ConsumerGroupPrepareAssignmentResponse",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code, or 0 if there was no error" },
    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
      "about": "The top-level error message, or null if there was no error." },
    { "name": "GroupEpoch", "type": "int32", "versions": "0+",
      "about": "The group epoch." },
    { "name": "ClientAssignorName", "type": "string", "versions": "0+",
      "about": "The selected assignor." },
    { "name": "Members", "type": "[]Member", "versions": "0+",
      "about": "The members.", "fields": [
      { "name": "MemberId", "type": "uuid", "versions": "0+",
        "about": "The member ID." },
      { "name": "MemberEpoch", "type": "int32", "versions": "0+",
        "about": "The member epoch." },
      { "name": "InstanceId", "type": "string", "versions": "0+",
        "about": "The member instance ID." },
      { "name": "Subscriptions", "type": "[]uuid", "versions": "0+",
        "about": "The subscribed topic IDs." },
      { "name": "AssignorVersion", "type": "int32", "versions": "0+",
        "about": "The version of the metadata." },
      { "name": "AssignorReason", "type": "byte", "versions": "0+",
        "about": "The reason of the metadata update." },
      { "name": "AssignorMetadata", "type": "bytes", "versions": "0+",
        "about": "The assignor metadata." },
      { "name": "TopicPartitions", "type": "[]TopicPartition",
        "versions": "0+",
        "about": "The assigned topic-partitions to the member.",
        "fields": [
          { "name": "TopicId", "type": "uuid", "versions": "0+",
            "about": "The topic ID." },
          { "name": "Partitions", "type": "[]int32", "versions": "0+",
            "about": "The partitions." }
      ]}
    ]},
    { "name": "PartitionMetadata", "type": "[]Metadata", "versions": "0+",
      "about": "The topic-partition metadata.",
      "fields": [
        { "name": "TopicId", "type": "uuid", "versions": "0+",
        "about": "The topic ID." },
        { "name": "NumPartitions", "type": "int32", "versions": "0+",
        "about": "The number of partitions." }
    ]}    
  ]
}

Response Handling

Possible errors:

  • GROUP_AUTHORIZATION_FAILED - The group operation is not authorized.
  • NOT_COORDINATOR - The coordinator which has received the request is not the correct coordinator.
  • COORDINATOR_NOT_AVAILABLE - The coordinator is not available.
  • COORDINATOR_LOAD_IN_PROGRESS - The coordinator is loading its state.
  • INVALID_REQUEST - The request is invalid.
  • INVALID_GROUP_ID - The group ID is invalid.
  • GROUP_ID_NOT_FOUND - The group ID does not exist.
  • UNKNOWN_MEMBER_ID - The member ID is not known by the coordinator. The member must abandon the process.
  • FENCED_MEMBER_EPOCH - The member epoch does not correspond to the member epoch expected by the coordinator. The member must abandon the process.

ConsumerGroupInstallAssignment API

The ConsumerGroupInstallAssignment API will be used by the consumer to install a new assignment for the group. The new assignment is the result of the client-side assignor.

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling


ConsumerGroupDescribe API

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling


ListGroups API

The existing ListGroups API will be extended to support the notion of group types and to support the new group states.

Request

The TypesFilter field is introduced. It allows listing groups of certain types.

{
  "apiKey": 16,
  "type": "request",
  "listeners": ["zkBroker", "broker"],
  "name": "ListGroupsRequest",
  // Version 1 and 2 are the same as version 0.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the StatesFilter field (KIP-518).
  //
  // Version 5 adds the TypesFilter field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "StatesFilter", "type": "[]string", "versions": "4+",
      "about": "The states of the groups we want to list. If empty
                all groups are returned with their state."
    },
    { "name": "TypesFilter", "type": "[]string", "versions": "5+",
      "about": "The types of the groups we want to list. If empty
                all groups are returned"
    }
  ]
}


Required ACL:

  • Read Group
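The filtering semantics of the request can be illustrated with a short Python sketch. The function name is an assumption, the group type names used in the usage note are hypothetical since this section does not enumerate them, and combining the two filters with a logical AND is also an assumption. An empty filter matches every group, as stated in the field descriptions.

```python
from typing import Dict, Iterable, List


def list_groups(
    groups: Iterable[Dict[str, str]],
    states_filter: Iterable[str] = (),
    types_filter: Iterable[str] = (),
) -> List[str]:
    """Return the ids of the groups matching both filters."""
    states, types = set(states_filter), set(types_filter)
    return [
        group["GroupId"]
        for group in groups
        # An empty filter places no restriction on the corresponding field.
        if (not states or group["GroupState"] in states)
        and (not types or group["GroupType"] in types)
    ]
```

With groups of hypothetical types "consumer" and "generic", passing types_filter=["consumer"] returns only the former, while leaving both filters empty returns all groups.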

Response

The GroupType field is introduced. It represents the type of the group.

{
  "apiKey": 16,
  "type": "response",
  "name": "ListGroupsResponse",
  // Version 1 adds the throttle time.
  //
  // Starting in version 2, on quota violation, brokers send out
  // responses before throttling.
  //
  // Version 3 is the first flexible version.
  //
  // Version 4 adds the GroupState field (KIP-518).
  //
  // Version 5 adds the GroupType field (KIP-848).
  "validVersions": "0-5",
  "flexibleVersions": "3+",
  "fields": [
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
      "ignorable": true,
      "about": "The duration in milliseconds for which the request
                was throttled due to a quota violation, or zero if
                the request did not violate any quota." },
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The error code, or 0 if there was no error." },
    { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
      "about": "Each group in the response.", "fields": [
      { "name": "GroupId", "type": "string", "versions": "0+",
        "entityType": "groupId",
        "about": "The group ID." },
      { "name": "ProtocolType", "type": "string", "versions": "0+",
        "about": "The group protocol type." },
      { "name": "GroupState", "type": "string", "versions": "4+",
        "ignorable": true,
        "about": "The group state name." },
      { "name": "GroupType", "type": "string", "versions": "5+",
        "ignorable": true,
        "about": "The group type name." }
    ]}
  ]
}


JoinGroup API

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling

OffsetCommit API

Update MemberId to UUID, add MemberEpoch

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling

OffsetFetch API

Add MemberId/MemberEpoch

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling

DescribeConfigs API

Add group resource.

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling

AlterIncrementalConfigs API

Add group resource.

Request Schema


Required ACL


Request Validation


Request Handling


Response Schema


Response Handling


TODO

  • config api
  • offsets api (member id / member epoch)
  • all the others
  • streams metadata

Compatibility, Deprecation, and Migration Plan

TODO

Case Studies

Let’s look at a few examples to illustrate the rebalance logic. Let’s assume that the group is subscribed to the topic foo which has 3 partitions.

Let’s start with an empty group:

  • Group (epoch=0)
    • No members
  • Target Assignment (epoch=0)
    • No members
  • Member Assignment
    • No members


Member A joins the group. The coordinator adds the member to the group and bumps the group epoch to (1).

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=0)
    • No members
  • Member Assignment
    • A - epoch=0, partitions=[]


The coordinator computes and installs the new target assignment.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=0, partitions=[]


When A sends its next heartbeat, it has no assigned partitions at this stage so the coordinator can directly reply with the new epoch (1) and the new assigned partitions. The state of the group does not change.

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=0, partitions=[]


When A sends its next heartbeat with its new epoch (1) and its new assigned partitions, the group coordinator moves the member assignment to (1).

  • Group (epoch=1)
    • A
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]


Member B joins the group. The coordinator adds the member to the group and bumps the group epoch to (2).

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=1)
    • A - partitions=[foo-0, foo-1, foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=0, partitions=[]


The coordinator computes and installs the new target assignment.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=1, partitions=[foo-0, foo-1, foo-2]
    • B - epoch=0, partitions=[]


At this point B is blocked at epoch (0) until A moves to epoch (2) because A must revoke foo-2 first before B can start consuming it. The group coordinator will instruct A to revoke foo-2 via its next heartbeat. When A acknowledges that the revocation is completed, the coordinator can move A to epoch (2).

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=0, partitions=[]


Now B can get its new assignment via its next heartbeat. The state does not change until B acknowledges the new epoch and assignment.

  • Group (epoch=2)
    • A
    • B
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]


Member C joins the group. The coordinator adds the member to the group and bumps the group epoch.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=2)
    • A - partitions=[foo-0, foo-1]
    • B - partitions=[foo-2]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]
    • C - epoch=0, partitions=[]


The coordinator computes and installs the new target assignment.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=2, partitions=[foo-2]
    • C - epoch=0, partitions=[]


As with the previous member addition, foo-1 must be revoked first before A can advance to epoch (3) and C is blocked until foo-1 is revoked.

When B sends its next heartbeat, the group coordinator replies with epoch (3) and partitions (foo-2). The state of the group does not change until B sends another heartbeat to acknowledge.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=2, partitions=[foo-0, foo-1]
    • B - epoch=3, partitions=[foo-2]
    • C - epoch=0, partitions=[]


When C sends its next heartbeat, the group coordinator continues to reply with epoch (0) and no partitions.

When A sends its next heartbeat, the group coordinator replies with epoch (2) and partitions (foo-0) in order to revoke partition foo-1. The state of the group does not change until A sends another heartbeat to acknowledge.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=3, partitions=[foo-0]
    • B - epoch=3, partitions=[foo-2]
    • C - epoch=0, partitions=[]


At this point, foo-1 is free so C can advance to epoch (3). When C sends its next heartbeat, the group coordinator replies with epoch (3) and partitions (foo-1). The state of the group does not change until C sends another heartbeat to acknowledge.

  • Group (epoch=3)
    • A
    • B
    • C
  • Target Assignment (epoch=3)
    • A - partitions=[foo-0]
    • B - partitions=[foo-2]
    • C - partitions=[foo-1]
  • Member Assignment
    • A - epoch=3, partitions=[foo-0]
    • B - epoch=3, partitions=[foo-2]
    • C - epoch=3, partitions=[foo-1]


All the members have eventually advanced to the group epoch (3).
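The walkthrough above can be replayed with a small model of the coordinator state. This Python sketch is a deliberate simplification and not the proposed implementation: class and method names are assumptions, the target assignment is supplied by the caller rather than computed by an assignor, and each heartbeat() call collapses the reply and the acknowledging heartbeat into one step.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class Member:
    epoch: int = 0
    partitions: Set[str] = field(default_factory=set)


@dataclass
class Group:
    epoch: int = 0
    target_epoch: int = 0
    target: Dict[str, Set[str]] = field(default_factory=dict)
    members: Dict[str, Member] = field(default_factory=dict)

    def join(self, member_id: str) -> None:
        """Add a member and bump the group epoch."""
        self.members[member_id] = Member()
        self.epoch += 1

    def install_target(self, assignment: Dict[str, Set[str]]) -> None:
        """Install a target assignment computed for the current group epoch."""
        self.target = {m: set(parts) for m, parts in assignment.items()}
        self.target_epoch = self.epoch

    def heartbeat(self, member_id: str) -> bool:
        """Run one reply-and-acknowledge round; return False while blocked."""
        member = self.members[member_id]
        if member.epoch == self.target_epoch:
            return True
        wanted = self.target.get(member_id, set())
        # Revoke whatever is no longer part of the member's target.
        member.partitions &= wanted
        owned_by_others = set().union(
            *(m.partitions for mid, m in self.members.items() if mid != member_id)
        )
        # The member stays at its epoch until its new partitions are free.
        if (wanted - member.partitions) & owned_by_others:
            return False
        member.partitions = set(wanted)
        member.epoch = self.target_epoch
        return True
```

In this model, B's first heartbeat after joining returns False because foo-2 is still owned by A, which matches the point in the walkthrough where B is blocked at epoch (0).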

Test Plan

  • Unit/Integration/System/Simulation Tests

Rejected Alternatives

TODO

Future Work

TODO