Over several years of usage we have run into a number of complaints about the Scala client. This is a proposal for addressing these complaints.

...

We will introduce a set of RPC APIs for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data; it is responsible only for group membership.

Heartbeat

Code Block
Heartbeat
  Version            => int16
  CorrelationId      => int64
  ClientId           => string
  SessionTimeout     => int64

HeartbeatResponse

Code Block
Version                => int16
CorrelationId          => int64
ControllerGeneration   => int64
ErrorCode              => int16 // error code is non zero if the group change is to be initiated
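As a rough illustration of the two messages above, the following Python sketch packs a Heartbeat request and unpacks a HeartbeatResponse. The wire details are assumptions and not part of this proposal: integers are taken to be big-endian, and strings are taken to be an int16 length followed by UTF-8 bytes. The helper names (`encode_string`, `encode_heartbeat`, `decode_heartbeat_response`) are hypothetical.

```python
import struct


def encode_string(value: str) -> bytes:
    # Assumption: a string is an int16 length followed by UTF-8 bytes.
    raw = value.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_heartbeat(version: int, correlation_id: int, client_id: str,
                     session_timeout: int) -> bytes:
    # Field order follows the Heartbeat layout: int16, int64, string, int64.
    return (struct.pack(">hq", version, correlation_id)
            + encode_string(client_id)
            + struct.pack(">q", session_timeout))


def decode_heartbeat_response(payload: bytes) -> dict:
    # Version (int16), CorrelationId (int64),
    # ControllerGeneration (int64), ErrorCode (int16).
    version, correlation_id, generation, error_code = struct.unpack(">hqqh", payload)
    return {
        "version": version,
        "correlation_id": correlation_id,
        "controller_generation": generation,
        "error_code": error_code,
        # A non-zero error code signals that a group change is to be initiated.
        "group_change": error_code != 0,
    }
```

A client could treat `group_change` as the cue to stop fetching and rejoin the group, although the exact reaction is left open here.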

api | sender | description | issue to
ListGroups | client | Returns metadata for one or more groups. If issued with a list of GroupName values, it returns metadata for just those groups. If no GroupName is given, it returns metadata for all active groups. | any server
CreateGroup | client | Creates a new group with the given name and the specified minimum heartbeat frequency. Returns the id/host/port of the server acting as the controller for that group. If the ephemeral flag is set, the group will disappear when the last client exits. | any server
DeleteGroup | client | Deletes a non-ephemeral group (but only if it has no members). | controller
RegisterConsumer | client | Asks to join the given group. This happens as part of the event loop that gets invoked when the consumer uses the poll() API. | controller
Heartbeat | client | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the CreateGroup command. The client can and should issue these more frequently to avoid accidentally timing out. | controller
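The "issue to" column above amounts to a small routing rule on the client. A minimal sketch of that rule follows, assuming the client already knows one reachable server and, once a group exists, the address of its controller. The function name `pick_destination` and the address strings are hypothetical.

```python
from typing import Optional

# Destination per API, copied from the "issue to" column of the table.
ISSUE_TO = {
    "ListGroups": "any server",
    "CreateGroup": "any server",
    "DeleteGroup": "controller",
    "RegisterConsumer": "controller",
    "Heartbeat": "controller",
}


def pick_destination(api: str, any_server: str, controller: Optional[str]) -> str:
    """Return the address a request for `api` should be sent to."""
    target = ISSUE_TO[api]
    if target == "any server":
        return any_server
    if controller is None:
        # The controller address is only known after CreateGroup or ListGroups.
        raise ValueError(f"{api} must go to the controller, which is not known yet")
    return controller
```

For example, `pick_destination("CreateGroup", "broker-1:9092", None)` returns the bootstrap address, while a Heartbeat call fails until a controller address has been learned.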

CreateGroup

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
SessionTimeout             => int64
Topics                     => [string]

...

CreateGroupResponse

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
ErrorCode                  => int16

DeleteGroup

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => [string]

...

DeleteGroupResponse

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
DeletedConsumerGroups      => [string]
ErrorCode                  => int16

RegisterConsumer

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
ConsumerId                 => string

RegisterConsumerResponse

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
PartitionsToOwn            => [{Topic Partition Offset}]
  Topic                    => string
  Partition                => int16
  Offset                   => int64
ErrorCode                  => int16
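The nested PartitionsToOwn array is the part of this response a consumer acts on. The sketch below decodes just that array in Python. The encoding is assumed rather than specified here: big-endian integers, an int32 element count in front of the array, and strings as an int16 length plus UTF-8 bytes. The names `read_string` and `decode_partitions_to_own` are hypothetical.

```python
import struct


def read_string(buf: bytes, pos: int):
    # Assumption: a string is an int16 length followed by UTF-8 bytes.
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_partitions_to_own(buf: bytes, pos: int = 0):
    """Decode [{Topic Partition Offset}] starting at `pos`.

    Returns the list of (topic, partition, offset) tuples and the position
    of the first byte after the array.
    """
    # Assumption: the array is prefixed with an int32 element count.
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    owned = []
    for _ in range(count):
        topic, pos = read_string(buf, pos)
        # Partition => int16, Offset => int64
        partition, offset = struct.unpack_from(">hq", buf, pos)
        pos += 10
        owned.append((topic, partition, offset))
    return owned, pos
```

The returned position lets the caller continue with the trailing ErrorCode field.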

...

ListGroups

Code Block
Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroups             => [string]

ListGroupsResponse

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
GroupsInfo                 => [{GroupName, GroupMembers, Topics, ControllerBroker, SessionTimeout}]
  GroupName                => string
  GroupMembers             => [string]
  Topics                   => [string]
  ControllerBroker         => Broker
     Broker                => BrokerId Host Port
     BrokerId              => int32
     Host                  => string
     Port                  => int16
ErrorCode                  => int16
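One likely use of this response is to find the controller for a group before sending controller-only requests. The following is a small sketch of that lookup over an already-decoded GroupsInfo list. The class and function names are hypothetical, and SessionTimeout is left out because its type is not given in the layout above.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Broker:
    broker_id: int  # BrokerId => int32
    host: str       # Host => string
    port: int       # Port => int16


@dataclass
class GroupInfo:
    group_name: str           # GroupName => string
    group_members: List[str]  # GroupMembers => [string]
    topics: List[str]         # Topics => [string]
    controller_broker: Broker


def controller_for(groups: List[GroupInfo], group_name: str) -> Optional[Broker]:
    """Return the controller broker for `group_name`, or None if it is not listed."""
    for info in groups:
        if info.group_name == group_name:
            return info.controller_broker
    return None
```

A `None` result would suggest that the group does not exist yet and needs a CreateGroup call first.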

...

RewindConsumer

Code Block

Version                    => int16
CorrelationId              => int64
ClientId                   => string
ConsumerGroup              => string
NewOffsets                 => [{Topic Partition Offset}]
  Topic                    => string
  Partition                => int16
  Offset                   => int64

RewindConsumerResponse

Code Block

Version                    => int16
CorrelationId              => int64
ConsumerGroup              => string
ActualOffsets              => [{Topic Partition Offset}]
  Topic                    => string
  Partition                => int16
  Offset                   => int64
ErrorCode                  => int16

api | sender | fields | returns | description | issue to
list_groups | client | group_name (optional) | group_name, controller_id/host/port | Returns metadata for one or more groups. If issued with a group_name, it returns metadata for just that group. If no group_name is given, it returns metadata for all active groups. | any server
create_group | client | group_name, heart_beat_freq, ephemeral | group_name, controller_id/host/port | Creates a new group with the given name and the specified minimum heartbeat frequency. Returns the id/host/port of the server acting as the controller for that group. If the ephemeral flag is set, the group will disappear when the last client exits. | any server
delete_group | client | group_name | ack | Deletes a non-ephemeral group (but only if it has no members). | controller
join_group | client | group_name | no response | Asks to join the given group. | controller
im_alive | client | group_generation | no response | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the create_group command. The client can and should issue these more frequently to avoid accidentally timing out. | ...
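The heartbeat SLA described above suggests sending heartbeats well inside the agreed timeout. A minimal sketch of that timing rule follows, assuming the timeout is expressed in milliseconds and that three heartbeats per timeout window is an acceptable safety margin. Both the factor of three and the function names are illustrative choices, not part of this proposal.

```python
def heartbeat_interval_ms(session_timeout_ms: int, beats_per_timeout: int = 3) -> int:
    """Pick a send interval that fits several heartbeats into one timeout window."""
    if session_timeout_ms <= 0 or beats_per_timeout < 1:
        raise ValueError("timeout must be positive and beats_per_timeout at least 1")
    # Integer division keeps the interval at or below timeout / beats_per_timeout.
    return max(1, session_timeout_ms // beats_per_timeout)


def has_timed_out(last_heartbeat_ms: int, now_ms: int, session_timeout_ms: int) -> bool:
    """Controller-side check: has the consumer missed its SLA?"""
    return now_ms - last_heartbeat_ms > session_timeout_ms
```

With a 30-second timeout this yields one heartbeat every 10 seconds, so a single delayed or lost heartbeat does not by itself expire the consumer.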

...

The use of this protocol would be as follows:

...