...

Code Block
{
  size: int32                           // the size of this response, not including the first 4 bytes for this field
  error_code: int16                     // global error code for this request, if any
  coordinator_id: int16                 // broker id of the coordinator
  brokers_info: [<broker_struct>]       // current broker list
}

broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}
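
As an illustration of the layout above, the following is a minimal sketch of how such a response could be encoded and decoded. The struct does not define the wire encoding, so everything here is an assumption: big-endian integers, strings as an int16 length followed by UTF-8 bytes, and arrays as an int32 element count followed by the elements. The function names `encode_response` and `decode_response` are hypothetical.

```python
import struct


def _write_string(s):
    # Assumed encoding: int16 length followed by UTF-8 bytes.
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def _read_string(buf, pos):
    (n,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + n].decode("utf-8"), pos + n


def encode_response(error_code, coordinator_id, brokers):
    body = struct.pack(">hh", error_code, coordinator_id)
    body += struct.pack(">i", len(brokers))  # assumed int32 element count
    for b in brokers:
        body += _write_string(b["creator_id"])
        body += struct.pack(">i", b["id"])
        body += _write_string(b["host"])
        body += struct.pack(">i", b["port"])
    # size does not include the 4 bytes of the size field itself
    return struct.pack(">i", len(body)) + body


def decode_response(buf):
    (size,) = struct.unpack_from(">i", buf, 0)
    if size != len(buf) - 4:
        raise ValueError("size field does not match payload length")
    error_code, coordinator_id = struct.unpack_from(">hh", buf, 4)
    pos = 8
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    brokers = []
    for _ in range(count):
        creator_id, pos = _read_string(buf, pos)
        (broker_id,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        host, pos = _read_string(buf, pos)
        (port,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        brokers.append(
            {"creator_id": creator_id, "id": broker_id, "host": host, "port": port}
        )
    return {
        "error_code": error_code,
        "coordinator_id": coordinator_id,
        "brokers_info": brokers,
    }
```

A round trip through both functions should return the original field values, which is a cheap way to check that the size prefix is computed consistently on both sides.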

...

The consumer does not need a response to the RegisterRequest: once it has sent the request, it starts expecting the first stop request or ping request within ping_interval_ms.

RegisterRequest
Code Block
{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: ConnectRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_rest: string           // indicator of what to do if an offset is out of range, currently either smallest or largest
  ping_interval_ms: int32            // ping interval in milliseconds; the consumer is also expected to respond within this interval
  max_ping_retries: int16            // maximum number of allowed ping timeouts before the consumer is treated as failed
}
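
The RegisterRequest layout can be sketched as a small encoder. This is only an illustration under stated assumptions: big-endian integers, strings as an int16 length plus UTF-8 bytes, a boolean as a single byte, and a size field that excludes its own 4 bytes. The value of `REGISTER_REQUEST_TYPE_ID` and the function name `encode_register_request` are hypothetical; the document does not assign numeric type ids.

```python
import struct

REGISTER_REQUEST_TYPE_ID = 1  # hypothetical value, not defined by the document


def _write_string(s):
    # Assumed encoding: int16 length followed by UTF-8 bytes.
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_register_request(group_id, consumer_id, auto_commit,
                            auto_offset_rest, ping_interval_ms,
                            max_ping_retries):
    # The document allows only these two out-of-range policies.
    if auto_offset_rest not in ("smallest", "largest"):
        raise ValueError("auto_offset_rest must be 'smallest' or 'largest'")
    body = struct.pack(">h", REGISTER_REQUEST_TYPE_ID)
    body += _write_string(group_id)
    body += _write_string(consumer_id)
    body += struct.pack(">?", auto_commit)  # assumed one-byte boolean
    body += _write_string(auto_offset_rest)
    body += struct.pack(">ih", ping_interval_ms, max_ping_retries)
    # Assumption: size counts the bytes after the size field.
    return struct.pack(">i", len(body)) + body
```

Validating the out-of-range policy at encoding time keeps a malformed request from ever reaching the coordinator, which matters here because the consumer receives no response to this request.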

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request, the consumer is expected to respond within ping_interval_ms.
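
A consumer could tell the three request types apart by reading request_type_id right after the size field and, at the same time, note when its response is due. The sketch below assumes big-endian integers and invents the numeric ids in `RequestType`; the document does not specify them, and `classify_request` is a hypothetical helper.

```python
import struct
from enum import IntEnum


class RequestType(IntEnum):
    # Hypothetical ids; the document only says the field distinguishes
    # Ping/Stop/Start requests.
    PING = 1
    STOP = 2
    START = 3


def classify_request(frame, received_at_ms, ping_interval_ms):
    """Return (request type, time in ms by which the response is due)."""
    _size, type_id = struct.unpack_from(">ih", frame, 0)
    # RequestType(...) raises ValueError for an unknown id.
    return RequestType(type_id), received_at_ms + ping_interval_ms
```

The same deadline applies to all three types, so the caller only needs one timer per received request.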

PingRequest
Code Block

{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/Stop/Start requests
  num_retries: int16              // indicate this is the #th ping retry
}
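
On the coordinator side, num_retries and the maximum-retries field of the RegisterRequest suggest a simple per-consumer counter. The following is a sketch, not the coordinator's actual logic: the class name `PingTracker` is hypothetical, and it assumes that a consumer is treated as failed once the number of consecutive ping timeouts exceeds the configured maximum, and that any response resets the counter.

```python
class PingTracker:
    """Tracks consecutive ping timeouts for one consumer (illustrative)."""

    def __init__(self, max_ping_retries):
        self.max_ping_retries = max_ping_retries
        self.timeouts = 0

    def next_num_retries(self):
        # Value to put in PingRequest.num_retries: 0 for a fresh ping,
        # n for the n-th retry.
        return self.timeouts

    def on_response(self):
        # Assumption: a response within the interval clears the count.
        self.timeouts = 0

    def on_timeout(self):
        # Returns True when the consumer should be treated as failed.
        self.timeouts += 1
        return self.timeouts > self.max_ping_retries
```

With a maximum of 2, for example, the first two timeouts only trigger retries and the third marks the consumer as failed.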
PingResponse
Code Block

{
  size: int32                     // the size of this response
  num_partition: int16            // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>]  // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                  // if auto_commit is set to false this field should not exist
}

offset_struct =>
{
  topic: string
  partition: string
  offset: int64
}
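
The rule that offset_info is present only when auto_commit is enabled can be sketched as follows. The encoding details are assumptions: big-endian integers, strings as an int16 length plus UTF-8 bytes, no separate element count for offset_info (its length is given by num_partition), and a size field that excludes its own 4 bytes. `encode_ping_response` is a hypothetical name.

```python
import struct


def _write_string(s):
    # Assumed encoding: int16 length followed by UTF-8 bytes.
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def encode_ping_response(offsets, auto_commit):
    """offsets: list of (topic, partition, offset) for consumed partitions."""
    body = struct.pack(">h", len(offsets))  # num_partition
    if auto_commit:
        # offset_info carries exactly num_partition entries.
        for topic, partition, offset in offsets:
            body += _write_string(topic)
            body += _write_string(partition)
            body += struct.pack(">q", offset)
    # With auto_commit disabled, offset_info is omitted entirely.
    return struct.pack(">i", len(body)) + body
```

A decoder would need to know the consumer's auto_commit setting from its RegisterRequest in order to decide whether to expect offset_info after num_partition.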
StopRequest
Code Block

{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/Stop/Start requests
}
StopResponse
Code Block

{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}
StartRequest
Code Block

{
  size: int32                              // the size of this request
  request_type_id: int16                   // the type of the request, distinguish Ping/Stop/Start requests
  assigned_parts: [<partition_struct>]     // the assigned ownership of the partitions, along with the starting offset
}

partition_struct =>
{
  topic: string
  partition: string                        // note that this partition string already contains the broker id
  offset: int64
}
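
To show how a consumer might turn assigned_parts into its starting positions, here is a minimal sketch that decodes a StartRequest into a map from (topic, partition) to starting offset. It assumes big-endian integers, strings as an int16 length plus UTF-8 bytes, and an int32 element count before the array; none of these are specified by the document. The partition string is kept opaque, and both function names are hypothetical.

```python
import struct


def _write_string(s):
    # Assumed encoding: int16 length followed by UTF-8 bytes.
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw


def _read_string(buf, pos):
    (n,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + n].decode("utf-8"), pos + n


def encode_start_request(request_type_id, assigned_parts):
    body = struct.pack(">h", request_type_id)
    body += struct.pack(">i", len(assigned_parts))  # assumed element count
    for topic, partition, offset in assigned_parts:
        body += _write_string(topic) + _write_string(partition)
        body += struct.pack(">q", offset)
    return struct.pack(">i", len(body)) + body


def decode_start_request(frame):
    _size, type_id = struct.unpack_from(">ih", frame, 0)
    pos = 6
    (count,) = struct.unpack_from(">i", frame, pos)
    pos += 4
    start_offsets = {}
    for _ in range(count):
        topic, pos = _read_string(frame, pos)
        partition, pos = _read_string(frame, pos)
        (offset,) = struct.unpack_from(">q", frame, pos)
        pos += 8
        # The partition string is treated as opaque; it already
        # contains the broker id according to the struct comment.
        start_offsets[(topic, partition)] = offset
    return type_id, start_offsets
```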
StartResponse
Code Block

{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}

Open Problems

  1. If all of the brokers listed as known brokers in the properties file are gone when a consumer starts or resumes, the consumer cannot find the coordinator and therefore cannot be added to the group to start consuming. This rare case should be treated as an operational error, since a migration of the broker cluster should be incremental and should account for the consumers' properties files.
  2. The sizes of the rebalance thread pool and the ping thread pool must be specified in advance, whereas the number of consumers can grow during operation. Each thread must therefore handle more and more consumers as new consumers are added, which increases the CPU burden.
  3. Since consumers no longer register themselves in ZooKeeper, a new coordinator that starts up must wait for all the consumers to reconnect to it instead of reading the consumer info from ZK. This may increase the latency of the coordinator failover process.
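
The second problem can be made concrete with a small calculation. The sketch below assumes, purely for illustration, that the coordinator spreads consumers round-robin over a fixed-size pool; the document does not say how consumers are mapped to threads, and `consumers_per_thread` is a hypothetical helper.

```python
from collections import Counter


def consumers_per_thread(num_consumers, pool_size):
    """Number of consumers each thread handles under round-robin assignment."""
    load = Counter(i % pool_size for i in range(num_consumers))
    return [load[t] for t in range(pool_size)]
```

With a pool of 4 threads, growing from 10 to 40 consumers raises the busiest thread's load from 3 to 10 consumers, while the pool size stays fixed.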