...

In our opinion, implementing the new rebalance protocol in the current group coordinator is not appropriate because it would require many changes to the current implementation anyway to make the two protocols interoperable. Therefore, this KIP proposes to rewrite the group coordinator from scratch in Java. The new group coordinator will have a state machine per __consumer_offsets partition, where each state machine is modelled as an event loop. Those state machines will be executed by a pool of group.coordinator.threads threads.
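
To illustrate the threading model, here is a minimal sketch, not the actual implementation, of how per-partition event loops could be multiplexed over group.coordinator.threads threads; the class and method names are hypothetical.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch: each __consumer_offsets partition is pinned to one of
    // group.coordinator.threads single-threaded executors, so the events of a
    // given partition are applied to its state machine sequentially and in order.
    final class CoordinatorEventLoops {
        private final ExecutorService[] threads;

        CoordinatorEventLoops(int coordinatorThreads) {
            threads = new ExecutorService[coordinatorThreads];
            for (int i = 0; i < coordinatorThreads; i++) {
                threads[i] = Executors.newSingleThreadExecutor();
            }
        }

        // All events for the same partition land on the same thread.
        void enqueue(int consumerOffsetsPartition, Runnable event) {
            threads[consumerOffsetsPartition % threads.length].submit(event);
        }

        void shutdown() {
            for (ExecutorService thread : threads) {
                thread.shutdown();
            }
        }
    }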

Consumer Groups

The group coordinator already supports so-called consumer groups: groups which use the consumer embedded protocol type. With the introduction of the new consumer rebalance protocol, we need a way to differentiate the existing groups from the new consumer groups. This is important because the existing groups rely on a specific set of APIs whereas the new consumer groups will use a different set of APIs.

...

Compatibility, Deprecation, and Migration Plan

The change is mainly backward compatible. The current protocol is still supported without any changes. The consumers and streams should be able to upgrade to the new protocol without any issues. The only part that may require adaptations is the regex based subscriptions. At the moment, those are validated on the client side, and that validation is language specific. We plan to use Google RE2/J on the server side, so clients may have to adapt their regex based subscriptions when they migrate to the new protocol. We believe that the transition should be frictionless for most users.

The migration requires first upgrading the servers to the new software and enabling the new group coordinator. A roll is required for this. Then, the consumers must be upgraded to the new software as well and the new protocol must be enabled. This can be done in one roll if the consumer version is supported by the upgrade path. If not, two rolls are required.

We plan to release the feature in a Kafka 4.x release, possibly 4.0. The feature will be opt-in to start. We plan to make it the default in Kafka 5.0.

Kafka Broker Migration

Upgrading the cluster to the new rebalance protocol is fairly straightforward. First, the cluster must be upgraded to the new group coordinator. This can be done by rolling-upgrading the brokers to a software version which supports it. The migration is seamless. Note that group.coordinator.threads may require some tuning depending on your workload. Second, the new protocol must be enabled by setting an IBP/MetadataVersion which supports it with the kafka-features command line tool.

Downgrading the cluster will be possible to a certain extent. It is possible to downgrade the IBP/MetadataVersion to disable the new rebalance protocol. In this case, all the consumer groups using the new protocol will be lost. It will also be possible to downgrade to an earlier version of Kafka that does not support the new group coordinator, but only to specific versions. The issue is that the current group coordinator errors out when the __consumer_offsets topic contains unknown records.

Kafka Consumer Migration

Upgrading consumers can be done online. First, all the consumers must be upgraded to a software version which supports the online migration. The version is not defined yet but the consumer will need to support the consumer embedded protocol version 3 (introduced in KIP-792). There are also other considerations if a custom client side assignor is used. Second, the consumers must be rolled to enable the new rebalance protocol. This is done by setting group.protocol to consumer. group.remote.assignor may need to be adjusted as well. When a consumer joins with the new protocol, the group is automatically converted from a generic group to a consumer group if the upgrade requirements are met (e.g. consumer embedded protocol version >= 3). If they are not, the consumer is rejected with an INVALID_REQUEST error. During the upgrade process, consumers will continue to use the old rebalance APIs. The group coordinator will translate the JoinGroup, SyncGroup and Heartbeat APIs to the new rebalance protocol. This translation is an implementation detail but it is explained earlier in this document. If a custom client side assignor or regex based subscriptions are used, please pay attention to the details provided in the following sub-sections.
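
As an illustration, enabling the new protocol on a consumer could look like the following sketch. Only group.protocol and group.remote.assignor relate to this KIP; the other values are placeholders, and the assignor name used here is only an example.

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NewProtocolConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "my-group");                 // placeholder
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // Opt into the new rebalance protocol.
            props.put("group.protocol", "consumer");
            // Pick a server side assignor; "uniform" is used here as an example.
            props.put("group.remote.assignor", "uniform");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // ... subscribe and poll as usual ...
            }
        }
    }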

Downgrading the consumers is the exact opposite process. The consumers must be rolled to disable the new rebalance protocol. This is done by setting group.protocol to generic. When the last consumer using the new rebalance protocol leaves the group, the group is automatically converted back from a consumer group to a generic group.

Regex Based Subscriptions

The new rebalance protocol relies on the group coordinator to track metadata changes, so the regular expressions are no longer evaluated locally but remotely. The group coordinator uses the Google RE2/J engine, so the regular expression used with either of the subscribe methods must be compatible with it. The java.util.Pattern used to subscribe is toString'ed and passed to the group coordinator. If it is not compatible, the group coordinator will reject the ConsumerGroupHeartbeat request with an INVALID_REQUEST error. For simple regular expressions, we don't expect any changes to be required. It is recommended to test out the regex with the consumer-groups --verify-regex command line tool or with another group before migrating consumers.
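
As an example, a subscription pattern can also be checked locally against the RE2/J engine before migrating. This sketch assumes the RE2/J library (com.google.re2j) is on the classpath; it is not part of the consumer API.

    import java.util.regex.Pattern;

    public class RegexCompatibilityCheck {
        public static void main(String[] args) {
            // The pattern passed to subscribe(Pattern) is sent to the group
            // coordinator as pattern.toString().
            Pattern javaPattern = Pattern.compile("my-topic-.*");

            // Pre-flight check: compile the same string with RE2/J locally to see
            // whether the coordinator, which uses RE2/J, would accept it.
            // Backreferences and lookaround, for example, are valid in
            // java.util.regex but rejected by RE2/J.
            try {
                com.google.re2j.Pattern.compile(javaPattern.toString());
                System.out.println("Pattern is RE2/J compatible");
            } catch (com.google.re2j.PatternSyntaxException e) {
                System.out.println("Pattern would be rejected with INVALID_REQUEST: " + e.getMessage());
            }
        }
    }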

Client Side Assignor

ConsumerPartitionAssignor cannot be used with the new rebalance protocol. Instead, the client side assignor must implement PartitionAssignor and be configured via group.local.assignors. The Javadoc of PartitionAssignor in the Public Interfaces section of this document explains how it works. During the migration, only the PartitionAssignor based assignor is used to compute the assignment for the group. That means that the new assignor must be able to deserialize the subscription metadata used by the old assignor and to serialize the assignment metadata for the old members. The group coordinator cannot translate the metadata that it receives. However, it will signal untranslated metadata by using -1 as its version. This means that the new assignor must support versions from -1 to X during the online migration. If a consumer using the new protocol joins a group with non-empty metadata, the group coordinator will ensure that the joining member's assignor supports -1 in its version range. If it does not, the group coordinator will reject it with an INVALID_REQUEST error.
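
To make the version handling concrete, here is a hypothetical helper, not part of the PartitionAssignor interface, showing how an assignor might branch on the metadata version during the online migration.

    import java.nio.ByteBuffer;

    // Hypothetical helper illustrating how an assignor could handle metadata
    // versions during the online migration. Version -1 marks metadata that the
    // group coordinator received from an old member and could not translate.
    public class MigrationMetadataCodec {

        public Object deserializeSubscriptionMetadata(short version, ByteBuffer metadata) {
            if (version == -1) {
                // Metadata produced by the old ConsumerPartitionAssignor based
                // assignor: decode it with the legacy, application-defined format.
                return decodeLegacyFormat(metadata);
            }
            // Metadata produced by the new PartitionAssignor based assignor.
            return decodeNewFormat(version, metadata);
        }

        private Object decodeLegacyFormat(ByteBuffer metadata) {
            return null; // placeholder for the application's old format
        }

        private Object decodeNewFormat(short version, ByteBuffer metadata) {
            return null; // placeholder for the application's new format
        }
    }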

Kafka Streams Migration

Upgrading streams instances can be done online. First, all the instances must be upgraded to a software version which supports the online migration. This version will be driven by the underlying consumer requirements. Second, the instances must be rolled to enable the new rebalance protocol. This is done by setting group.protocol to consumer.
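
For illustration, the corresponding change on a Streams application could look like this sketch. All values other than group.protocol are placeholders, and how the property reaches the embedded consumers (directly, as shown, or via a prefix) is an assumption of this sketch.

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsNewProtocolExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            // Opt the embedded consumers into the new rebalance protocol.
            props.put("group.protocol", "consumer");

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");                    // placeholder topology

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }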

Downgrading the instances is the exact opposite process. The instances must be rolled to disable the new rebalance protocol. This is done by setting group.protocol to generic.

We don't plan to deprecate the current rebalance protocol anytime soon because Connect and others still rely on it. However, we will deprecate the consumer embedded protocol used in the current protocol in Kafka 5.0 and remove it in Kafka 6.0. Consumers and Streams won't be able to use it any longer after that.

Test Plan

Our primary method for testing the implementation will be through Discrete Event Simulation (DES). DES allows us to test a large number of deterministically generated random scenarios which include various kinds of faults (such as network partitions). It also allows us to define system invariants programmatically, which are then checked after each step in the simulation. The protocol will be formally verified with a TLA+ model as well. Beyond that, we will use the typical suite of unit/integration/system tests. System tests will be parameterised to run with both protocols.

...

We started this design by using an epoch per partition instead of relying on an epoch per member. Moving a partition from A to B would have required the following steps: 1) revoke the partition from A; 2) bump the partition epoch; and 3) assign the partition to B. While this was very appealing at first, it proved impractical in the end for two reasons: 1) migrating from the current protocol is much more difficult without a member id; and 2) the metadata associated with the assignment (e.g. Streams metadata) is not tied to a particular partition. We ended up using a member epoch with an incremental reassignment algorithm which is pretty close to this.

An epoch per member not aligned to the group epoch

The current design ensures that each member epoch eventually converges to the group epoch. In a previous iteration of the design, we considered incrementing the member epoch only when the member's assignment changed. The benefit of this is that it reduces the number of writes to the __consumer_offsets partitions. The downside is that it is harder to debug/understand for users and operators because they cannot rely on the member epoch to know whether the member has converged to the desired assignment. In the end, we decided to favour debuggability.

Not reusing the current coordinator

...