
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

__consumer_offsets is an internal topic that stores the committed offsets of consumer groups. Its partition count determines which broker acts as the coordinator of each group. The partition count can be increased in the same way as for any other topic, but brokers only start using the new count for group coordination when they are restarted. Once a Kafka cluster is running in production, brokers are commonly restarted in a rolling manner to preserve availability. During such a rolling restart, some brokers use the old partition count while others use the new one, which leaves consumer groups in undefined and erroring states until the restart completes. This KIP proposes that brokers refresh the partition count of __consumer_offsets whenever it changes, without requiring a restart.

Graphs

The above graph demonstrates a noticeable increase in FindCoordinator requests for the duration of the rolling restart.

The above graph demonstrates a noticeable drop in successful OffsetCommit requests to __consumer_offsets for the duration of the rolling restart.

Public Interfaces

No public interfaces will be changed.

Proposed Changes

Reference implementation: https://github.com/apache/kafka/compare/trunk...clolov:kafka:consumer_offsets

The proposal in this KIP is for each broker to refresh the partition count of the __consumer_offsets topic whenever it detects a change to the topic, without requiring a restart. Currently, such a change is propagated through the cluster by a LeaderAndIsr request that the controller sends to all affected brokers. There is already a code path that invokes specific behaviour when the changed topic is __consumer_offsets. We can hook into that code path and invoke the refresh method so that the GroupCoordinator starts using the new partition count.
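A minimal sketch of the hook, assuming the broker keeps the __consumer_offsets partition count in one shared field that the coordinator reads on every lookup. OffsetsPartitionCountRefresher and onLeaderAndIsr are hypothetical names standing in for the existing __consumer_offsets code path and the refresh method; they are not Kafka's API. The cached count is only ever raised, since Kafka does not allow the number of partitions of a topic to be decreased, so a stale or reordered request cannot roll it back.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed refresh; none of these names are Kafka's API.
public final class OffsetsPartitionCountRefresher {
    static final String OFFSETS_TOPIC = "__consumer_offsets";

    // Shared with every group lookup, so an update is visible without a restart.
    private final AtomicInteger offsetsPartitionCount;

    OffsetsPartitionCountRefresher(int initialCount) {
        this.offsetsPartitionCount = new AtomicInteger(initialCount);
    }

    // Hypothetical stand-in for the existing LeaderAndIsr code path: called per topic
    // with that topic's partition count. Returns true if the cached count was raised.
    boolean onLeaderAndIsr(String topic, int partitionCount) {
        if (!OFFSETS_TOPIC.equals(topic)) {
            return false; // only __consumer_offsets affects group coordination
        }
        // Partition counts never shrink, so keep the larger of the two values.
        int previous = offsetsPartitionCount.getAndAccumulate(partitionCount, Math::max);
        return partitionCount > previous;
    }

    // Group id -> __consumer_offsets partition, using the current cached count.
    int partitionFor(String groupId) {
        int h = groupId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % offsetsPartitionCount.get();
    }

    int currentCount() {
        return offsetsPartitionCount.get();
    }

    public static void main(String[] args) {
        OffsetsPartitionCountRefresher refresher = new OffsetsPartitionCountRefresher(55);
        refresher.onLeaderAndIsr("test_topic", 10);   // ignored: not the offsets topic
        refresher.onLeaderAndIsr(OFFSETS_TOPIC, 65);  // picked up immediately
        System.out.println(refresher.currentCount());
    }
}
```

Here main prints 65: the update for test_topic is ignored and the one for __consumer_offsets raises the cached count from 55.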

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users? Behaviour which previously only took effect upon a broker restart will now take effect dynamically.
  • If we are changing behavior how will we phase out the older behavior? The behaviour itself does not change, only its timing: the new partition count is used as soon as the change is propagated rather than at the next broker restart.
  • If we need special migration tools, describe them here. N/A
  • When will we remove the existing behavior? In the next Kafka release.

Test Plan

The test setup which can reproduce the problem is as follows:

Setup

  • We have 2 brokers using ZooKeeper.
  • We have a topic (test_topic) with 10 partitions and replication factor of 2.
  • We have __consumer_offsets with 55 partitions and replication factor of 2.
  • We have a producer which constantly produces to test_topic.
  • We have a consumer group (consumer_group) with 2 consumers which constantly consume.
  • Group coordinator is leader_of_partition(abs(hash(consumer_group)) % 55) = leader_of_partition(48) = broker 2.
  • We increase the number of partitions of __consumer_offsets to 65.
  • The new group coordinator should be leader_of_partition(abs(hash(consumer_group)) % 65) = leader_of_partition(43) = broker 1.
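The abs(hash(...)) % N notation in the setup can be made concrete. This is a minimal sketch, assuming the mapping works like Kafka's GroupMetadataManager.partitionFor, which to my understanding computes Utils.abs(groupId.hashCode()) % partitionCount using Java's String.hashCode. The class OffsetsPartitionFor is hypothetical and only reproduces that arithmetic; it does not talk to a cluster, so leader_of_partition is out of its scope.

```java
// Hypothetical helper reproducing the group id -> __consumer_offsets partition mapping.
public final class OffsetsPartitionFor {

    // Same semantics as Kafka's Utils.abs: Integer.MIN_VALUE maps to 0,
    // whereas Math.abs(Integer.MIN_VALUE) would stay negative.
    static int abs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // The coordinator of a group is the leader of this partition.
    static int partitionFor(String groupId, int offsetsPartitionCount) {
        return abs(groupId.hashCode()) % offsetsPartitionCount;
    }

    public static void main(String[] args) {
        String groupId = args.length > 0 ? args[0] : "consumer_group";
        int before = partitionFor(groupId, 55); // before the partition increase
        int after = partitionFor(groupId, 65);  // after the partition increase
        System.out.printf("%s: partition %d with 55 partitions, %d with 65 partitions%n",
                groupId, before, after);
    }
}
```

Whenever the two printed partitions have different leaders, brokers that disagree on the partition count will also disagree on the group's coordinator.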

Current (3.3.1) behaviour

  • Stop broker 2.
  • Partition leadership will migrate to broker 1.
  • Broker 1 will become the new group coordinator.
  • Start broker 2.
  • The leadership of some partitions will move back to broker 2.
  • Broker 1 will stop being the coordinator of the group, and the coordinator will move to broker 2.
  • Broker 2, which picked up the new partition count on startup, calculates that the coordinator should be broker 1 and directs consumers to it.
  • Broker 1, still using the old partition count, directs them back to broker 2, and this repeats until broker 1 is restarted as well.
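The redirect loop in the steps above can be reproduced with a toy model that needs no cluster. This is a sketch under stated assumptions: each of two brokers caches its own __consumer_offsets partition count, a client follows FindCoordinator answers, and a broker only accepts a group it believes it coordinates. The leaderOf assignment (even partitions on broker 2, odd ones on broker 1), the hop limit and the class name CoordinatorPingPong are hypothetical.

```java
import java.util.Map;

// Toy model of the FindCoordinator ping-pong during a rolling restart.
// Broker ids, leaderOf and the hop limit are hypothetical, not Kafka's API.
public final class CoordinatorPingPong {

    // Group id -> __consumer_offsets partition (Kafka-style abs of String.hashCode).
    static int partitionFor(String groupId, int partitionCount) {
        int h = groupId.hashCode();
        return (h == Integer.MIN_VALUE ? 0 : Math.abs(h)) % partitionCount;
    }

    // Hypothetical leadership: even partitions led by broker 2, odd ones by broker 1.
    static int leaderOf(int partition) {
        return partition % 2 == 0 ? 2 : 1;
    }

    // The broker a given broker names as coordinator, using its own cached count.
    static int coordinatorAccordingTo(int broker, String groupId, Map<Integer, Integer> cachedCounts) {
        return leaderOf(partitionFor(groupId, cachedCounts.get(broker)));
    }

    // Follows FindCoordinator answers starting at firstBroker. Returns the broker that
    // accepts the group, or -1 if the client is still being redirected after maxHops.
    static int findCoordinator(String groupId, Map<Integer, Integer> cachedCounts,
                               int firstBroker, int maxHops) {
        int asked = firstBroker;
        for (int hop = 0; hop < maxHops; hop++) {
            int named = coordinatorAccordingTo(asked, groupId, cachedCounts);
            // The named broker only accepts the group if its own view agrees.
            if (coordinatorAccordingTo(named, groupId, cachedCounts) == named) {
                return named;
            }
            // NOT_COORDINATOR: the client rediscovers, this time via the named broker.
            asked = named;
        }
        return -1;
    }

    public static void main(String[] args) {
        String groupId = args.length > 0 ? args[0] : "consumer_group";
        // Broker 2 was restarted and sees 65 partitions; broker 1 still sees 55.
        Map<Integer, Integer> mixed = Map.of(1, 55, 2, 65);
        Map<Integer, Integer> settled = Map.of(1, 65, 2, 65);
        System.out.println("mixed counts:   " + findCoordinator(groupId, mixed, 1, 10));
        System.out.println("settled counts: " + findCoordinator(groupId, settled, 1, 10));
    }
}
```

With mixed counts, a group whose partition is led by broker 2 under the old count and by broker 1 under the new count never settles, and findCoordinator returns -1. Once both brokers cache the same count, every lookup settles on the first hop.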

I believe the above can be added as an integration test.

Rejected Alternatives

  1. Prevent users from modifying __consumer_offsets with the kafka-topics tool. Even though the Kafka documentation advises that __consumer_offsets should not be changed once it has been created, the tools do not enforce this. Furthermore, there are valid situations in which a user wants to increase the number of partitions of __consumer_offsets, for example to handle more traffic.
  2. We already have configurations which determine some settings of the __consumer_offsets topic, and it would be convenient to expose partition modification through a configuration as well. However, any such configuration would still require a mechanism to apply it dynamically, which is what this KIP aims to provide.
  3. Ideally, Kafka should be able to detect a change in the __consumer_offsets partition count, work out which new partitions consumers would expect to find their data in, and move the data seamlessly. While this initially sounds expensive, it is important to note that on every restart a broker already reads through the __consumer_offsets partitions it is responsible for in order to restore group state.