One benefit of the dynamic toggle and the added zookeeper state is that together they give admins control over migrating consumers between zookeeper-based coordination and kafka-based coordination across a large organization. You can imagine a script that scans groups in zookeeper and toggles a group to kafka-based coordination once it has fully migrated to MEZKCCs and has been stable for some time. If the switch to kafka-based coordination puts too much load on the kafka cluster, admins can toggle the MEZKCC groups back to zookeeper-based coordination on the fly to relieve it.
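The selection step of such a script could look roughly like the sketch below. Everything in it is hypothetical and for illustration only: the GroupSnapshot record, its field names, and the one-hour stability threshold are assumptions, not part of this KIP. Reading the group state from zookeeper and applying the toggle are deliberately left out.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class GroupSnapshot:
    """Hypothetical summary of one consumer group, as a scan script might build it."""
    group_id: str
    ozkcc_members: int                   # members still running as OZKCCs
    mezkcc_members: int                  # members running as MEZKCCs
    seconds_since_last_rebalance: float  # assumed proxy for "stable for some time"
    coordination_mode: str               # "zookeeper" or "kafka"


def groups_ready_for_kafka(groups: Iterable[GroupSnapshot],
                           min_stable_seconds: float = 3600.0) -> List[str]:
    """Return ids of groups that could be toggled to kafka-based coordination."""
    ready = []
    for group in groups:
        # Fully migrated: no OZKCCs remain and at least one MEZKCC is present.
        fully_migrated = group.ozkcc_members == 0 and group.mezkcc_members > 0
        # Stable: no rebalance within the (assumed) threshold.
        stable = group.seconds_since_last_rebalance >= min_stable_seconds
        if group.coordination_mode == "zookeeper" and fully_migrated and stable:
            ready.append(group.group_id)
    return ready
```

A rollback pass would be the mirror image: select groups whose mode is "kafka" and toggle them back when the cluster is under stress.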

Example Migration

The following diagrams illustrate a full migration from OZKCCs to KCs.

Initial state with a group of OZKCCs:

Begin migration from OZKCCs to MEZKCCs:

The group has fully migrated to MEZKCCs while still using zookeeper-based coordination:

The coordination mode toggle is applied so that the group of MEZKCCs use kafka-based coordination:

Begin migration from MEZKCCs to KCs:

Final state with a group of KCs:

Rejected Alternatives

  1. Adapt KafkaConsumer to understand zookeeper-based coordination. This approach was rejected since it introduces a zookeeper dependency into kafka-clients.
  2. Build a wrapper class composed of a ZookeeperConsumerConnector and a KafkaConsumer. When the coordination mode trigger fires, switch consumption to the corresponding consumer. This approach was rejected for several reasons:
    1. It requires a transformation from ZookeeperConsumerConnector's or KafkaConsumer's consumption API into the wrapper class consumption API while properly tracking offsets.
      1. Should it adopt ZookeeperConsumerConnector's API of providing KafkaStreams?
      2. Should it adopt KafkaConsumer's polling API that provides ConsumerRecords?
    2. It introduces yet another consumer client to kafka.
    3. Users would need to change their code to use the new client.
  3. Embed an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator inside ZookeeperConsumerConnector instead of a KafkaConsumer. This approach was rejected because ConsumerCoordinator is in the "internals" package and is subject to API changes without notice. Since API changes to ConsumerCoordinator might require changes to the KIP's proposed ZookeeperConsumerConnector running kafka-based coordination, this KIP instead opts for embedding the user-facing KafkaConsumer.
  4. Regarding the subtask of providing global state for ConsumerRebalanceListener to preserve existing behavior, we considered instantiating an EKC per consumer thread id, so that kafka-based coordination would map partitions to consumer threads for us instead of requiring us to stitch together DescribeGroupsResponse and zookeeper state. We ultimately decided against this approach because of the added complexity of managing many EKCs. Another downside is that a standard partition assignment strategy under kafka-based coordination would give equal weight to a ZookeeperConsumerConnector consumer thread and a KafkaConsumer, causing an uneven partition ownership distribution across the group.
  5. Merge the /consumers/<group id>/ids and /consumers/<group id>/migration/ids directories by simply defining a new znode data version 2 for MEZKCCs. This was rejected to avoid any possibility of breaking clients as they parse the znode.
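To make the uneven distribution mentioned in alternative 4 concrete, here is a small worked example. It is a sketch under stated assumptions: the assignment is a simplified round-robin over group members rather than any actual Kafka assignor, and the member and process names are made up. It models one ZookeeperConsumerConnector process with four consumer threads (hence four EKCs, each a separate group member) next to one process running a single KafkaConsumer.

```python
from collections import Counter
from typing import Dict, List, Tuple


def partitions_per_process(members: List[Tuple[str, str]],
                           num_partitions: int) -> Dict[str, int]:
    """Assign partitions round-robin to members and total them per owning process.

    `members` is a list of (member_id, process_name) pairs. Each member gets
    equal weight, which is the behavior alternative 4 would have produced.
    """
    counts: Counter = Counter()
    for partition in range(num_partitions):
        _member_id, process = members[partition % len(members)]
        counts[process] += 1
    return dict(counts)


# Hypothetical group: four EKCs from one ZookeeperConsumerConnector process,
# plus one plain KafkaConsumer process.
members = [(f"zkcc-thread-{i}", "zkcc-process") for i in range(4)]
members.append(("kc-1", "kc-process"))

print(partitions_per_process(members, 10))
```

With 10 partitions, each of the five members receives two partitions, so under these assumptions the ZookeeperConsumerConnector process ends up owning eight partitions while the KafkaConsumer process owns two.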