...
{
size: int32 // the size of this response, not including the first 4 bytes for this field
error_code: int16 // global error code for this request, if any
coordinator_id: int16 // broker id of the coordinator
brokers_info: [<broker_struct>] // current broker list
}
broker_struct =>
{
creator_id: string
id: int32
host: string
port: int32
}
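The framing rule in the size comment ("not including the first 4 bytes for this field") is easiest to see in a decoder. The Python sketch below assumes a Kafka-style wire encoding, which this page does not specify: big-endian integers, strings as an int16 length followed by UTF-8 bytes, and arrays as an int32 element count. The names CoordinatorInfoResponse, Broker and decode_coordinator_info are hypothetical.

```python
import struct
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Broker:
    creator_id: str
    id: int
    host: str
    port: int


@dataclass
class CoordinatorInfoResponse:  # hypothetical name for the response above
    error_code: int
    coordinator_id: int
    brokers_info: List[Broker]


def _read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    # Assumed string encoding: int16 length, then that many UTF-8 bytes.
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_coordinator_info(frame: bytes) -> CoordinatorInfoResponse:
    # size excludes its own 4 bytes, so a complete frame is 4 + size bytes.
    (size,) = struct.unpack_from(">i", frame, 0)
    if len(frame) != 4 + size:
        raise ValueError(f"expected {4 + size} bytes, got {len(frame)}")
    # error_code and coordinator_id are int16; the broker count is an assumed int32.
    error_code, coordinator_id, count = struct.unpack_from(">hhi", frame, 4)
    pos = 12
    brokers = []
    for _ in range(count):
        creator_id, pos = _read_string(frame, pos)
        (broker_id,) = struct.unpack_from(">i", frame, pos)
        host, pos = _read_string(frame, pos + 4)
        (port,) = struct.unpack_from(">i", frame, pos)
        pos += 4
        brokers.append(Broker(creator_id, broker_id, host, port))
    return CoordinatorInfoResponse(error_code, coordinator_id, brokers)
```

Checking the length up front lets a reader reject a truncated frame before walking the broker list.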
...
The consumer does not need a response to the RegisterRequest: once it has sent the request, it starts expecting the coordinator's first StopRequest or PingRequest within ping_interval_ms.
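A minimal sketch of that wait on the consumer side, assuming incoming coordinator requests are delivered to a queue.Queue as request type ids. The numeric ids, the CoordinatorLost exception, and raising on timeout are illustrative assumptions; the design does not say what the consumer does if nothing arrives in time.

```python
import queue

# Hypothetical request_type_id values; the design does not assign numbers.
PING_REQUEST = 1
STOP_REQUEST = 2


class CoordinatorLost(Exception):
    """Raised when no coordinator request arrives within ping_interval_ms."""


def await_first_request(inbox: "queue.Queue[int]", ping_interval_ms: int) -> int:
    # RegisterRequest has no response, so the consumer simply blocks until
    # the coordinator's first StopRequest or PingRequest shows up.
    try:
        request_type = inbox.get(timeout=ping_interval_ms / 1000.0)
    except queue.Empty:
        raise CoordinatorLost(f"no request within {ping_interval_ms} ms") from None
    if request_type not in (PING_REQUEST, STOP_REQUEST):
        raise ValueError(f"unexpected first request type {request_type}")
    return request_type
```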
RegisterRequest
{
size: int32 // the size of this request
request_type_id: int16 // the type of the request; here it identifies a RegisterRequest
group_id: string // group that the consumer belongs to
consumer_id: string // consumer id
auto_commit: boolean // indicator of whether autocommit is enabled
auto_offset_reset: string // what to do if an offset is out of range, currently either smallest or largest
ping_interval_ms: int32 // ping interval in milliseconds; the consumer is also expected to respond within this interval
max_ping_retries: int16 // maximum number of allowed ping timeouts before the consumer is treated as failed
}
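The following Python sketch serializes a RegisterRequest. The wire encoding is an assumption, not part of this design: big-endian integers, strings as an int16 length plus UTF-8 bytes, the boolean as one byte, and a leading size that excludes its own 4 bytes (the convention stated for the response format). REGISTER_REQUEST and encode_register_request are hypothetical names.

```python
import struct

REGISTER_REQUEST = 0  # hypothetical request_type_id value


def _string(value: str) -> bytes:
    # Assumed string encoding: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_register_request(group_id: str, consumer_id: str, auto_commit: bool,
                            auto_offset_reset: str, ping_interval_ms: int,
                            max_ping_retries: int) -> bytes:
    if auto_offset_reset not in ("smallest", "largest"):
        raise ValueError("auto_offset_reset must be 'smallest' or 'largest'")
    body = (struct.pack(">h", REGISTER_REQUEST)
            + _string(group_id)
            + _string(consumer_id)
            + struct.pack(">?", auto_commit)  # boolean as a single byte (assumed)
            + _string(auto_offset_reset)
            + struct.pack(">ih", ping_interval_ms, max_ping_retries))
    # Prefix with the body length; size does not count its own 4 bytes.
    return struct.pack(">i", len(body)) + body
```

For example, encode_register_request("g", "c1", True, "smallest", 3000, 3) yields a 30-byte frame whose size field reads 26.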
PingRequest
PingResponse
StopRequest
StopResponse
StartRequest
StartResponse
There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request, the consumer is expected to respond within ping_interval_ms.
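One way a consumer might dispatch these three request types while checking the response deadline is sketched below. The numeric ids, the handler table, and raising TimeoutError for a late reply are all assumptions for illustration; the sketch only times the local handler, not network transfer.

```python
import time
from typing import Callable, Dict

# Hypothetical request_type_id values; the design does not fix them.
PING, STOP, START = 1, 2, 3


def handle_request(request_type_id: int,
                   handlers: Dict[int, Callable[[], bytes]],
                   ping_interval_ms: int) -> bytes:
    """Run the handler for a coordinator request and check the deadline."""
    if request_type_id not in handlers:
        raise ValueError(f"unknown request_type_id {request_type_id}")
    started = time.monotonic()
    response = handlers[request_type_id]()
    elapsed_ms = (time.monotonic() - started) * 1000
    if elapsed_ms > ping_interval_ms:
        # The reply is late; raising here is this sketch's choice, since the
        # design only states that replies are expected within ping_interval_ms.
        raise TimeoutError(f"handler took {elapsed_ms:.0f} ms")
    return response
```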
PingRequest
{
size: int32 // the size of this request
request_type_id: int16 // the type of the request, distinguish Ping/Stop/Start requests
num_retries: int16 // retry count of this ping, i.e. this is the n-th retry
}
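A sketch of how the coordinator might combine num_retries with the consumer's retry limit from the RegisterRequest. It reads "maximum number of allowed ping timeouts" as: the first ping plus up to max_ping_retries retries may time out, and only then is the consumer treated as failed. That reading and the send_ping callback are assumptions.

```python
from typing import Callable


def ping_until_failed(send_ping: Callable[[int], bool], max_ping_retries: int) -> bool:
    """Return True if the consumer answered, False if it should be treated as failed.

    send_ping(num_retries) is a hypothetical callback that sends one
    PingRequest carrying num_retries and returns True if a PingResponse
    arrived within ping_interval_ms.
    """
    # num_retries == 0 is the initial ping; 1..max_ping_retries are retries.
    for num_retries in range(max_ping_retries + 1):
        if send_ping(num_retries):
            return True
    return False
```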
PingResponse
{
size: int32 // the size of this response
num_partition: int16 // number of partitions that the consumer is consuming
offset_info: [<offset_struct>] // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
// if auto_commit is set to false this field should not exist
}
offset_struct =>
{
topic: string
partition: string
offset: int64
}
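The conditional offset_info field can be sketched as follows. Since its length must equal num_partition, this sketch writes no separate array count and relies on num_partition instead; that, and the string and integer encodings, are assumptions. encode_ping_response is a hypothetical name.

```python
import struct
from typing import List, Tuple


def _string(value: str) -> bytes:
    # Assumed string encoding: int16 length prefix followed by UTF-8 bytes.
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_ping_response(offsets: List[Tuple[str, str, int]], auto_commit: bool) -> bytes:
    """offsets holds one (topic, partition, offset) tuple per consumed partition."""
    body = struct.pack(">h", len(offsets))  # num_partition
    if auto_commit:
        # offset_info is present only with auto_commit; num_partition doubles
        # as its element count, so no extra array length is written.
        for topic, partition, offset in offsets:
            body += _string(topic) + _string(partition) + struct.pack(">q", offset)
    # size excludes its own 4 bytes.
    return struct.pack(">i", len(body)) + body
```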
StopRequest
{
size: int32 // the size of this request
request_type_id: int16 // the type of the request, distinguish Ping/Stop/Start requests
}
StopResponse
{
size: int32 // the size of this response
error_code: int16 // global error code for this request, if any
}
StartRequest
{
size: int32 // the size of this request
request_type_id: int16 // the type of the request, distinguish Ping/Stop/Start requests
assigned_parts: [<partition_struct>] // the assigned ownership of the partitions, along with the starting offset
}
partition_struct =>
{
topic: string
partition: string // note that this partition string already contains the broker id
offset: int64
}
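On receiving a StartRequest, the consumer turns assigned_parts into its new ownership map. The Python sketch below assumes big-endian integers, int16-length-prefixed UTF-8 strings, and an int32 count for the assigned_parts array; decode_start_request is a hypothetical name, and the partition string is kept opaque because its exact format (which embeds the broker id) is not given here.

```python
import struct
from typing import Dict, Tuple


def _read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    # Assumed string encoding: int16 length, then that many UTF-8 bytes.
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_start_request(frame: bytes) -> Dict[Tuple[str, str], int]:
    """Return {(topic, partition): starting_offset} from a StartRequest frame."""
    (size,) = struct.unpack_from(">i", frame, 0)
    if len(frame) != 4 + size:
        raise ValueError(f"expected {4 + size} bytes, got {len(frame)}")
    # request_type_id (int16) is followed by an assumed int32 element count.
    _request_type_id, count = struct.unpack_from(">hi", frame, 4)
    pos = 10
    assigned: Dict[Tuple[str, str], int] = {}
    for _ in range(count):
        topic, pos = _read_string(frame, pos)
        partition, pos = _read_string(frame, pos)  # already contains the broker id
        (offset,) = struct.unpack_from(">q", frame, pos)
        pos += 8
        assigned[(topic, partition)] = offset
    return assigned
```

In the check below, "0-1" is only a hypothetical partition string.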
StartResponse
{
size: int32 // the size of this response
error_code: int16 // global error code for this request, if any
}
Open Problems
- If all the brokers listed as known brokers in the consumer's properties file are gone when a consumer starts or resumes, the consumer cannot find the coordinator and therefore cannot join the group to start consuming. This rare case should be treated as an operational error, since migration of the broker cluster should be incremental and take the consumers' properties files into account.
- The sizes of the rebalance thread pool and the ping thread pool must be specified in advance, whereas the number of consumers can grow during operation. As new consumers are added, each thread must handle more and more consumers, which increases the CPU burden.
- Since consumers no longer register themselves in ZooKeeper, a new coordinator that takes over must wait for all consumers to reconnect to it instead of reading the consumer information from ZooKeeper. This may increase the latency of the coordinator failover process.
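The first open problem can be made concrete with a discovery loop over the known brokers from the properties file. In this sketch, ask_for_coordinator is a hypothetical callback standing in for a request to one broker that returns the coordinator's broker id, or None if that broker cannot be reached; NoCoordinatorFound is likewise an illustrative name.

```python
from typing import Callable, Iterable, List, Optional


class NoCoordinatorFound(Exception):
    """Every known broker from the properties file is unreachable."""


def find_coordinator(known_brokers: Iterable[str],
                     ask_for_coordinator: Callable[[str], Optional[int]]) -> int:
    """Try each known broker in turn until one reports the coordinator id."""
    tried: List[str] = []
    for broker in known_brokers:
        tried.append(broker)
        coordinator_id = ask_for_coordinator(broker)
        if coordinator_id is not None:
            return coordinator_id
    # All known brokers are gone, so the consumer cannot join its group;
    # the design treats this case as an operational error.
    raise NoCoordinatorFound(f"no known broker reachable: {tried}")
```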