...

__consumer_offsets is an internal topic storing the offsets of consumer groups. Its partition count is used to determine group coordinators via the following formula: leader_of_partition(abs(hash(CONSUMER_GROUP_NAME)) % __CONSUMER_OFFSETS_PARTITION_COUNT). The partition count can be increased similarly to any other topic, but the new number of partitions is only picked up for group coordination on broker startup. Once a Kafka cluster is up and running in production it is good practice for brokers to be restarted in a rolling manner to assure availability during upgrades (regardless of whether the upgrades are to the infrastructure, underlying OS, packages or Kafka itself). However, during a rolling restart a subset of brokers is using the old partition count and a subset of brokers is using the new one. This leaves consumer groups in undefined and erroring states for the duration of the rolling restart. This KIP proposes for brokers to start using the new partition count of __consumer_offsets without requiring a restart whenever they detect a change to the topic, to minimise the time during which consumer groups are in undefined states.
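To make the formula above concrete, here is a minimal Python sketch of the partition selection. It assumes that hash is Java's String.hashCode() and that abs maps Integer.MIN_VALUE to 0; both are assumptions about the broker's behaviour rather than something stated on this page, and the function names are purely illustrative.

```python
def java_string_hash(s: str) -> int:
    """Emulate Java's String.hashCode() as a signed 32-bit integer.

    Only valid for strings made of BMP characters, where one Python
    character corresponds to one UTF-16 code unit.
    """
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def kafka_abs(n: int) -> int:
    """Absolute value that maps Integer.MIN_VALUE to 0 (assumed behaviour)."""
    return 0 if n == -(2 ** 31) else abs(n)


def coordinator_partition(group_id: str, partition_count: int) -> int:
    """Partition of __consumer_offsets whose leader coordinates group_id."""
    return kafka_abs(java_string_hash(group_id)) % partition_count


if __name__ == "__main__":
    # The same group generally maps to a different partition once the
    # partition count changes, which is why brokers must agree on the count.
    for count in (55, 65):
        print(count, coordinator_partition("consumer_group", count))
```

The leader of the returned partition is the group coordinator, so two brokers holding different partition counts can name two different coordinators for the same group.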

Graphs to support the motivation

The above graph demonstrates a noticeable increase in FindCoordinator requests for the duration of a rolling restart on a 2 broker cluster. During the experiment no consumers were added to or removed from the consumer group, but in a real situation such a change would not trigger a consumer group rebalance.

The above graph demonstrates a noticeable drop in successful offset commits to __consumer_offsets for the duration of the rolling restart. This means that consumption progress is not being recorded.

Public Interfaces

No public interfaces will be changed.

...

Reference implementation: https://github.com/apache/kafka/compare/trunk...clolov:kafka:consumer_offsets

The proposal in this KIP is for individual brokers to refresh the partition count of the __consumer_offsets topic whenever they detect a change in the topic, without requiring a restart. Currently, such a change is propagated in the cluster through a LeaderAndISR request sent to all affected brokers by the controller. There is already a branch in the code path which invokes specific behaviour if the affected topic is __consumer_offsets. We can hook into that code path and invoke the refresh method so that the GroupCoordinator entity starts using the new partition count.
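The shape of the proposed hook can be sketched as follows. This is not the reference implementation: GroupCoordinatorSketch, refresh_partition_count and on_leader_and_isr are hypothetical names, and the dictionary of partition counts stands in for whatever lookup the broker actually performs when it handles the request.

```python
OFFSETS_TOPIC = "__consumer_offsets"


def abs_hash(group_id: str) -> int:
    """Java-style String.hashCode() followed by abs (assumed hashing scheme)."""
    h = 0
    for ch in group_id:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    signed = h - 0x100000000 if h >= 0x80000000 else h
    return 0 if signed == -(2 ** 31) else abs(signed)


class GroupCoordinatorSketch:
    """Hypothetical stand-in for the broker-side group coordinator."""

    def __init__(self, partition_count: int):
        # Current behaviour: this value is only read once, at broker startup.
        self.partition_count = partition_count

    def partition_for(self, group_id: str) -> int:
        return abs_hash(group_id) % self.partition_count

    def refresh_partition_count(self, new_count: int) -> None:
        # Proposed behaviour: adopt the new count as soon as it is known.
        if new_count != self.partition_count:
            self.partition_count = new_count


def on_leader_and_isr(coordinator: GroupCoordinatorSketch,
                      topic_partition_counts: dict) -> None:
    """Hypothetical handler for a leadership change notification.

    topic_partition_counts maps topic name to its current partition count.
    Only a change that touches __consumer_offsets triggers a refresh.
    """
    new_count = topic_partition_counts.get(OFFSETS_TOPIC)
    if new_count is not None:
        coordinator.refresh_partition_count(new_count)
```

With this hook every broker that receives the notification switches to the new count at roughly the same time, instead of each broker switching only when it is restarted.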

...

  • What impact (if any) will there be on existing users? Behaviour which previously manifested itself upon a restart will now manifest itself dynamically.
  • If we are changing behavior how will we phase out the older behavior? We are only changing when an event happens (sooner rather than later), so no phasing out is required.
  • If we need special migration tools, describe them here. N/A
  • When will we remove the existing behavior? In the next Kafka release :)

...

  • We have 2 brokers using ZooKeeper.
  • We have a topic (test_topic) with 10 partitions and replication factor of 2.
  • We have __consumer_offsets with 55 partitions and replication factor of 2.
  • We have a producer which constantly produces to test_topic.
  • We have a consumer group (consumer_group) with 2 consumers which constantly consume.
  • Group coordinator is leader_of_partition(abs(hash(consumer_group)) % 55) = leader_of_partition(48) = broker 2.
  • We increase the number of partitions of __consumer_offsets to 65.
  • The new group coordinator should be leader_of_partition(abs(hash(consumer_group)) % 65) = leader_of_partition(43) = broker 1.

...

  • Stop broker 2.
  • Partition leadership will migrate to broker 1.
  • Broker 1 will be the new group coordinator.
  • Start broker 2.
  • The leadership of some partitions should move back to broker 2.
  • Broker 1 should stop being the coordinator of the group and the coordinator should move to broker 2.
  • Broker 2 should calculate that the coordinator should be broker 1 and it should direct consumers to it.
  • Broker 1 should direct them to broker 2 and this should continue repeating until we restart broker 1 as well.
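The redirect loop described in the steps above can be modelled with a small simulation. The partition numbers (48 and 43) and their leaders (broker 2 and broker 1) are copied from the scenario rather than computed, and the Broker class and follow_redirects function are illustrative names only; real brokers and clients exchange more than a single redirect.

```python
# Values taken from the scenario in the text, not computed here.
PARTITION_FOR_GROUP = {55: 48, 65: 43}  # partition count -> partition for consumer_group
PARTITION_LEADER = {48: 2, 43: 1}       # partition -> leader broker id


class Broker:
    def __init__(self, broker_id: int, partition_count: int):
        self.broker_id = broker_id
        # The partition count this broker loaded when it last started.
        self.partition_count = partition_count

    def find_coordinator(self) -> int:
        """Return the broker id this broker believes coordinates the group."""
        partition = PARTITION_FOR_GROUP[self.partition_count]
        return PARTITION_LEADER[partition]


def follow_redirects(brokers: dict, start_id: int, max_hops: int = 6) -> list:
    """Return the broker ids a consumer visits while chasing its coordinator.

    Stops early when a broker names itself as the coordinator.
    """
    path = [start_id]
    current = start_id
    for _ in range(max_hops):
        target = brokers[current].find_coordinator()
        if target == current:
            break
        path.append(target)
        current = target
    return path


if __name__ == "__main__":
    # Broker 2 has been restarted (65 partitions), broker 1 has not (55).
    mixed = {1: Broker(1, 55), 2: Broker(2, 65)}
    print(follow_redirects(mixed, start_id=1, max_hops=4))

    # Both brokers restarted: they agree and the consumer settles on broker 1.
    settled = {1: Broker(1, 65), 2: Broker(2, 65)}
    print(follow_redirects(settled, start_id=2))
```

In the mixed state the path alternates between the two brokers until the hop limit is reached, while in the settled state it ends at broker 1. An integration test for this KIP could assert the second behaviour without restarting either broker.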

I believe the above scenario (or its negative) can be added as an integration test once an approach which mitigates it has been implemented.

Rejected Alternatives

  1. Prevent clients from modifying __consumer_offsets using the kafka-topics tool (and Kafka API). Even though the Kafka documentation advises that __consumer_offsets should not be changed once created, the tools do not enforce this. Furthermore, there are valid situations in which a customer would want to increase the partitions of __consumer_offsets, for example, to handle more traffic.
  2. We already have configurations which control aspects of the __consumer_offsets topic and it would be neat to expose partition modification through a configuration to ensure any change to the topic has been intentional (and hopefully its consequences considered). However, an addition of such a configuration would still require a mechanism to apply it dynamically, which is what this KIP proposes.
  3. Ideally Kafka should be able to detect a change in the __consumer_offsets partition count, figure out in which new partitions consumers would expect to find their data and move it there seamlessly. While initially this sounds like an expensive operation, it is important to note that upon any restart a broker reads through the __consumer_offsets partitions it is responsible for to restore the group state anyway. While this is the direction I would like to move in, it is something to be considered more thoroughly with respect to KIP-848: The Next Generation of the Consumer Rebalance Protocol.