...
The timeout the user specifies exists purely to guarantee that control returns to the user even when no messages are delivered. It is up to the user to ensure poll() is called again within the heartbeat frequency set for the consumer group. Internally, the select() call may use a shorter timeout so that the heartbeat frequency is met even when no messages are delivered.
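One way to derive the internal select() timeout is sketched below. It assumes the client records the absolute deadline of the user's poll(timeout) call and the time its next heartbeat is due. The class `PollTimeouts` and the method `selectTimeoutMs` are hypothetical names, not part of the proposed API.

```java
/**
 * Hypothetical helper: chooses how long one internal select() call may block
 * so that neither the user's poll() deadline nor the next heartbeat is missed.
 */
final class PollTimeouts {
    private PollTimeouts() {}

    /**
     * @param nowMs           current time in milliseconds
     * @param pollDeadlineMs  time at which poll() must return control to the user
     * @param nextHeartbeatMs time at which the next heartbeat must be sent
     * @return the select() timeout in milliseconds, never negative
     */
    static long selectTimeoutMs(long nowMs, long pollDeadlineMs, long nextHeartbeatMs) {
        long untilPollDeadline = pollDeadlineMs - nowMs;
        long untilHeartbeat = nextHeartbeatMs - nowMs;
        // Wake up for whichever comes first; an overdue deadline means "do not block".
        return Math.max(0L, Math.min(untilPollDeadline, untilHeartbeat));
    }
}
```

For example, with a 5000 ms poll timeout starting at t=0 and a heartbeat due at t=1000, the first select() blocks for at most 1000 ms. After sending the heartbeat, the client selects again for whichever is shorter: the remaining poll time or the time until the following heartbeat.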
Consumer Group Membership Protocol
We will introduce a set of RPC APIs for managing partition assignment on the consumer side. This will be based on the prototype here. This set of APIs is orthogonal to the APIs for producing and consuming data; it is responsible for group membership.

My proposal would be to not handle partition assignment at all. The only facility we need to offer is group membership: if all consumers agree on who exists, the consumer client implementation can implement any partition assignment it likes.
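Because members only need to agree on the member list, any deterministic function of that list can serve as the assignment strategy. A minimal sketch, assuming a range-style split of a single topic's partitions over the lexicographically sorted consumer ids; the `RangeAssignment` class is illustrative only and not part of the proposed protocol.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class RangeAssignment {
    private RangeAssignment() {}

    /** Returns the partitions of one topic that consumer `self` should own. */
    static List<Integer> partitionsFor(String self, List<String> members, int numPartitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // every member sorts the same list the same way
        int index = sorted.indexOf(self);
        if (index < 0) {
            throw new IllegalArgumentException("not a group member: " + self);
        }
        int n = sorted.size();
        int base = numPartitions / n;  // partitions every member receives
        int extra = numPartitions % n; // the first `extra` members receive one more
        int start = index * base + Math.min(index, extra);
        int count = base + (index < extra ? 1 : 0);
        List<Integer> owned = new ArrayList<>();
        for (int p = start; p < start + count; p++) {
            owned.add(p);
        }
        return owned;
    }
}
```

With members c2, c1 and c3 and 7 partitions, c1 computes [0, 1, 2], c2 computes [3, 4] and c3 computes [5, 6], each without any coordination beyond the shared member list.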
Heartbeat
```
Version => int16
CorrelationId => int64
ClientId => string
SessionTimeout => int64
```
HeartbeatResponse
```
Version => int16
CorrelationId => int64
ControllerGeneration => int64
ErrorCode => int16 // non-zero if a group change is to be initiated
```
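The schemas above do not pin down a wire encoding. The sketch below assumes Kafka's existing conventions (big-endian integers, strings as an int16 length followed by UTF-8 bytes, no size prefix) to show how a client might write a Heartbeat request and check whether a HeartbeatResponse asks it to initiate a group change. `HeartbeatCodec` is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class HeartbeatCodec {
    private HeartbeatCodec() {}

    /** Writes Version, CorrelationId, ClientId, SessionTimeout in schema order. */
    static ByteBuffer encodeHeartbeat(short version, long correlationId,
                                      String clientId, long sessionTimeout) {
        byte[] client = clientId.getBytes(StandardCharsets.UTF_8);
        if (client.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("clientId too long for an int16 length");
        }
        // ByteBuffer is big-endian by default.
        ByteBuffer buf = ByteBuffer.allocate(2 + 8 + 2 + client.length + 8);
        buf.putShort(version);
        buf.putLong(correlationId);
        buf.putShort((short) client.length); // assumed int16 length prefix for strings
        buf.put(client);
        buf.putLong(sessionTimeout);
        buf.flip(); // ready for reading / writing to a channel
        return buf;
    }

    /** Reads Version, CorrelationId, ControllerGeneration, ErrorCode; true if a group change is requested. */
    static boolean groupChangeRequested(ByteBuffer response) {
        response.getShort(); // Version
        response.getLong();  // CorrelationId
        response.getLong();  // ControllerGeneration
        short errorCode = response.getShort();
        return errorCode != 0;
    }
}
```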
CreateGroup
```
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroup => string
SessionTimeout => int64
Topics => [string]
```
CreateGroupResponse
```
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroup => string
ErrorCode => int16
```
RegisterConsumer
```
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroup => string
ConsumerId => string
```
RegisterConsumerResponse
```
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroup => string
PartitionsToOwn => [{Topic Partition Offset}]
  Topic => string
  Partition => int16
  Offset => int64
ErrorCode => int16
```
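A consumer receiving a RegisterConsumerResponse could turn PartitionsToOwn into its initial fetch positions. This sketch assumes that Offset is the position from which fetching should resume; the `TopicPartition`, `OwnedPartition` and `FetchPositions` types are hypothetical, and Partition stays a `short` to match the int16 in the schema.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical map key; Partition is int16 in the schema, hence short. */
record TopicPartition(String topic, short partition) {}

/** Hypothetical in-memory form of one PartitionsToOwn entry. */
record OwnedPartition(String topic, short partition, long offset) {}

final class FetchPositions {
    private FetchPositions() {}

    /** Builds the starting fetch position for every partition this consumer was told to own. */
    static Map<TopicPartition, Long> fromRegistration(List<OwnedPartition> partitionsToOwn) {
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (OwnedPartition p : partitionsToOwn) {
            // Assumption: Offset is where fetching for this partition should resume.
            positions.put(new TopicPartition(p.topic(), p.partition()), p.offset());
        }
        return positions;
    }
}
```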
ListGroups
```
Version => int16
CorrelationId => int64
ClientId => string
ConsumerGroups => [string]
```
ListGroupsResponse
```
Version => int16
CorrelationId => int64
ClientId => string
GroupsInfo => [{GroupName, GroupMembers, Topics, ControllerBroker, SessionTimeout}]
  GroupName => string
  GroupMembers => [string]
  Topics => [string]
  ControllerBroker => Broker
    Broker => BrokerId Host Port
      BrokerId => int32
      Host => string
      Port => int16
ErrorCode => int16
```
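After a list_groups call, a client needs the ControllerBroker of its group in order to send join_group and im_alive to the right server. A minimal sketch with Java records mirroring the ListGroupsResponse fields; the record and class names are assumptions, Port stays a `short` as in the schema, and SessionTimeout is assumed to be an int64 since the schema leaves its type open.

```java
import java.util.List;
import java.util.Optional;

/** Mirrors Broker => BrokerId Host Port. */
record Broker(int brokerId, String host, short port) {}

/** Mirrors one GroupsInfo entry; sessionTimeout's type is an assumption. */
record GroupInfo(String groupName, List<String> groupMembers, List<String> topics,
                 Broker controllerBroker, long sessionTimeout) {}

final class GroupLookup {
    private GroupLookup() {}

    /** Returns the controller for `group`, or empty if the response does not list it. */
    static Optional<Broker> controllerFor(String group, List<GroupInfo> groupsInfo) {
        return groupsInfo.stream()
                .filter(g -> g.groupName().equals(group))
                .map(GroupInfo::controllerBroker)
                .findFirst();
    }
}
```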
RewindConsumer
RewindConsumerResponse
api | sender | fields | returns | description | issue to |
---|---|---|---|---|---|
list_groups | client | group_name (optional) | group_name | Returns metadata for one or more groups. If issued with a group_name it returns metadata for just that group. If no group_name is given it returns metadata for all active groups. This request can be issued to any server. | any server |
create_group | client | group_name | group_name | Creates a new group with the given name and the specified minimum heartbeat frequency. Returns the id/host/port of the server acting as the controller for that group. | any server |
delete_group | client | group_name | ack | Deletes a non-ephemeral group, but only if it has no members. | controller |
join_group | client | group_name | no response | Asks to join the given group. | controller |
im_alive | client | group_generation | no response | This heartbeat message must be sent by the consumer to the controller within the SLA specified in the create_group command. The client can and should issue these more frequently than required to avoid accidentally timing out. | controller |
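The im_alive SLA implies that the controller tracks when it last heard from each member. A minimal sketch of that bookkeeping for one group follows, assuming the controller bumps a generation number on every membership change and rejects heartbeats that carry a stale group_generation. `GroupLiveness` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical controller-side liveness tracking for a single group. */
final class GroupLiveness {
    private final long sessionTimeoutMs; // SLA taken from create_group
    private final Map<String, Long> lastSeenMs = new HashMap<>();
    private long generation = 0L;

    GroupLiveness(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** join_group: a new member changes membership, so the generation advances. */
    void join(String consumerId, long nowMs) {
        if (lastSeenMs.put(consumerId, nowMs) == null) {
            generation++;
        }
    }

    /** im_alive: refreshes a known member; false means the client must rejoin. */
    boolean imAlive(String consumerId, long clientGeneration, long nowMs) {
        if (lastSeenMs.computeIfPresent(consumerId, (id, last) -> nowMs) == null) {
            return false; // unknown or already-evicted member
        }
        return clientGeneration == generation; // stale generation also forces a rejoin
    }

    /** Evicts members silent for longer than the SLA and returns their ids. */
    List<String> expire(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeenMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > sessionTimeoutMs) {
                evicted.add(e.getKey());
                it.remove();
            }
        }
        if (!evicted.isEmpty()) {
            generation++; // membership changed
        }
        return evicted;
    }

    long generation() {
        return generation;
    }
}
```

Under this sketch, a client whose im_alive is rejected would rejoin the group and recompute its partition assignment for the new generation.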
...
Offset Rewind
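```java
import java.util.ArrayList;
import java.util.List;

while (true) {
    // Block for up to `timeout` waiting for new messages.
    List<MessageAndMetadata> messages = consumer.poll(timeout);
    process(messages);
    if (rewindRequired) {
        // Collect the (topic, partition, offset) positions to rewind to.
        List<PartitionOffset> partitionOffsets = new ArrayList<>();
        partitionOffsets.add(new PartitionOffset(topic, partition, offset));
        // Reset the consumer's positions so subsequent poll() calls resume from these offsets.
        rewind_offsets(partitionOffsets);
    }
}
```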
...