Table of Contents |
---|
Over several years of usage we have run into a number of complaints about the scala client. This is a proposal for addressing these complaints.
...
We will introduce a set of RPC apis for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data, it is responsible for group membership.
Heartbeat
Code Block |
---|
Heartbeat
Version => int16
CorrelationId => int64
ClientId => string
SessionTimeout => int64
|
HeartbeatResponse
Code Block |
---|
Version => int16
CorrelationId => int64
ControllerGeneration => int64
ErrorCode => int16 // error code is non zero if the group change is to be initiated
|
CreateGroup
api | sender | description | issue to |
---|---|---|---|
ListGroups | client | Returns metadata for one or more groups. If issued with a list of GroupName it returns metadata for just those groups. If no GroupName is given it returns metadata for all active groups. This request can be issued to any server. | any server |
CreateGroup | client | Create a new group with the given name and the specified minimum heartbeat frequency. Return the id/host/port of the server acting as the controller for that group. | any server |
DeleteGroup | client | Deletes a non-emphemeral group (but only if it has no members) | controller |
RegisterConsumer | client | Ask to join the given group. This happens as part of the event loop that gets invoked when the consumer uses the poll() API | controller |
Heartbeat | client | This heartbeat message must be sent by the consumer to the controller with an SLA specified in the CreateGroup command. The client can and should issues these more frequently to avoid accidentally timing out. | controller |
Heartbeat
Code Block |
---|
Heartbeat
Version |
Code Block |
Version => int16 CorrelationId => int64 ClientId => string ConsumerGroup => int16 CorrelationId => stringint64 SessionTimeout ClientId => int64string Topics SessionTimeout =>[string] int64 |
...
HeartbeatResponse
Code Block |
---|
Version => int16 CorrelationId => int64 ControllerGeneration => int64 ClientId ErrorCode => string ConsumerGroupint16 // error code is non zero if the group change is to be initiated |
CreateGroup
Code Block |
---|
Version => string ErrorCode => int16 |
RegisterConsumer
Code Block |
---|
VersionCorrelationId => int64 ClientId => int16 CorrelationId => int64 ClientId string ConsumerGroup => string ConsumerGroup SessionTimeout => string ConsumerIdint64 Topics => [string] |
...
CreateGroupResponse
Code Block |
---|
Version => int16 CorrelationId => int64 ClientId => string ConsumerGroup => string PartitionsToOwnint16 CorrelationId => int64 ConsumerGroup => [{Topic Partition Offset}] string ErrorCode Topic => string Partition=> int16 |
DeleteGroup
Code Block |
---|
Version => int16 CorrelationId Offset => int64 ClientId => int64 ErrorCode => string ConsumerGroup => int16[string] |
...
DeleteGroupResponse
Code Block |
---|
Version => int16 CorrelationId => int64 ClientId int16 CorrelationId => stringint64 ConsumerGroupsDeletedConsumerGroups => [string] |
ListGroupsResponse
ErrorCode => int16
|
RegisterConsumer
Code Block |
---|
Version => int16
CorrelationId => int64
ClientId |
Code Block |
Version => int16 CorrelationId => int64 ClientId => string GroupsInfo => [{GroupName, GroupMembers, Topics, ControllerBroker, SessionTimeout}] GroupName => string ConsumerGroup GroupMembers => [string] Topics ConsumerId => [string] ControllerBroker => Broker Broker => BrokerId Host Port BrokerIdstring |
RegisterConsumerResponse
Code Block |
---|
Version => int16 CorrelationId => int64 ClientId => string ConsumerGroup => string PartitionsToOwn => [{Topic => int32Partition Offset}] Topic Host => string Partition => int16 Offset Port => int16int64 ErrorCode => int16 |
...
ListGroups
Code Block |
---|
Version => int16 CorrelationId => int64 ClientId => string ConsumerGroup ConsumerGroups => [string NewOffsets => [{Topic Partition Offset}] Topic ] |
ListGroupsResponse
Code Block |
---|
Version => int16 CorrelationId => int64 ClientId => string GroupsInfo Partition => [{GroupName, GroupMembers, Topics, => int16ControllerBroker, SessionTimeout}] Offset GroupName => int64 |
RewindConsumerResponse
Code Block |
---|
Versionstring GroupMembers => [string] Topics => int16 CorrelationId => int64[string] ConsumerGroup ControllerBroker => string ActualOffsetsBroker Broker => [{TopicBrokerId PartitionHost Offset}]Port Topic BrokerId => int32 => string Host Partition => int16string Offset Port => int64int16 ErrorCode => int16 |
...
RewindConsumer
...
sender
...
fields
...
returns
...
description
...
issue to
...
list_groups
...
client
...
group_name (optional)
...
group_name
controller_id/host/port
...
Returns metadata for one or more groups. If issued with a group_name it returns metadata for just that group. If no group_name is given it returns metadata for all active groups. This request can be issued to any server.
...
any server
...
create_group
...
client
...
group_name
heart_beat_freq
ephemeral
...
group_name
controller_id/host/port
...
Create a new group with the given name and the specified minimum heartbeat frequency. Return the id/host/port of the server acting as the controller for that group.
If the ephemeral flag is set the group will disappear when the last client exits.
...
any server
...
delete_group
...
client
...
group_name
...
ack
...
Deletes a non-emphemeral group (but only if it has no members)
...
controller
...
join_group
...
client
...
group_name
...
no response
...
Ask to join the given group
...
controller
Code Block |
---|
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroup => string
NewOffsets => [{Topic Partition Offset}]
Topic => string
Partition => int16
Offset => int64
|
RewindConsumerResponse
Code Block |
---|
Version => int16
CorrelationId => int64
ConsumerGroup => string
ActualOffsets => [{Topic Partition Offset}]
Topic => string
Partition => int16
Offset => int64
ErrorCode => int16
|
...
im_alive
...
client
...
group_generation
...
no response
...
This heartbeat message must be sent by the consumer to the controller with an SLA specified in the create_group command. The client can and should issues these more frequently to avoid accidentally timing out.
...
The use of this protocol would be as follows:
...