Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

__consumer_offsets is an internal topic that stores the offsets of consumer groups. Its partition count is used to determine group coordinators via the following formula: leader_of_partition(abs(hash(CONSUMER_GROUP_NAME)) % __CONSUMER_OFFSETS_PARTITION_COUNT). The partition count can be increased in the same way as for any other topic, but brokers only pick up the new number on startup. Once a Kafka cluster is running in production, it is good practice to restart brokers in a rolling manner to ensure availability during upgrades (regardless of whether the upgrades are to the infrastructure, the underlying OS, packages or Kafka itself). However, during a rolling restart some brokers are using the old partition count while others are using the new one. This leaves consumer groups in undefined and erroring states for the duration of the fleet bounce. This KIP proposes that brokers start using the new partition count of __consumer_offsets as soon as they detect a change to the topic, without requiring a restart, to minimise the time during which consumer groups are in undefined states.
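The formula above can be illustrated with a minimal Java sketch. It assumes that hash is Java's String.hashCode and that abs maps Integer.MIN_VALUE to 0 (plain Math.abs would return a negative number for that input). The class name, method names and the group id "example-group" are hypothetical and chosen only for illustration; this is not the broker's actual code.

```java
// Sketch only: mirrors abs(hash(group)) % partition_count from the text above.
final class CoordinatorPartition {
    private CoordinatorPartition() {}

    // Absolute value that never returns a negative number.
    // Math.abs(Integer.MIN_VALUE) is still negative, so that case is mapped to 0.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Partition index for an already computed hash value.
    static int partitionForHash(int hash, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        return safeAbs(hash) % partitionCount;
    }

    // Partition of __consumer_offsets that would hold the given group (assumes String.hashCode).
    static int partitionFor(String groupId, int partitionCount) {
        return partitionForHash(groupId.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String group = "example-group"; // hypothetical group id
        // The same group can land on a different partition once the count changes,
        // which is why brokers with different counts can disagree on the coordinator.
        System.out.println("55 partitions -> partition " + partitionFor(group, 55));
        System.out.println("65 partitions -> partition " + partitionFor(group, 65));
    }
}
```

The group coordinator is then the broker that leads the resulting partition, so two brokers that hold different partition counts may name different coordinators for the same group.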

Graphs to support the motivation

The above graph demonstrates the noticeable increase in FindCoordinator requests for the duration of a rolling restart on a 2-broker cluster. During the experiment no consumers were added to or removed from the consumer group, but such a change in a real situation would not trigger a consumer group rebalance.

The above graph demonstrates a noticeable drop in successful offset commits to __consumer_offsets for the duration of the rolling restart. This means that consumption progress is not being recorded.

Public Interfaces

No public interfaces will be changed.

Proposed Changes

Reference implementation: https://github.com/apache/kafka/compare/trunk...clolov:kafka:consumer_offsets

The proposal is for individual brokers to refresh the partition count of __consumer_offsets, without requiring a restart, whenever they detect a change in the topic. Currently, such a change is propagated through the cluster by a LeaderAndIsr request which the controller sends to all affected brokers. There is already a branch in that code path which invokes specific behaviour if the affected topic is __consumer_offsets. We alter that branch to invoke the refresh method so that the GroupCoordinator entity starts using the new partition count.
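The idea can be sketched in Java as follows. This is an illustration under stated assumptions, not the reference implementation: the class RefreshableCoordinator and the hook onTopicPartitionsChanged are hypothetical names standing in for the GroupCoordinator and for the code path that handles the controller's request. The sketch also assumes that the observed count should only ever grow, since Kafka does not allow the partition count of a topic to be reduced.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: a coordinator whose view of the __consumer_offsets partition
// count can be refreshed at runtime instead of being fixed at startup.
final class RefreshableCoordinator {
    static final String OFFSETS_TOPIC = "__consumer_offsets";

    // Read on every lookup, written on refresh; AtomicInteger keeps both thread-safe.
    private final AtomicInteger offsetsPartitionCount;

    RefreshableCoordinator(int initialPartitionCount) {
        if (initialPartitionCount <= 0) {
            throw new IllegalArgumentException("partition count must be positive");
        }
        this.offsetsPartitionCount = new AtomicInteger(initialPartitionCount);
    }

    // Hypothetical hook, assumed to be called when the broker processes a
    // partition change for some topic. Other topics are ignored.
    void onTopicPartitionsChanged(String topic, int observedPartitionCount) {
        if (!OFFSETS_TOPIC.equals(topic)) {
            return;
        }
        // Assumption: only accept increases, so a stale notification cannot shrink the count.
        offsetsPartitionCount.accumulateAndGet(observedPartitionCount, Math::max);
    }

    // Partition of __consumer_offsets for a group, using the current count.
    int partitionFor(String groupId) {
        int hash = groupId.hashCode();
        int nonNegative = hash == Integer.MIN_VALUE ? 0 : Math.abs(hash);
        return nonNegative % offsetsPartitionCount.get();
    }

    int partitionCount() {
        return offsetsPartitionCount.get();
    }
}
```

With this shape, a broker that was started with 55 partitions begins answering lookups with 65 as soon as it is notified of the change, without a restart.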

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? Behaviour which previously manifested itself upon a restart will now manifest itself dynamically.
  • If we are changing behavior how will we phase out the older behavior? The only change is that an existing event happens sooner rather than later; as such, no phasing out is required.
  • If we need special migration tools, describe them here. N/A
  • When will we remove the existing behavior? In the next Kafka release :)

Test Plan

The test setup which can reproduce the problem is as follows:

Setup

  • We have 2 brokers using ZooKeeper.
  • We have a topic (test_topic) with 10 partitions and replication factor of 2.
  • We have __consumer_offsets with 55 partitions and replication factor of 2.
  • We have a producer which constantly produces to test_topic.
  • We have a consumer group (consumer_group) with 2 consumers which constantly consume.
  • Group coordinator is leader_of_partition(abs(hash(consumer_group)) % 55) = leader_of_partition(48) = broker 2.
  • We increase the number of partitions of __consumer_offsets to 65.
  • The new group coordinator should be leader_of_partition(abs(hash(consumer_group)) % 65) = leader_of_partition(43) = broker 1.

Current (3.3.1) behaviour

  • Stop broker 2.
  • Partition leadership will migrate to broker 1.
  • Broker 1 will be the new group coordinator.
  • Start broker 2.
  • The leadership of some partitions should move back to broker 2.
  • Broker 1 should stop being the coordinator of the group and the coordinator should move to broker 2.
  • Broker 2 should calculate that the coordinator should be broker 1 and it should direct consumers to it.
  • Broker 1 should direct them back to broker 2, and this redirection loop should keep repeating until we restart broker 1 as well.

I believe the above scenario (or its negative) can be added as an integration test once an approach which mitigates it has been implemented.
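The redirect loop in the scenario above can be simulated with a small Java sketch. It takes the partition indices (48 and 43) and their leaders (broker 2 and broker 1) from the setup as given inputs rather than recomputing them from a hash, and it assumes leadership has returned to the preferred brokers after broker 2 restarts. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: a consumer following coordinator redirects between two brokers
// that disagree on the partition count of __consumer_offsets.
final class RedirectLoopSimulation {
    // From the setup: the group maps to partition 48 with 55 partitions and to 43 with 65.
    static final Map<Integer, Integer> GROUP_PARTITION_BY_COUNT = Map.of(55, 48, 65, 43);
    // From the setup: partition 48 is led by broker 2, partition 43 by broker 1.
    static final Map<Integer, Integer> LEADER_BY_PARTITION = Map.of(48, 2, 43, 1);

    // The coordinator a broker would name, given the partition count it currently uses.
    static int coordinatorAccordingTo(int partitionCountSeenByBroker) {
        int partition = GROUP_PARTITION_BY_COUNT.get(partitionCountSeenByBroker);
        return LEADER_BY_PARTITION.get(partition);
    }

    // Follows redirects until a broker names itself as coordinator or maxHops is reached.
    static List<Integer> followRedirects(Map<Integer, Integer> partitionCountByBroker,
                                         int startBroker, int maxHops) {
        List<Integer> visited = new ArrayList<>();
        int current = startBroker;
        for (int hop = 0; hop < maxHops; hop++) {
            visited.add(current);
            int next = coordinatorAccordingTo(partitionCountByBroker.get(current));
            if (next == current) {
                return visited; // this broker accepts the group
            }
            current = next;
        }
        return visited; // no broker accepted the group within maxHops
    }

    public static void main(String[] args) {
        // Broker 1 was not restarted (55 partitions); broker 2 was restarted (65 partitions).
        System.out.println(followRedirects(Map.of(1, 55, 2, 65), 2, 6)); // prints [2, 1, 2, 1, 2, 1]
        // Both brokers use the new count, as they would with a dynamic refresh.
        System.out.println(followRedirects(Map.of(1, 65, 2, 65), 2, 6)); // prints [2, 1]
    }
}
```

In the mixed case no broker ever accepts the group, which matches the elevated FindCoordinator traffic described in the motivation. Once both brokers agree on 65 partitions the lookup settles on broker 1 after a single redirect.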

Rejected Alternatives

  1. Prevent clients from modifying __consumer_offsets using the kafka-topics tool (and the Kafka API). Even though the Kafka documentation advises that __consumer_offsets should not be changed once created, the tools do not enforce this. Furthermore, there are valid situations in which a customer would want to increase the partitions of __consumer_offsets, for example, to handle more traffic.
  2. We already have configurations to control aspects of the __consumer_offsets topic, and it would be neat to expose partition modification through a configuration to ensure any change to the topic has been intentional (and hopefully its consequences considered). However, adding such a configuration would still require a mechanism to apply it dynamically, which is what this KIP proposes.
  3. Ideally, Kafka should be able to detect a change in the __consumer_offsets partition count, figure out which new partitions consumers would expect to find their data in, and move it there seamlessly. While this initially sounds like an expensive operation, it is important to note that upon any restart a broker already reads through the __consumer_offsets partitions it is responsible for in order to restore the group state. While this is the direction I would like to move in, it is something to be considered more thoroughly with respect to KIP-848: The Next Generation of the Consumer Rebalance Protocol.