...

The request sent by the consumer can be received by any broker and is handled by the broker's SocketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

The request sent by the coordinator is received by the consumer client and is handled by the consumer's ???SocketHandler.

Consumer Request Format:

field            type    order  description
size             int32   1      The size of this request (not counting this 4-byte size field). This is mandatory and required by the network layer.
request_type_id  int16   2      An id for the API being called (e.g., ProduceRequest, FetchRequest, ConsultRequest).
version_id       int16   3      A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API.
client_id        string  4      A user-defined identifier for the client, used for logging and statistics purposes.
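To make the header layout concrete, here is a minimal Java sketch that serializes size, request_type_id, version_id and client_id into a ByteBuffer. It assumes big-endian integers and strings encoded as an int16 byte length followed by UTF-8 bytes (the convention of Kafka's existing wire format); the class name ConsumerRequestHeader is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes the common consumer request header.
// Assumptions: big-endian integers, strings as int16 length + UTF-8 bytes.
final class ConsumerRequestHeader {
    final short requestTypeId;
    final short versionId;
    final String clientId;

    ConsumerRequestHeader(short requestTypeId, short versionId, String clientId) {
        this.requestTypeId = requestTypeId;
        this.versionId = versionId;
        this.clientId = clientId;
    }

    ByteBuffer encode() {
        byte[] client = clientId.getBytes(StandardCharsets.UTF_8);
        if (client.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("client_id too long");
        }
        // size counts everything after itself: request_type_id, version_id, string length, string bytes
        int size = 2 + 2 + 2 + client.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + size);
        buf.putInt(size);
        buf.putShort(requestTypeId);
        buf.putShort(versionId);
        buf.putShort((short) client.length);
        buf.put(client);
        buf.flip(); // ready to be written to a SocketChannel
        return buf;
    }
}
```

Because size excludes its own 4 bytes, a header with client_id "c1" occupies 12 bytes on the wire and carries size = 8.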

Consumer Response Format:

field            type    order  description
size             int32   1      The size of this response (not counting this 4-byte size field). This is mandatory and required by the network layer.
correlation_id   int32   2      An id that can be set by the client and will be returned untouched by the server in the response.
version_id       int16   3      A version number for the response format.
error            int16   4      The id of the (request-level) error, if any occurred.
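A matching sketch for reading the response header, under the same big-endian assumption. Treating an error value of 0 as "no error" is an assumption, since the table does not define error codes; ConsumerResponseHeader is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical helper: decodes size, correlation_id, version_id and error.
final class ConsumerResponseHeader {
    final int correlationId;
    final short versionId;
    final short error;

    private ConsumerResponseHeader(int correlationId, short versionId, short error) {
        this.correlationId = correlationId;
        this.versionId = versionId;
        this.error = error;
    }

    static ConsumerResponseHeader decode(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            throw new IllegalArgumentException("missing size field");
        }
        int size = buf.getInt();
        // correlation_id (4) + version_id (2) + error (2) must fit inside the declared size
        if (size < 8 || size > buf.remaining()) {
            throw new IllegalArgumentException("bad or truncated response, size=" + size);
        }
        int correlationId = buf.getInt();
        short versionId = buf.getShort();
        short error = buf.getShort();
        return new ConsumerResponseHeader(correlationId, versionId, error);
    }

    // Assumption: 0 means "no error"; the design does not list error codes.
    boolean hasError() {
        return error != 0;
    }
}
```

The client can match correlationId against the id it put in the request to pair responses with outstanding requests.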

ConsultRequest

Note that this design still follows the 0.7 wire format, although a new wire format is already used in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.

ConsultRequest

{
  size: int32                // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16     // the request type id, distinguishing Fetch/Produce/Offset/Consult/etc. requests
                             // no data is needed for the consult request
}
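Since the ConsultRequest carries only size and request_type_id, its encoding is six bytes. The sketch below assumes big-endian integers; CONSULT_REQUEST_TYPE_ID is a hypothetical placeholder, because the design does not assign concrete request_type_id values.

```java
import java.nio.ByteBuffer;

final class ConsultRequest {
    // Hypothetical id; the design does not fix request_type_id numbers.
    static final short CONSULT_REQUEST_TYPE_ID = 10;

    static ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(4 + 2);
        buf.putInt(2);                          // size: only request_type_id follows
        buf.putShort(CONSULT_REQUEST_TYPE_ID);  // no payload for the consult request
        buf.flip();
        return buf;
    }
}
```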
ConsultResponse

{
  size: int32                // the size of this response, not including the first 4 bytes for this field
  error_code: int16          // the global error code for this request, if any
  coordinator_id: int16      // broker id of the coordinator
  [<broker_struct>]          // current broker list
}

broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}
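The consumer needs the ConsultResponse fields (error_code, coordinator_id and the broker list of broker_struct entries) to find the coordinator's host and port. The following Java sketch decodes them under stated assumptions: the broker list is prefixed by an int32 element count, and strings are an int16 byte length followed by UTF-8 bytes. BrokerInfo and ConsultResponse are hypothetical class names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// One broker_struct entry.
final class BrokerInfo {
    final String creatorId;
    final int id;
    final String host;
    final int port;

    BrokerInfo(String creatorId, int id, String host, int port) {
        this.creatorId = creatorId;
        this.id = id;
        this.host = host;
        this.port = port;
    }
}

final class ConsultResponse {
    final short errorCode;
    final short coordinatorId;
    final List<BrokerInfo> brokers;

    private ConsultResponse(short errorCode, short coordinatorId, List<BrokerInfo> brokers) {
        this.errorCode = errorCode;
        this.coordinatorId = coordinatorId;
        this.brokers = Collections.unmodifiableList(brokers);
    }

    static ConsultResponse decode(ByteBuffer buf) {
        int size = buf.getInt();
        if (size > buf.remaining()) {
            throw new IllegalArgumentException("truncated ConsultResponse");
        }
        short errorCode = buf.getShort();
        short coordinatorId = buf.getShort();
        int count = buf.getInt(); // assumption: int32 count before the broker list
        if (count < 0) {
            throw new IllegalArgumentException("negative broker count");
        }
        List<BrokerInfo> brokers = new ArrayList<>(Math.min(count, 1024));
        for (int i = 0; i < count; i++) {
            String creatorId = readString(buf);
            int id = buf.getInt();
            String host = readString(buf);
            int port = buf.getInt();
            brokers.add(new BrokerInfo(creatorId, id, host, port));
        }
        return new ConsultResponse(errorCode, coordinatorId, brokers);
    }

    // Assumption: strings are an int16 byte length followed by UTF-8 bytes.
    private static String readString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            throw new IllegalArgumentException("negative string length");
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // The coordinator's broker entry, or null if it is missing from the list.
    BrokerInfo coordinator() {
        for (BrokerInfo b : brokers) {
            if (b.id == coordinatorId) {
                return b;
            }
        }
        return null;
    }
}
```

Note that coordinator_id is declared as int16 while broker_struct.id is int32; the sketch simply compares them as ints.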

Once the consumer finds out the address of the coordinator, it tries to connect to the coordinator. When the connection is set up, it sends a single RegisterRequest to the coordinator.

After that, the consumer keeps reading new requests from the coordinator over this connection. Hence the consumer does not need to maintain a socket server, just a SocketChannel.

The consumer does not need a response to the RegisterRequest: once it has sent the request, it starts expecting the first heartbeat within heartbeat_interval_ms.
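On the consumer side, the rule above amounts to a deadline that starts when the RegisterRequest is sent and is pushed forward by every heartbeat. A minimal sketch, assuming time is passed in as milliseconds so the logic stays deterministic (a real client would read a monotonic clock); HeartbeatDeadline is a hypothetical name, and what the consumer does once the deadline passes (for example, looking up the coordinator again) is left to the caller.

```java
// Hypothetical consumer-side tracker for the coordinator's heartbeats.
final class HeartbeatDeadline {
    private final long intervalMs;
    private long lastEventMs;

    HeartbeatDeadline(long intervalMs, long registeredAtMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("heartbeat_interval_ms must be positive");
        }
        this.intervalMs = intervalMs;
        this.lastEventMs = registeredAtMs; // the clock starts when RegisterRequest is sent
    }

    // Called whenever a heartbeat from the coordinator is read from the SocketChannel.
    void onHeartbeat(long nowMs) {
        lastEventMs = nowMs;
    }

    // True if no heartbeat arrived within heartbeat_interval_ms of the last event.
    boolean isOverdue(long nowMs) {
        return nowMs - lastEventMs > intervalMs;
    }
}
```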

RegisterRequest

{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request; currently only one type is allowed: ConnectRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // what to do if an offset is out of range, currently either smallest or largest
  heartbeat_interval_ms: int32       // heartbeat interval in milliseconds; the consumer is also expected to respond within this interval
  max_heartbeat_retries: int16       // maximum number of allowed heartbeat timeouts before the consumer is treated as failed
}

Coordinator Request Format:

field            type    order  description
size             int32   1      The size of this request (not counting this 4-byte size field). This is mandatory and required by the network layer.
request_type_id  int16   2      An id for the API being called (e.g., PingRequest, StartRequest, StopRequest).
version_id       int16   3      A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API.
client_id        string  4      A user-defined identifier for the client, used for logging and statistics purposes.

Coordinator Response Format:

field            type    order  description
size             int32   1      The size of this response (not counting this 4-byte size field). This is mandatory and required by the network layer.
request_type_id  int16   2      An id for the API being called (e.g., PingRequest, StartRequest, StopRequest).
version_id       int16   3      A version number for the response format. This number starts at 0 and increases every time a protocol change is made for this API.
client_id        string  4      ...
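The RegisterRequest fields (group and consumer ids, auto-commit flag, auto-offset-reset policy, heartbeat interval and retry limit) can be framed as sketched below. Assumptions, none of which the design fixes: the boolean is one byte, strings are an int16 length plus UTF-8 bytes, integers are big-endian, and REGISTER_REQUEST_TYPE_ID is a hypothetical placeholder value. The reset policy is validated against the two values named in the field comment.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class RegisterRequest {
    // Hypothetical value; the design does not fix request_type_id numbers.
    static final short REGISTER_REQUEST_TYPE_ID = 11;

    final String groupId;
    final String consumerId;
    final boolean autoCommit;
    final String autoOffsetReset; // "smallest" or "largest"
    final int heartbeatIntervalMs;
    final short maxHeartbeatRetries;

    RegisterRequest(String groupId, String consumerId, boolean autoCommit,
                    String autoOffsetReset, int heartbeatIntervalMs, short maxHeartbeatRetries) {
        if (!"smallest".equals(autoOffsetReset) && !"largest".equals(autoOffsetReset)) {
            throw new IllegalArgumentException("offset reset policy must be smallest or largest");
        }
        if (heartbeatIntervalMs <= 0) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        this.groupId = groupId;
        this.consumerId = consumerId;
        this.autoCommit = autoCommit;
        this.autoOffsetReset = autoOffsetReset;
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.maxHeartbeatRetries = maxHeartbeatRetries;
    }

    byte[] encode() {
        try {
            ByteArrayOutputStream body = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(body); // writes big-endian
            out.writeShort(REGISTER_REQUEST_TYPE_ID);
            writeString(out, groupId);
            writeString(out, consumerId);
            out.writeBoolean(autoCommit); // assumption: one byte, 1 = true
            writeString(out, autoOffsetReset);
            out.writeInt(heartbeatIntervalMs);
            out.writeShort(maxHeartbeatRetries);
            out.flush();
            byte[] payload = body.toByteArray();

            ByteArrayOutputStream framed = new ByteArrayOutputStream(4 + payload.length);
            DataOutputStream frame = new DataOutputStream(framed);
            frame.writeInt(payload.length); // size excludes its own 4 bytes
            frame.write(payload);
            frame.flush();
            return framed.toByteArray();
        } catch (IOException e) {
            // ByteArrayOutputStream never throws, but DataOutputStream declares IOException
            throw new UncheckedIOException(e);
        }
    }

    private static void writeString(DataOutputStream out, String s) throws IOException {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        if (bytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("string too long");
        }
        out.writeShort(bytes.length);
        out.write(bytes);
    }
}
```

Explicit UTF-8 bytes are used rather than DataOutputStream.writeUTF, which writes Java's modified UTF-8.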
PingRequest

{
}

PingResponse

...
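The PingRequest carries no data, so the coordinator can use it as the heartbeat and apply heartbeat_interval_ms and max_heartbeat_retries on its side. A minimal sketch of the bookkeeping, under one interpretation that the design leaves open: a consumer is treated as failed once its consecutive missed responses exceed max_heartbeat_retries (a stricter reading would use "reach"). HeartbeatFailureDetector is a hypothetical name, and scheduling the pings themselves is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side bookkeeping of missed ping responses per consumer.
final class HeartbeatFailureDetector {
    private final int maxHeartbeatRetries;
    private final Map<String, Integer> consecutiveMisses = new HashMap<>();

    HeartbeatFailureDetector(int maxHeartbeatRetries) {
        if (maxHeartbeatRetries < 0) {
            throw new IllegalArgumentException("max_heartbeat_retries must be non-negative");
        }
        this.maxHeartbeatRetries = maxHeartbeatRetries;
    }

    // Called when a consumer's RegisterRequest is accepted.
    void register(String consumerId) {
        consecutiveMisses.put(consumerId, 0);
    }

    // A PingResponse arrived in time: reset the miss counter.
    void onPingResponse(String consumerId) {
        consecutiveMisses.replace(consumerId, 0);
    }

    // No response within heartbeat_interval_ms; returns true when the consumer is now considered failed.
    boolean onPingTimeout(String consumerId) {
        Integer misses = consecutiveMisses.get(consumerId);
        if (misses == null) {
            return false; // unknown or already removed consumer
        }
        int updated = misses + 1;
        if (updated > maxHeartbeatRetries) {
            consecutiveMisses.remove(consumerId); // the caller would start a rebalance here
            return true;
        }
        consecutiveMisses.put(consumerId, updated);
        return false;
    }
}
```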

  1. If all the brokers listed as known brokers in the consumer's properties file are gone when a consumer starts or resumes, the consumer cannot find the coordinator and thus cannot join the group and start consuming. This rare case should be treated as an operational error, since migration of the broker cluster should be incremental and kept in sync with the consumers' properties files.
  2. The sizes of the rebalance thread pool and the heartbeat thread pool must be specified in advance, whereas the number of consumers can grow during operation. Hence each thread must handle more consumers as new consumers are added, which increases the CPU burden.
  3. Since consumers no longer register themselves in ZooKeeper, a newly started coordinator must wait for all consumers to reconnect to it instead of reading the consumer information from ZooKeeper, which may increase the latency of coordinator failover.
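The first limitation can be made concrete with a sketch of coordinator discovery: the consumer sends a ConsultRequest to each broker listed in its properties file in turn and gives up only when none of them answers. The consult function is a hypothetical stand-in for the network round trip, returning the coordinator's broker id or empty when that broker is unreachable.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

final class CoordinatorLocator {
    // Tries each known broker in order and returns the first coordinator id reported.
    static int locate(List<String> knownBrokers, Function<String, Optional<Integer>> consult) {
        for (String broker : knownBrokers) {
            Optional<Integer> coordinator = consult.apply(broker);
            if (coordinator.isPresent()) {
                return coordinator.get();
            }
        }
        // Every known broker is gone: the operational error described above.
        throw new IllegalStateException(
            "none of the known brokers " + knownBrokers + " is reachable; cannot find the coordinator");
    }
}
```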