...
The request sent by the consumer can be received by any broker and is handled by the broker's socketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.
The request sent by the coordinator is received by the consumer client, and is handled by the consumer's ???SocketHandler.
Consumer Request Format:
field | type | order | description |
---|---|---|---|
size | int32 | 1 | The size of this request (not counting this 4 byte size). This is mandatory and required by the network layer. |
request_type_id | int16 | 2 | An id for the API being called (e.g., ProduceRequest, FetchRequest, ConsultRequest). |
version_id | int16 | 3 | A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API. |
client_id | string | 4 | A user-defined identifier for the client which is used for logging and statistics purposes. |
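The header fields in the table above can be laid out with a `java.nio.ByteBuffer`. This is a minimal sketch, not the project's actual serializer. It assumes big-endian integers (the `ByteBuffer` default) and that a `string` is encoded as an int16 byte length followed by UTF-8 bytes; the class name `ConsumerRequestHeader` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for the common consumer request header.
// Assumptions: big-endian integers (ByteBuffer's default byte order) and
// strings encoded as an int16 byte length followed by UTF-8 bytes.
final class ConsumerRequestHeader {
    static byte[] encode(short requestTypeId, short versionId, String clientId, byte[] body) {
        byte[] client = clientId.getBytes(StandardCharsets.UTF_8);
        if (client.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("client_id too long");
        }
        // size counts every byte after the 4-byte size field itself
        int size = 2 + 2 + 2 + client.length + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + size);
        buf.putInt(size);                     // 1: size
        buf.putShort(requestTypeId);          // 2: request_type_id
        buf.putShort(versionId);              // 3: version_id
        buf.putShort((short) client.length);  // 4: client_id length prefix
        buf.put(client);                      //    client_id bytes
        buf.put(body);                        // API-specific fields follow the header
        return buf.array();
    }
}
```

Because `size` excludes its own 4 bytes, a header with an empty body and a two-character `client_id` occupies 12 bytes on the wire and carries `size = 8`.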
Consumer Response Format:
field | type | order | description |
---|---|---|---|
size | int32 | 1 | The size of this response (not counting this 4 byte size). This is mandatory and required by the network layer. |
correlation_id | int32 | 2 | An id that can be set by the client and will be returned untouched by the server in the response. |
version_id | int16 | 3 | A version number for the response format. |
error | int16 | 4 | The id of the (request-level) error, if any occurred. |
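On the receiving side, the response header can be read back field by field in table order. The sketch below is illustrative only. It assumes big-endian fields, a buffer holding exactly one size-prefixed response, and that an `error` of 0 means no error, which the table does not state. `ConsumerResponseHeader` is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical decoder for the common consumer response header.
final class ConsumerResponseHeader {
    final int correlationId;
    final short versionId;
    final short error;

    private ConsumerResponseHeader(int correlationId, short versionId, short error) {
        this.correlationId = correlationId;
        this.versionId = versionId;
        this.error = error;
    }

    // Reads the header from a buffer holding exactly one size-prefixed response;
    // on return the buffer is positioned at the first API-specific field.
    static ConsumerResponseHeader decode(ByteBuffer frame) {
        int size = frame.getInt();
        if (size != frame.remaining()) {
            throw new IllegalArgumentException(
                "size field " + size + " != " + frame.remaining() + " remaining bytes");
        }
        // Java evaluates constructor arguments left to right, matching field order.
        return new ConsumerResponseHeader(frame.getInt(), frame.getShort(), frame.getShort());
    }

    // Assumption: an error id of 0 means no request-level error occurred.
    boolean hasError() {
        return error != 0;
    }
}
```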
ConsultRequest
Note that this design still assumes the 0.7 wire format, although a new wire format is already used in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.
ConsultRequest
Code Block |
---|
{
size: int32 // the size of this request, not including the first 4 bytes for this field
request_type_id: int16 // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
// do not need any data for the consult request
}
|
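Since the ConsultRequest carries no data, its encoding is just the size field plus the request type. A minimal sketch, assuming big-endian encoding; the numeric value of `CONSULT_REQUEST_ID` is a hypothetical placeholder, because the design does not assign request type ids.

```java
import java.nio.ByteBuffer;

// Hypothetical encoder for the ConsultRequest: only size and request_type_id.
final class ConsultRequest {
    // Hypothetical placeholder; the design does not fix a numeric id.
    static final short CONSULT_REQUEST_ID = 10;

    static ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(4 + 2);
        buf.putInt(2);                     // size: only the 2-byte request_type_id follows
        buf.putShort(CONSULT_REQUEST_ID);  // no further data for a consult request
        buf.flip();                        // ready to be written to a channel
        return buf;
    }
}
```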
ConsultResponse
Code Block |
---|
{
size: int32 // the size of this response, not including the first 4 bytes for this field
error_code: int16 // global error code for this request, if any
coordinator_id: int16 // broker id of the coordinator
[<broker_struct>] // current broker list
}
broker_struct => {
creator_id: string
id: int32
host: string
port: int32
}
|
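A consumer can use the ConsultResponse to look up the coordinator's host and port in the returned broker list. The decoder below is a sketch under assumptions the design leaves open: the broker array is prefixed by an int32 element count, strings are int16-length-prefixed UTF-8, and fields are big-endian. `ConsultResponse` and `coordinator()` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical ConsultResponse decoder. Assumed encodings: int32 count before
// the broker array, int16-length-prefixed UTF-8 strings, big-endian integers.
final class ConsultResponse {
    static final class Broker {
        final String creatorId;
        final int id;
        final String host;
        final int port;

        Broker(String creatorId, int id, String host, int port) {
            this.creatorId = creatorId;
            this.id = id;
            this.host = host;
            this.port = port;
        }
    }

    final short errorCode;
    final short coordinatorId;
    final List<Broker> brokers;

    private ConsultResponse(short errorCode, short coordinatorId, List<Broker> brokers) {
        this.errorCode = errorCode;
        this.coordinatorId = coordinatorId;
        this.brokers = brokers;
    }

    private static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) throw new IllegalArgumentException("negative string length");
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    static ConsultResponse decode(ByteBuffer frame) {
        frame.getInt();                         // size, consumed by the network layer
        short errorCode = frame.getShort();     // error_code
        short coordinatorId = frame.getShort(); // coordinator_id
        int count = frame.getInt();             // assumed int32 array length
        if (count < 0) throw new IllegalArgumentException("negative broker count");
        List<Broker> brokers = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String creatorId = readString(frame);
            int id = frame.getInt();
            String host = readString(frame);
            int port = frame.getInt();
            brokers.add(new Broker(creatorId, id, host, port));
        }
        return new ConsultResponse(errorCode, coordinatorId, brokers);
    }

    // Finds the coordinator's address in the broker list, or null if absent.
    Broker coordinator() {
        for (Broker b : brokers) {
            if (b.id == coordinatorId) return b;
        }
        return null;
    }
}
```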
Once the consumer finds out the address of the coordinator, it tries to connect to the coordinator. When the connection is established, it sends exactly one RegisterRequest to the coordinator.
After that, the consumer keeps reading new requests from the coordinator over the same connection. Hence the consumer does not need to maintain a socket server, just a SocketChannel.
The consumer does not need a response for the RegisterRequest: once it sends out the request, it starts to expect the first heartbeat within heartbeat_interval_ms.
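Reading coordinator requests over a single connection comes down to reading one size-prefixed frame at a time. The sketch below shows that loop body against `java.nio.channels.ReadableByteChannel`, which a blocking `SocketChannel` implements. `FrameReader` and the `maxSize` guard are hypothetical. Enforcing the heartbeat_interval_ms deadline is not shown; it would need something extra, such as a `Selector` with a timeout.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Hypothetical reader for size-prefixed requests arriving from the coordinator.
// Intended for a blocking channel; a non-blocking channel would busy-loop here.
final class FrameReader {
    private static void readFully(ReadableByteChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0) {
                throw new EOFException("coordinator closed the connection");
            }
        }
    }

    // Returns the bytes that follow the 4-byte size field, ready for reading.
    static ByteBuffer readFrame(ReadableByteChannel ch, int maxSize) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        readFully(ch, sizeBuf);
        sizeBuf.flip();
        int size = sizeBuf.getInt();
        if (size < 0 || size > maxSize) {
            throw new IOException("invalid frame size " + size);
        }
        ByteBuffer payload = ByteBuffer.allocate(size);
        readFully(ch, payload);
        payload.flip();
        return payload;
    }
}
```

A consumer would call `readFrame` in a loop on the same channel it used to send the RegisterRequest and dispatch on the `request_type_id` at the start of each payload.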
RegisterRequest
Code Block |
---|
{
size: int32 // the size of this request, not including the first 4 bytes for this field
request_type_id: int16 // the type of the request, currently only one type is allowed: ConnectRequest
group_id: string // group that the consumer belongs to
consumer_id: string // consumer id
auto_commit: boolean // indicator of whether autocommit is enabled
auto_offset_reset: string // indicator of what to do if an offset is out of range, currently either smallest or largest
heartbeat_interval_ms: int32 // heartbeat interval in milliseconds; the consumer is also expected to respond within this interval
max_heartbeat_retries: int16 // maximum number of allowed heartbeat timeouts before the consumer is treated as failed
}
|
Coordinator Request Format:
field | type | order | description |
---|---|---|---|
size | int32 | 1 | The size of this request (not counting this 4 byte size). This is mandatory and required by the network layer. |
request_type_id | int16 | 2 | An id for the API being called (e.g., PingRequest, StartRequest, StopRequest). |
version_id | int16 | 3 | A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API. |
client_id | string | 4 | A user-defined identifier for the client which is used for logging and statistics purposes. |
Coordinator Response Format:
field | type | order | description |
---|---|---|---|
size | int32 | 1 | The size of this response (not counting this 4 byte size). This is mandatory and required by the network layer. |
request_type_id | int16 | 2 | An id for the API being called (e.g., PingRequest, StartRequest, StopRequest). |
version_id | int16 | 3 | A version number for the response format. This number starts at 0 and increases every time a protocol change is made for this API. |
client_id | string | 4 | A user-defined identifier for the client which is used for logging and statistics purposes. |
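The RegisterRequest fields listed earlier in this section can be serialized in order with `java.io.DataOutputStream`, which always writes big-endian. This is a sketch under several assumptions not fixed by the design: a boolean is written as one byte (1 or 0), strings are int16-length-prefixed UTF-8 (written by hand, because `writeUTF` uses modified UTF-8), and `REGISTER_REQUEST_ID` is a hypothetical placeholder value.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical RegisterRequest encoder following the field order of the block above.
final class RegisterRequest {
    // Hypothetical placeholder for the ConnectRequest type id.
    static final short REGISTER_REQUEST_ID = 11;

    static byte[] encode(String groupId, String consumerId, boolean autoCommit,
                         String autoOffsetReset, int heartbeatIntervalMs,
                         short maxHeartbeatRetries) {
        if (!autoOffsetReset.equals("smallest") && !autoOffsetReset.equals("largest")) {
            throw new IllegalArgumentException("auto_offset_reset must be smallest or largest");
        }
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(body); // big-endian writes
            out.writeShort(REGISTER_REQUEST_ID);
            writeString(out, groupId);
            writeString(out, consumerId);
            out.writeBoolean(autoCommit);       // one byte: 1 = true, 0 = false
            writeString(out, autoOffsetReset);
            out.writeInt(heartbeatIntervalMs);
            out.writeShort(maxHeartbeatRetries);
            out.flush();
            byte[] payload = body.toByteArray();

            ByteArrayOutputStream frame = new ByteArrayOutputStream();
            DataOutputStream f = new DataOutputStream(frame);
            f.writeInt(payload.length);         // size, excluding these 4 bytes
            f.write(payload);
            f.flush();
            return frame.toByteArray();
        } catch (IOException e) {
            // Not expected with in-memory streams.
            throw new UncheckedIOException(e);
        }
    }

    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        if (b.length > Short.MAX_VALUE) throw new IllegalArgumentException("string too long");
        out.writeShort(b.length);
        out.write(b);
    }
}
```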
PingRequest
Code Block |
---|
{ } |
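The PingRequest has an empty body and acts as the heartbeat. The coordinator still has to decide when a consumer counts as failed, based on the heartbeat_interval_ms and max_heartbeat_retries values from the RegisterRequest. The tracker below is one possible reading, not the design's stated algorithm. It assumes that only *consecutive* timeouts count, so a timely PingResponse resets the counter, and that the consumer fails once its timeouts exceed max_heartbeat_retries. `HeartbeatTracker` is a hypothetical name.

```java
// Hypothetical coordinator-side heartbeat bookkeeping for a single consumer.
// Assumption: consecutive timeouts are counted and a timely response resets them.
final class HeartbeatTracker {
    private final int maxHeartbeatRetries;
    private int consecutiveTimeouts = 0;

    HeartbeatTracker(int maxHeartbeatRetries) {
        if (maxHeartbeatRetries < 0) {
            throw new IllegalArgumentException("max_heartbeat_retries must be >= 0");
        }
        this.maxHeartbeatRetries = maxHeartbeatRetries;
    }

    // Called when a PingResponse arrives within heartbeat_interval_ms.
    void onResponse() {
        consecutiveTimeouts = 0;
    }

    // Called when heartbeat_interval_ms elapses without a response.
    // Returns true once the consumer should be treated as failed.
    boolean onTimeout() {
        consecutiveTimeouts++;
        return consecutiveTimeouts > maxHeartbeatRetries;
    }
}
```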
PingResponse
...
- If all the brokers listed as known brokers in the consumer's properties file are gone when a consumer starts or resumes, the consumer cannot find the coordinator and thus cannot join the group and start consuming. This rare case should be treated as an operational error, since migration of the broker cluster should be incremental and kept in sync with the consumers' properties files.
- The sizes of the rebalance thread pool and the heartbeat thread pool must be specified in advance, whereas the number of consumers can grow during operation. Hence each thread must handle more consumers as new consumers are added, which increases the CPU burden.
- Since consumers no longer register themselves in ZooKeeper, a new coordinator must wait for all consumers to reconnect to it instead of reading the consumer information from ZooKeeper. This may increase the latency of the coordinator failover process.
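The thread-pool concern can be made concrete. With a fixed pool of N threads and C consumers, some thread must handle at least ceil(C / N) consumers (pigeonhole), so per-thread load grows linearly with the group. The sketch below shows one hypothetical assignment, hashing each consumer id onto a fixed-size pool. It is not the design's scheduling policy, and `FixedPoolAssignment` is an illustrative name.

```java
import java.util.List;

// Hypothetical illustration: consumers hashed onto a fixed-size thread pool.
final class FixedPoolAssignment {
    // Thread index in [0, poolSize) that handles the given consumer.
    static int threadFor(String consumerId, int poolSize) {
        return Math.floorMod(consumerId.hashCode(), poolSize);
    }

    // Largest number of consumers handled by any single thread.
    static int maxConsumersPerThread(List<String> consumerIds, int poolSize) {
        int[] load = new int[poolSize];
        for (String id : consumerIds) {
            load[threadFor(id, poolSize)]++;
        }
        int max = 0;
        for (int l : load) {
            max = Math.max(max, l);
        }
        return max;
    }
}
```

For 100 consumers on 4 threads, some thread carries at least 25 consumers whatever the hash; doubling the group at least doubles that lower bound.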