Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The timeout the user specifies will be purely to ensure we have a mechanism to give control back to the user even when no messages are delivered. It is up to the user to ensure poll() is called again within the heartbeat frequency set for the consumer group. Internally the timeout on our select() may uses a shorter timeout to ensure the heartbeat frequency is met even when no messages are delivered.

Consumer Group Membership Protocol

We will introduce a set of RPC apis for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data, it is responsible for group membership.My proposal would be to actually not handle partition assignment at all. The only facility we need to offer is a group membership facility. If all consumers agree on who exists the consumer client implementation can implement any partition assignment it likes.

Heartbeat

Code Block
Heartbeat
  Version            => int16
  CorrelationId      => int64
  ClientId           => string
  SessionTimeout     => int64

HeartbeatResponse

Code Block
Version                => int16
CorrelationId          => int64
ControllerGeneration   => int64
ErrorCode              => int16 // error code is non zero if the group change is to be initiated

CreateGroup

Code Block
Version                   => int16
CorrelationId             => int64
ClientId                  => string
ConsumerGroup             => string
SessionTimeout            => int64
Topics                    =>[string]

CreateGroupResponse

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
ErrorCode                  => int16

RegisterConsumer

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
ConsumerId                 => string

RegisterConsumerResponse

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
PartitionsToOwn            => [{Topic Partition Offset}]
  Topic                    => string
  Partition                => int16
  Offset                   => int64
ErrorCode                  => int16

ListGroups

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroups             => [string]

ListGroupsResponse

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
GroupsInfo                 => [{GroupName, GroupMembers, Topics, ControllerBroker, SessionTimeout}]
  GroupName                => string
  GroupMembers             => [string]
  Topics                   => [string]
  ControllerBroker         => Broker
     Broker                => BrokerId Host Port 
     BrokerId              => int32
     Host                  => string
     Port                  => int16
ErrorCode                  => int16

RewindConsumer

RewindConsumerResponse

api

sender

fields

returns

description

issue to

list_groups

client

group_name (optional)

group_name
controller_id/host/port

Returns metadata for one or more groups. If issued with a group_name it returns metadata for just that group. If no group_name is given it returns metadata for all active groups. This request can be issued to any server.

any server

create_group

client

group_name
heart_beat_freq
ephemeral

group_name
controller_id/host/port

Create a new group with the given name and the specified minimum heartbeat frequency. Return the id/host/port of the server acting as the controller for that group.
If the ephemeral flag is set the group will disappear when the last client exits.

any server

delete_group

client

group_name

ack

Deletes a non-emphemeral group (but only if it has no members)

controller

join_group

client

group_name

no response

Ask to join the given group

controller

im_alive

client

group_generation

no response

This heartbeat message must be sent by the consumer to the controller with an SLA specified in the create_group command. The client can and should issues these more frequently to avoid accidentally timing out.

controller

...

Offset Rewind

Code Block
 while(true) {  
  List<MessageAndMetadata> messages = consumer.poll(timeout);
  process(messages);
  if(rewind_required) {
     List<PartitionOffset> partitionOffsets = new ArrayList<PartitionOffset>();
     partitionOffsets.add(new PartitionOffset(topic, partition, offset));
     rewind_offsets(partitionOffsets);     
  }  
}

...