...

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

One benefit of the dynamic toggle and the added zookeeper state is that they give admins control over migrating consumers between zookeeper-based coordination and kafka-based coordination across a large organization. For example, a script could scan the groups in zookeeper and toggle a group over to kafka-based coordination once it has fully migrated to MEZKCCs and has been stable for some time, as sketched below. If the switch to kafka-based coordination proves to stress the kafka cluster, admins can toggle the MEZKCC groups back to zookeeper-based coordination on the fly to relieve that stress.
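
Below is a minimal sketch of such a script in Java, using the plain ZooKeeper client. It assumes each MEZKCC registers under both /consumers/<group id>/ids and /consumers/<group id>/migration/ids with the same consumer id, and that the coordination mode toggle lives at a hypothetical /consumers/<group id>/migration/mode znode; a real script would also verify that the group has been stable for some time:

    import java.nio.charset.StandardCharsets;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;

    // Scans every group under /consumers and toggles fully migrated groups
    // over to kafka-based coordination.
    public class CoordinationToggler {
      public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        try {
          for (String group : zk.getChildren("/consumers", false)) {
            String idsPath = "/consumers/" + group + "/ids";
            String migratedPath = "/consumers/" + group + "/migration/ids";
            if (zk.exists(idsPath, false) == null || zk.exists(migratedPath, false) == null)
              continue;
            Set<String> members = new HashSet<>(zk.getChildren(idsPath, false));
            Set<String> migrated = new HashSet<>(zk.getChildren(migratedPath, false));
            // Fully migrated: every registered member is also an MEZKCC.
            if (!members.isEmpty() && migrated.containsAll(members)) {
              String togglePath = "/consumers/" + group + "/migration/mode"; // assumed path
              byte[] mode = "kafka".getBytes(StandardCharsets.UTF_8);
              try {
                zk.create(togglePath, mode, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
              } catch (KeeperException.NodeExistsException e) {
                zk.setData(togglePath, mode, -1); // -1 matches any znode version
              }
            }
          }
        } finally {
          zk.close();
        }
      }
    }

Toggling a group back to zookeeper-based coordination is the same write with a "zookeeper" payload.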

Example Migration

The following diagrams illustrate a full migration from OZKCCs to KCs.

Initial state with a group of OZKCCs:

[Gliffy diagram: Initial state with a group of OZKCCs]

Begin migration from OZKCCs to MEZKCCs:

[Gliffy diagram: Begin migration from OZKCCs to MEZKCCs]

The group has fully migrated to MEZKCCs while still using zookeeper-based coordination:

[Gliffy diagram: The group has fully migrated to MEZKCCs while still using zookeeper-based coordination]

The coordination mode toggle is applied so that the group of MEZKCCs uses kafka-based coordination:

[Gliffy diagram: The coordination mode toggle is applied so that the group of MEZKCCs uses kafka-based coordination]
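
Operationally, applying the toggle at this step could be a single znode write. The command below is a sketch that reuses the hypothetical /consumers/<group id>/migration/mode znode from the script above; substitute whatever path and payload the final design specifies:

    # assumed toggle znode; use "set" instead of "create" if it already exists
    bin/zkCli.sh -server zk1:2181 create /consumers/mygroup/migration/mode kafka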

Begin migration from MEZKCCs to KCs:

[Gliffy diagram: Begin migration from MEZKCCs to KCs]

Final state with a group of KCs:

[Gliffy diagram: Final state with a group of KCs]

Rejected Alternatives

  1. Adapt KafkaConsumer to understand zookeeper-based coordination. This approach was rejected because it would introduce a zookeeper dependency into kafka-clients.
  2. Build a wrapper class containing both a ZookeeperConsumerConnector and a KafkaConsumer. When the coordination mode trigger fires, toggle consumption to the corresponding consumer. This approach was rejected for several reasons:
    1. It would require adapting ZookeeperConsumerConnector's or KafkaConsumer's consumption API to the wrapper class's consumption API while properly tracking offsets.
      1. Should it adopt ZookeeperConsumerConnector's API of providing KafkaStreams?
      2. Should it adopt KafkaConsumer's polling API that provides ConsumerRecords?
    2. It introduces yet another consumer client to kafka.
    3. Users would need to change their code to use the new client.
  3. Embed a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator inside ZookeeperConsumerConnector instead of a KafkaConsumer. This approach was rejected because ConsumerCoordinator is in the "internals" package and is subject to API changes without notice. Since API changes to ConsumerCoordinator might require changes to the KIP's proposed ZookeeperConsumerConnector running kafka-based coordination, this KIP instead opts for embedding the user-facing KafkaConsumer.
  4. Regarding the subtask of providing a global state for ConsumerRebalanceListener to preserve existing behavior, we considered simply instantiating an EKC per consumer thread id so that kafka-based coordination would solve the problem of mapping partitions to consumer threads for us, instead of stitching together DescribeGroupsResponse and zookeeper state. We ultimately decided against this approach due to the added complexity of managing many EKCs. Another downside is that a standard partition assignment strategy using kafka-based coordination would give equal weight to a ZookeeperConsumerConnector consumer thread and a KafkaConsumer, causing an uneven partition ownership distribution across the group.
  5. Merge the /consumers/<group id>/ids and /consumers/<group id>/migration/ids directories by simply defining a new znode data version 2 for MEZKCCs. This was rejected to avoid any possibility of breaking clients as they parse the znode (the existing version 1 payload is shown below).
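
For reference, the existing version 1 registration znode at /consumers/<group id>/ids/<consumer id> holds JSON along these lines (the topic counts and timestamp here are illustrative):

    {
      "version": 1,
      "subscription": {"topic1": 1},
      "pattern": "static",
      "timestamp": "1452014572044"
    }

A client that assumes the version 1 schema while parsing this znode could fail on a hypothetical version 2 payload; keeping MEZKCC registration under the separate /consumers/<group id>/migration/ids directory sidesteps that risk.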