...

| api | sender | fields | returns | description | issue to |
|---|---|---|---|---|---|
| list_groups | client | group_name (optional) | group_name, controller_id/host/port | Returns metadata for one or more groups. If issued with a group_name, it returns metadata for just that group. If no group_name is given, it returns metadata for all active groups. This request can be issued to any server. | any server |
| create_group | client | group_name, heart_beat_freq, ephemeral | group_name, controller_id/host/port | Creates a new group with the given name and the specified minimum heartbeat frequency. Returns the id/host/port of the server acting as the controller for that group. If the ephemeral flag is set, the group will disappear when the last client exits. | any server |
| delete_group | client | group_name | ack | Deletes a non-ephemeral group (but only if it has no members). | controller |
| join_group | client | group_name | no response | Asks to join the given group. | controller |
| begin_group_change | server | | ack | This message is sent from the controller to the group members. Group change happens in two phases: first, all group members are notified of the change; once they have all acknowledged, they are sent the new group membership list. | client |
| group_changed | server | group_generation, group_members | no response | This message alerts all group members of the current group membership. This is the second phase of the group membership change. | client |
| im_alive | client | group_generation | no response | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the create_group command. The client can and should issue these more frequently to avoid accidentally timing out. | controller |
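
The two-phase change described for begin_group_change and group_changed can be sketched as a small piece of controller-side state. This is only an illustration under stated assumptions: the class name `GroupController`, its methods, and the rule that the generation increases by one per completed change are hypothetical and not specified above.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class GroupController:
    """Hypothetical controller-side state for a single group."""
    members: frozenset = frozenset()
    generation: int = 0
    pending: Optional[frozenset] = None          # proposed membership (phase 1)
    awaiting_acks: set = field(default_factory=set)

    def begin_change(self, new_members):
        """Phase 1: record the proposed membership and return the members
        that should be sent begin_group_change."""
        self.pending = frozenset(new_members)
        self.awaiting_acks = set(self.pending)
        return sorted(self.awaiting_acks)

    def ack(self, member):
        """Handle one acknowledgement. Returns the group_changed payload
        once every notified member has acknowledged, otherwise None."""
        self.awaiting_acks.discard(member)
        if self.pending is None or self.awaiting_acks:
            return None
        # Phase 2: commit the new membership under a new generation
        # (incrementing by one is an assumption of this sketch).
        self.members, self.pending = self.pending, None
        self.generation += 1
        return {"group_generation": self.generation,
                "group_members": sorted(self.members)}


controller = GroupController(members=frozenset({"c1", "c2"}))
controller.begin_change({"c1", "c2", "c3"})   # c3 is joining
controller.ack("c1")
controller.ack("c2")
changed = controller.ack("c3")                # last ack triggers phase 2
```

Because the new membership is only committed after the last acknowledgement, no member acts on the new list while another member is still unaware that a change is in progress.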

The use of this protocol would be as follows:

On startup or when co-ordinator heartbeat fails -

  • on startup the consumer issues a list_groups(my_group) to a random broker to find out the location of the controller for its group (each server is the controller for some groups)
  • knowing where the controller is, it connects and issues a RegisterConsumer RPC, then it awaits a RegisterConsumerResponse
  • on receiving the RegisterConsumer request the server sends an error code in the HeartbeatResponse to all alive group members and waits for a new RegisterConsumer request from each consumer, except the one that sent the initial RegisterConsumer.
  • on receiving an error code in the HeartbeatResponse, all consumers stop fetching, commit offsets, re-discover the controller through list_groups(my_group) and send a RegisterConsumer request to the controller
  • on receiving RegisterConsumer from all consumers of the group membership change, the controller sends the RegisterConsumerResponse to all the new group members, with the partitions and the respective offsets to restart consumption from
  • on receiving the RegisterConsumerResponse the consumer is now able to start consuming its partitions and must now start sending heartbeat messages back to the controller with the current generation id.
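
The startup sequence above can be sketched as a toy, in-memory controller. Everything named here is an assumption made for illustration: the `Controller` class, the `REBALANCE_NEEDED` code, and the round-robin partition assignment are hypothetical, since the text does not specify an error code value or an assignment strategy. Offsets in the response and the handling of dead members are left out to keep the control flow visible.

```python
NO_ERROR = 0
REBALANCE_NEEDED = 1   # hypothetical error code carried in HeartbeatResponse


class Controller:
    def __init__(self, partitions):
        self.partitions = list(partitions)
        self.generation = 0
        self.members = set()      # consumers in the current generation
        self.rejoined = set()     # consumers that re-registered this round
        self.rebalancing = False

    def register_consumer(self, consumer_id):
        """Handle RegisterConsumer. Returns {consumer: response} once every
        current member has re-registered, otherwise None."""
        if not self.rebalancing:
            self.rebalancing = True
            self.rejoined = set()
        self.rejoined.add(consumer_id)
        if not self.members <= self.rejoined:
            return None           # still waiting for existing members
        self.members = set(self.rejoined)
        self.generation += 1
        self.rebalancing = False
        return self._assign()

    def heartbeat(self, consumer_id):
        """Members that have not yet re-registered are told to rebalance."""
        if self.rebalancing and consumer_id not in self.rejoined:
            return REBALANCE_NEEDED
        return NO_ERROR

    def _assign(self):
        # Round-robin assignment: an assumption of this sketch.
        owners = sorted(self.members)
        responses = {c: {"generation": self.generation, "partitions": []}
                     for c in owners}
        for i, partition in enumerate(self.partitions):
            responses[owners[i % len(owners)]]["partitions"].append(partition)
        return responses


controller = Controller(["t-0", "t-1", "t-2"])
first = controller.register_consumer("c1")     # sole consumer, answered at once
waiting = controller.register_consumer("c2")   # None: c1 must re-register first
code = controller.heartbeat("c1")              # REBALANCE_NEEDED
# c1 would now stop fetching and commit offsets before re-registering.
second = controller.register_consumer("c1")    # both consumers get a response
```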

When new partitions are added to existing topics or new topics are created -

  • on discovering newly created topics or newly added partitions to existing topics, the controller sends an error code in the HeartbeatResponse to all alive group members and waits for a new RegisterConsumer request from each consumer, except the one that sent the initial RegisterConsumer.
  • on receiving an error code in the HeartbeatResponse, all consumers stop fetching, commit offsets, re-discover the controller through list_groups(my_group) and send a RegisterConsumer request to the controller
  • on receiving RegisterConsumer from all consumers of the group membership change, the controller sends the RegisterConsumerResponse to all the new group members, with the new partitions and the respective offsets to restart consumption from. For newly added partitions and topics, the offset is set to smallest (0L).
  • on receiving the RegisterConsumerResponse the consumer is now able to start consuming its partitions and must now start sending heartbeat messages back to the controller with the current generation id.
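
The offset rule for newly added partitions can be illustrated with a short helper. This is a minimal sketch: the function name `restart_offsets`, the (topic, partition) tuple keys, and the topic name `orders` are hypothetical; only the rule that new partitions start from the smallest offset (0) comes from the text.

```python
SMALLEST_OFFSET = 0   # "smallest (0L)" in the description above


def restart_offsets(assigned_partitions, committed_offsets):
    """Map each assigned partition to the offset consumption restarts from.

    Partitions with a committed offset resume from it; partitions the group
    has never consumed (newly added ones) start from the smallest offset.
    """
    return {p: committed_offsets.get(p, SMALLEST_OFFSET)
            for p in assigned_partitions}


committed = {("orders", 0): 42, ("orders", 1): 17}
assigned = [("orders", 0), ("orders", 1), ("orders", 2)]   # partition 2 is new
offsets = restart_offsets(assigned, committed)
```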

...