...
One benefit of the dynamic toggle and the added zookeeper state is that they give admins control over migrating consumers between zookeeper-based coordination and kafka-based coordination across a large organization. You can imagine a script that scans groups in zookeeper and toggles a group to kafka-based coordination once it has fully migrated to MEZKCCs and has been stable for some time. If the switch to kafka-based group coordination turns out to put too much stress on the kafka cluster, admins can toggle the MEZKCC groups back to zookeeper-based coordination on the fly to relieve it.
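The selection logic of such a script might look like the following minimal sketch. Everything in it is hypothetical: the `GroupSnapshot` type, its field names, the `"MEZKCC"`/`"OZKCC"` labels, and the stability threshold are made up for illustration. Reading group state from zookeeper and writing the toggle are deliberately left out, since the znode layout for the toggle is not described in this section.

```python
from dataclasses import dataclass


@dataclass
class GroupSnapshot:
    """Hypothetical summary of one consumer group, as an admin script might build it."""
    group_id: str
    member_types: list    # e.g. ["MEZKCC", "OZKCC"]; labels are illustrative
    mode: str             # "zookeeper" or "kafka"
    stable_since: float   # epoch seconds of the last membership change


def groups_to_toggle(snapshots, now, min_stable_seconds):
    """Return ids of groups that are fully on MEZKCCs, stable, and still on zookeeper."""
    ready = []
    for g in snapshots:
        fully_migrated = bool(g.member_types) and all(
            t == "MEZKCC" for t in g.member_types
        )
        stable = (now - g.stable_since) >= min_stable_seconds
        if g.mode == "zookeeper" and fully_migrated and stable:
            ready.append(g.group_id)
    return ready


snapshots = [
    GroupSnapshot("payments", ["MEZKCC", "MEZKCC"], "zookeeper", stable_since=0.0),
    GroupSnapshot("audit", ["OZKCC", "MEZKCC"], "zookeeper", stable_since=0.0),
    GroupSnapshot("metrics", ["MEZKCC"], "zookeeper", stable_since=3500.0),
]
ready = groups_to_toggle(snapshots, now=3600.0, min_stable_seconds=1800.0)
print(ready)
```

Here only "payments" qualifies: "audit" still has an OZKCC member and "metrics" changed membership too recently. Rolling back would be the mirror image: select groups whose mode is "kafka" and toggle them to "zookeeper".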
Example Migration
The following diagrams illustrate a full migration from OZKCCs to KCs.
Initial state with a group of OZKCCs:
Begin migration from OZKCCs to MEZKCCs:
The group has fully migrated to MEZKCCs while still using zookeeper-based coordination:
The coordination mode toggle is applied so that the group of MEZKCCs use kafka-based coordination:
Begin migration from MEZKCCs to KCs:
Final state with a group of KCs:
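The six stages above can also be written down as data. The sketch below assumes a two-member group purely for illustration, and it assumes that an OZKCC can only take part in zookeeper-based coordination, a KC only in kafka-based coordination, and an MEZKCC in either. The names `SUPPORTED_MODES`, `STAGES`, and `is_consistent` are hypothetical.

```python
# Coordination modes each consumer type can take part in (assumption stated above).
SUPPORTED_MODES = {
    "OZKCC": {"zookeeper"},
    "MEZKCC": {"zookeeper", "kafka"},
    "KC": {"kafka"},
}

# The six migration stages as (member types, coordination mode).
STAGES = [
    (["OZKCC", "OZKCC"], "zookeeper"),    # initial state
    (["OZKCC", "MEZKCC"], "zookeeper"),   # migrating OZKCCs to MEZKCCs
    (["MEZKCC", "MEZKCC"], "zookeeper"),  # fully on MEZKCCs, toggle not yet applied
    (["MEZKCC", "MEZKCC"], "kafka"),      # toggle applied
    (["MEZKCC", "KC"], "kafka"),          # migrating MEZKCCs to KCs
    (["KC", "KC"], "kafka"),              # final state
]


def is_consistent(member_types, mode):
    """True if every member of the group can take part in the given mode."""
    return all(mode in SUPPORTED_MODES[t] for t in member_types)


for member_types, mode in STAGES:
    print(member_types, mode, is_consistent(member_types, mode))
```

Every stage in the sequence is consistent, while a group mixing OZKCCs and KCs would be consistent under neither mode. This is why the group passes through an all-MEZKCC stage before the toggle is applied.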
Rejected Alternatives
- Adapt KafkaConsumer to understand zookeeper-based coordination. This approach was rejected since it introduces a zookeeper dependency into kafka-clients.
- Build a wrapper class composed of a ZookeeperConsumerConnector and a KafkaConsumer. When the coordination mode trigger is fired, toggle consumption to the corresponding consumer. This approach was rejected for several reasons:
  - It requires a transformation from ZookeeperConsumerConnector's or KafkaConsumer's consumption API into the wrapper class consumption API while properly tracking offsets.
    - Should it adopt ZookeeperConsumerConnector's API of providing KafkaStream objects?
    - Should it adopt KafkaConsumer's polling API that provides ConsumerRecords?
  - It introduces yet another consumer client to kafka.
  - Users would need to change their code to use the new client.
- Embed an org.apache.kafka.clients.consumer.internals.ConsumerCoordinator inside ZookeeperConsumerConnector instead of a KafkaConsumer. This approach was rejected because ConsumerCoordinator is in the "internals" package and is subject to API changes without notice. Since API changes to ConsumerCoordinator might require changes to the KIP's proposed ZookeeperConsumerConnector running kafka-based coordination, this KIP instead opts for embedding the user-facing KafkaConsumer.
- Regarding the subtask of providing a global state for ConsumerRebalanceListener to preserve existing behavior, we had considered just instantiating an EKC per consumer thread id so that kafka-based coordination would solve the problem of mapping partitions to consumer threads for us instead of stitching together DescribeGroupsResponse and zookeeper state. We ultimately decided against this approach due to the added complexity of managing many EKCs. Another downside of this approach is that a standard partition assignment strategy using kafka-based coordination would give equal weight to a ZookeeperConsumerConnector consumer thread and a KafkaConsumer, causing an uneven partition ownership distribution across the group.
- Merge the /consumers/<group id>/ids and /consumers/<group id>/migration/ids directories by simply defining a new znode data version 2 for MEZKCCs. This was rejected to avoid any possibility of breaking clients as they parse the znode.
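The uneven ownership noted for the EKC-per-thread alternative can be made concrete with a small calculation. The sketch below uses a simplified round-robin assignor as a stand-in for "a standard partition assignment strategy"; it is not Kafka's actual assignor code. The member names, thread count, and partition count are made up for illustration.

```python
from collections import Counter


def round_robin(partitions, members):
    """Hand out partitions to members in turn, giving every member equal weight."""
    assignment = {m: [] for m in members}
    for i, p in enumerate(partitions):
        assignment[members[i % len(members)]].append(p)
    return assignment


# Hypothetical group: one ZookeeperConsumerConnector process with 4 consumer
# threads (one EKC each under the rejected design) plus one plain KafkaConsumer.
members = [
    "zkcc-1/thread-0",
    "zkcc-1/thread-1",
    "zkcc-1/thread-2",
    "zkcc-1/thread-3",
    "kc-1",
]
partitions = list(range(10))

assignment = round_robin(partitions, members)

# Sum ownership per process: the part of the member name before the "/".
per_process = Counter()
for member, owned in assignment.items():
    per_process[member.split("/")[0]] += len(owned)

print(dict(per_process))  # {'zkcc-1': 8, 'kc-1': 2}
```

Each of the five group members receives two partitions, so the single ZookeeperConsumerConnector process ends up owning four times as many partitions as the KafkaConsumer process.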