...
Code Block |
---|
{ Version => int16 CorrelationId => int64 GroupId => String ConsumerAddr => String SessionTimeout => int64 Subscribing => [String] } |
JoinGroupResponse
Code Block |
---|
{ Version => int16 CorrelationId => int64 ErrorCode => int16 Generation => int16 GroupId => String ConsumerId => String PartitionsToOwn => [<TopicAndPartition>] } |
Note that the consumer id is assigned by the coordinator upon joining the group, which is then used in heartbeat protocol and offset management.
...
This can lift the lower bound of the rebalance latency on ping frequency, but will require more one metadata request along with unnecessary rebalances on coordinator failover (cause they are now treated as one scenario).
Offset Management
Commit Offset
Upon receiving the commit offset request, the coordinator checks if the consumer is 1) within the group specified, 2) owns the partitions it commit offsets to, and 3) has the correct generation number. If yes append the offset entry to the corresponding log, otherwise rejects the request.
Upon receiving the fetch offset request, the coordinator checks if the consumer within the group specified. If yes returns the current offset, otherwise rejects the request.
...
OffsetCommitRequest
Code Block |
---|
{
Version => int16
CorrelationId => int64
GroupId => String
ConsumerId => String
Generation => int16
Offsets => [TopicAndPartition -> OffsetAndError]
} |
OffsetCommitResponse
Code Block |
---|
{
Version => int16
CorrelationId => int64
responses => [TopicAndPartition -> int16] // error code for each partition
} |
Fetch Offset
Fetch offset request is also handled by the coordinator.
Upon receiving the fetch offset request, the coordinator checks if the consumer within the group specified. If yes returns the current offset, otherwise rejects the request.
Upon receiving a reject response from the coordinator, the consumer will try to connect to the new coordinator (i.e., set up the connection, send heartbeat and get valid response) and then retry.
OffsetFetchRequest
Code Block |
---|
{
Version => int16
CorrelationId => int64
GroupId => String
ConsumerId => String
Generation => int16
partitions => [TopicAndPartition]
} |
OffsetFetchResponse
Code Block |
---|
{
Version => int16
CorrelationId => int64
offsets => [TopicAndPartition -> OffsetAndError]
} |
Consumer Architecture
API Implementation
...