...

The new protocol’s RPCs are specified in detail in the public interfaces section of this document, while the rebalance logic is described in the next chapter.

Group Coordinator

Implementing the new rebalance protocol in the current group coordinator is not appropriate in our opinion because it would anyway require many changes in the current protocol to make it interoperable. Therefore, this KIP proposes to rewrite the group coordinator from scratch in Java. The new group coordinator will have one state machine per __consumer_offsets partition, where each state machine is modelled as an event loop. Those state machines will be executed in N threads.
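The threading model can be illustrated with a minimal sketch. It is not the proposed implementation: the class name CoordinatorEventLoops, its methods, and the modulo mapping from partition to thread are all assumptions made for illustration. Each of the N threads is a single-threaded executor, so every event of a given __consumer_offsets partition is processed sequentially, in submission order, and the state machine itself needs no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical sketch: per-partition state machines multiplexed onto N event-loop threads. */
final class CoordinatorEventLoops implements AutoCloseable {
    private final List<ExecutorService> threads = new ArrayList<>();

    CoordinatorEventLoops(int numThreads) {
        if (numThreads < 1) {
            throw new IllegalArgumentException("numThreads must be at least 1");
        }
        for (int i = 0; i < numThreads; i++) {
            final String name = "group-coordinator-event-loop-" + i;
            // A single-threaded executor behaves like an event loop:
            // queued events run one at a time, in the order they were submitted.
            threads.add(Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable, name);
                thread.setDaemon(true);
                return thread;
            }));
        }
    }

    /** Assumption: a partition is pinned to one thread by taking its index modulo N. */
    int threadFor(int partition) {
        return Math.floorMod(partition, threads.size());
    }

    /** Enqueues an event for the state machine of the given __consumer_offsets partition. */
    <T> CompletableFuture<T> submit(int partition, Supplier<T> event) {
        return CompletableFuture.supplyAsync(event, threads.get(threadFor(partition)));
    }

    @Override
    public void close() {
        threads.forEach(ExecutorService::shutdown);
    }
}
```

Pinning a partition to a single thread is one possible way to get the sequential semantics of an event loop; several partitions may share a thread, but a partition never runs on two threads at once.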

Consumer Groups

The group coordinator already supports so-called consumer groups. These are groups that implement the “consumer” embedded protocol type. With the introduction of the new consumer rebalance protocol, we need a way to differentiate the existing groups from the new consumer groups. This is important because the existing groups rely on a specific set of APIs whereas the new consumer groups will use a different set of APIs.

...

The group coordinator is responsible for storing those group configurations in order to keep their lifecycle tied to their group. When a group is deleted, we want its configuration to be deleted as well. This assumes that the IncrementalAlterConfigs and DescribeConfigs APIs will be routed to the group coordinator owning the group they are acting upon.
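The coupling between a group and its configuration can be sketched as follows. This is only an illustration under stated assumptions: the class GroupConfigStore, its method names, and the in-memory map are hypothetical and do not reflect how the coordinator actually persists configurations.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: group configurations owned by the coordinator of the group. */
final class GroupConfigStore {
    private final Map<String, Map<String, String>> configsByGroup = new HashMap<>();

    /** Stands in for an IncrementalAlterConfigs request routed to this coordinator. */
    void alterConfig(String groupId, String key, String value) {
        configsByGroup.computeIfAbsent(groupId, id -> new HashMap<>()).put(key, value);
    }

    /** Stands in for a DescribeConfigs request routed to this coordinator. */
    Map<String, String> describeConfigs(String groupId) {
        Map<String, String> configs = configsByGroup.get(groupId);
        return configs == null
            ? Collections.emptyMap()
            : Collections.unmodifiableMap(new HashMap<>(configs));
    }

    /** Deleting the group drops its configuration in the same step. */
    void deleteGroup(String groupId) {
        configsByGroup.remove(groupId);
    }
}
```

Because the configuration lives next to the group state, no separate cleanup is needed when the group is deleted.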

Feature Flag / MetadataVersion / IBP

We introduce a new feature flag named “group.new.coordinator.enable”. This flag is particularly useful during the development phase as it allows us to enable the new group coordinator while keeping the existing one for production deployments. It also allows operators to explicitly enable the new coordinator in the future.

When the feature is marked as production ready, we will replace the feature flag with the IBP / MetadataVersion.
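As a rough sketch of how a broker could act on the group.new.coordinator.enable flag, the snippet below picks a coordinator implementation from the broker properties. The enum CoordinatorImpl and the class CoordinatorSelector are hypothetical names introduced for illustration; only the property name and its default of false come from this document.

```java
import java.util.Properties;

/** Hypothetical marker for the two coordinator implementations. */
enum CoordinatorImpl { EXISTING, NEW }

final class CoordinatorSelector {
    static final String FLAG = "group.new.coordinator.enable";

    /** Returns NEW only when the flag is explicitly set to true; the default is false. */
    static CoordinatorImpl select(Properties brokerConfig) {
        boolean enabled = Boolean.parseBoolean(brokerConfig.getProperty(FLAG, "false"));
        return enabled ? CoordinatorImpl.NEW : CoordinatorImpl.EXISTING;
    }
}
```

With an empty configuration the existing coordinator is selected, which matches the intent of keeping it as the production path while the new one is under development.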

Persistence

We will introduce a new set of records to persist the new consumer group type in the existing __consumer_offsets topic. The records are detailed in the public interfaces section of this document.

...

The semantics of the consumer will remain unchanged after this proposal is implemented. The goal is to replace the implementation of the group membership/assignment protocol with the new one.

Feature Flag

A new configuration setting will be used to determine whether the new protocol should be used or not. The feature flag allows users to control when their application starts using, or migrates to, the new protocol. It is also required by our migration path, which requires the software to be on a specific version that is compatible with the new protocol.

...

New properties in the broker configuration.

| Name | Type | Default | Doc |
| --- | --- | --- | --- |
| group.new.coordinator.enable | bool | false | Whether to enable the new group coordinator and the consumer group protocol. |
| group.coordinator.threads | int | 1 | The number of threads used to run the state machines. |
| group.consumer.session.timeout.ms | int | 30s | The timeout to detect client failures when using the consumer group protocol. |
| group.consumer.min.session.timeout.ms | int | 45s | The minimum session timeout. |
| group.consumer.max.session.timeout.ms | int | 60s | The maximum session timeout. |
| group.consumer.heartbeat.interval.ms | int | 5s | The heartbeat interval given to the members. |
| group.consumer.min.heartbeat.interval.ms | int | 5s | The minimum heartbeat interval. |
| group.consumer.max.heartbeat.interval.ms | int | 15s | The maximum heartbeat interval. |
| group.consumer.max.size | int | MaxValue | The maximum number of consumers that a single consumer group can accommodate. |
| group.consumer.assignors | List | range, uniform | The server side assignors. |
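
For illustration, a broker properties fragment that opts in to the new coordinator might look like the following. The values are examples only, not recommendations. It assumes that properties with the `.ms` suffix are expressed in milliseconds, as is usual for Kafka configurations, and that list values are comma-separated.

```properties
# Hypothetical example: enable the new group coordinator and the consumer group protocol.
group.new.coordinator.enable=true
# Run the state machines on 4 threads instead of the default of 1.
group.coordinator.threads=4
# 5 seconds, expressed in milliseconds (assumption: .ms values are in milliseconds).
group.consumer.heartbeat.interval.ms=5000
# Server side assignors, using the default names listed in the table above.
group.consumer.assignors=range,uniform
```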

...