...

Code Block
rebalance (group) :

1. Get the topics that the group is interested in.

2. Compute the new ownership assignment after reading from ZK the number of partitions and number of threads for each topic.

3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

3.1 If there is no registered ownership info in ZK, rebalance is necessary

3.2 If some partitions are not owned by any threads, rebalance is necessary

3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary

3.4 If the current ownership map does not match the newly computed one, rebalance is necessary

3.5 Otherwise rebalance is not necessary

4. If rebalance is necessary, do the following

4.1 For each consumer in the group, send the "StopRequest" (details of the communication are introduced later)

4.2 Then for each consumer in the group, send the "StartRequest" with part of the newly computed ownership specific to the consumer

4.3 Then wait until all the consumers have finished starting their fetchers (details of the waiting are introduced later)

5. If waiting has timed out, return false; otherwise return true.
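
For illustration, the necessity check in steps 3.1 - 3.5 could look like the following Scala sketch; the Ownership type and all names here are assumptions for this example, not the actual implementation.

Code Block

object RebalanceCheck {
  // partition identifier (e.g. "topic1-0") -> owning consumer thread id
  type Ownership = Map[String, String]

  // returns true if a rebalance is necessary, mirroring steps 3.1 - 3.5
  def rebalanceNeeded(ownershipInZk: Option[Ownership], newAssignment: Ownership): Boolean =
    ownershipInZk match {
      case None => true                                                 // 3.1 no registered ownership info in ZK
      case Some(current) =>
        val unowned = newAssignment.keySet -- current.keySet            // 3.2 partitions not owned by any thread
        val stale   = current.keySet -- newAssignment.keySet            // 3.3 owned partitions that no longer exist
        unowned.nonEmpty || stale.nonEmpty || current != newAssignment  // 3.4 ownership map differs from the new one
    }                                                                   // 3.5 otherwise rebalance is not necessary
}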

On Consumer

...

Startup

Upon creation, the consumer will get a list of

Code Block
  {brokerId : (host, port)}

It can then consult any one of the known brokers to get the full broker-to-address map of the current brokers and the ID of the coordinator.

On Consumer Startup (and Connection to Coordinator)

Once the consumer finds out the address of the coordinator, it will try to connect to the coordinator. When the connection is set up, it will send a single RegisterRequest to the coordinator.

After that, the consumer will keep trying to read new requests from the coordinator. Hence the consumer does not need to maintain a socket server, just a SocketChannel.

Code Block

consumerStartup (initBrokers : Map[Int, (String, String)]):

1. Randomly pick a broker in initBrokers and create a socket channel with that broker

1.1. If the socket channel cannot be created, try another broker in initBrokers

1.2. If none of the brokers in initBrokers can be connected to, throw an AllKnownBrokersNotAvailable exception and return

2. Send a ConsultRequest to the broker and get a ConsultResponse from the broker.

3. From the ConsultResponse, update serverCluster and curCoordinator.

4. Set up a socket channel with the current coordinator and send a RegisterRequest.

5. Keep block-reading from the channel.
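
As a rough illustration of steps 1 - 1.2 above, the broker-connection part could be written as the Scala sketch below; the object and method names are placeholders, not actual code.

Code Block

import java.io.IOException
import java.net.InetSocketAddress
import java.nio.channels.SocketChannel
import scala.util.Random

object BrokerBootstrap {
  class AllKnownBrokersNotAvailableException extends RuntimeException

  // try the known brokers in random order until one socket channel can be created
  def connectToAnyBroker(initBrokers: Map[Int, (String, String)]): SocketChannel = {
    val candidates = Random.shuffle(initBrokers.values.toList)          // step 1: random order
    val channel = candidates.iterator.map { case (host, port) =>
      try Some(SocketChannel.open(new InetSocketAddress(host, port.toInt)))
      catch { case _: IOException => None }                             // step 1.1: try the next broker
    }.collectFirst { case Some(ch) => ch }
    channel.getOrElse(throw new AllKnownBrokersNotAvailableException)   // step 1.2: no broker reachable
  }
}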

Coordinator-Consumer Communication

ConsultRequest

The ConsultRequest sent by the consumer can be received by any broker, and is handled by the broker's socketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

Note that in this design we still assume the wire format of 0.7, although a new wire format has already been adopted in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.

Code Block

{
  size: int32                // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16     // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
                             // do not need any data for the consult request
}

ConsultResponse

Code Block

{
  size: int32                                             // the size of this response, not including the first 4 bytes for this field
  error_code: int16                                       // global error code for this request, if any
  coordinator_id: int16                                   // broker id of the coordinator
  brokers_info: [<broker_struct>]                         // current broker list
}

broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}
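
As an illustration of how a consumer might parse this response, the Scala sketch below can be used; it assumes strings are encoded as an int16 length followed by UTF-8 bytes and that brokers_info is preceded by an int32 count, neither of which is specified above, and all names are placeholders.

Code Block

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object ConsultResponseCodec {
  case class Broker(creatorId: String, id: Int, host: String, port: Int)
  case class ConsultResponse(errorCode: Short, coordinatorId: Short, brokers: Seq[Broker])

  private def readString(buf: ByteBuffer): String = {
    val bytes = new Array[Byte](buf.getShort())
    buf.get(bytes)
    new String(bytes, StandardCharsets.UTF_8)
  }

  // buf is positioned just after the leading int32 size field
  def decode(buf: ByteBuffer): ConsultResponse = {
    val errorCode     = buf.getShort()                  // error_code
    val coordinatorId = buf.getShort()                  // coordinator_id
    val numBrokers    = buf.getInt()                    // assumed int32 count prefix for brokers_info
    val brokers = (0 until numBrokers).map { _ =>
      Broker(readString(buf), buf.getInt(), readString(buf), buf.getInt())  // broker_struct fields in order
    }
    ConsultResponse(errorCode, coordinatorId, brokers)
  }
}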


...

RegisterRequest

The consumer does not need a response for the RegisterRequest, since once it sends out the request, it will start to expect the first Stop or Ping request within ping_interval_ms.

Code Block
{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: ConnectRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // indicator of what to do if an offset is out of range, currently either smallest or largest
  ping_interval_ms: int32            // ping interval in milliseconds, and the consumer is also expected to respond within this interval
  max_ping_retries: int16            // maximum number of allowed ping timeouts before the consumer to be treated as failed
}
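
A hedged Scala sketch of serializing this request is given below; the numeric request type id and the int16-length-prefixed string encoding are assumptions for illustration only.

Code Block

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object RegisterRequestCodec {
  val RegisterRequestTypeId: Short = 5                       // assumed value, not the real constant

  private def putString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  def encode(groupId: String, consumerId: String, autoCommit: Boolean,
             autoOffsetReset: String, pingIntervalMs: Int, maxPingRetries: Short): ByteBuffer = {
    val strings = Seq(groupId, consumerId, autoOffsetReset)
    val payload = 2 + strings.map(s => 2 + s.getBytes(StandardCharsets.UTF_8).length).sum + 1 + 4 + 2
    val buf = ByteBuffer.allocate(4 + payload)
    buf.putInt(payload)                                      // size, not including these 4 bytes
    buf.putShort(RegisterRequestTypeId)                      // request_type_id
    putString(buf, groupId)                                  // group_id
    putString(buf, consumerId)                               // consumer_id
    buf.put(if (autoCommit) 1.toByte else 0.toByte)          // auto_commit
    putString(buf, autoOffsetReset)                          // auto_offset_reset
    buf.putInt(pingIntervalMs)                               // ping_interval_ms
    buf.putShort(maxPingRetries)                             // max_ping_retries
    buf.flip()
    buf
  }
}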

On Handling Coordinator Request

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request the consumer is expected to respond within ping_interval_ms.

PingRequest

The coordinator uses PingRequest to check if the consumer is still alive, and the consumer needs to respond with the current offset for each partition it is consuming if auto_commit is set.

Code Block
{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/Stop/Start requests
  num_retries: int16              // indicate this is the #th ping retry
}

...

PingResponse
Code Block
{
  size: int32                     // the size of this response
  num_partition: int16            // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>]  // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                  // if auto_commit is set to false the last two fields should not exist
}

offset_struct =>
{
  topic: string
  partition: string
  offset: int64
}
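
By way of example, a PingResponse could be assembled as in the Scala sketch below; the int16-length-prefixed string encoding and the rule that both optional fields are omitted entirely when auto_commit is off are assumptions based on the comments above.

Code Block

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object PingResponseCodec {
  case class OffsetInfo(topic: String, partition: String, offset: Long)

  private def stringSize(s: String): Int = 2 + s.getBytes(StandardCharsets.UTF_8).length

  private def putString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  def encode(autoCommit: Boolean, offsets: Seq[OffsetInfo]): ByteBuffer = {
    val payload =
      if (!autoCommit) 0                                    // num_partition and offset_info omitted
      else 2 + offsets.map(o => stringSize(o.topic) + stringSize(o.partition) + 8).sum
    val buf = ByteBuffer.allocate(4 + payload)
    buf.putInt(payload)                                     // size
    if (autoCommit) {
      buf.putShort(offsets.size.toShort)                    // num_partition
      offsets.foreach { o =>
        putString(buf, o.topic)                             // topic
        putString(buf, o.partition)                         // partition
        buf.putLong(o.offset)                               // offset
      }
    }
    buf.flip()
    buf
  }
}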

...

Handling PingRequest
Code Block

handlePingRequest (request: PingRequest):

1.1. If auto_commit is not enabled, just send a PingResponse with no num_partition and offset_info

1.2. Otherwise read topicEntry and send a PingResponse with the created num_partition and offset_info
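
The same handler logic, sketched in Scala with topicEntry modeled as a map from (topic, partition) to the current consumed offset; all names here are placeholders, not the real classes.

Code Block

object PingHandlerSketch {
  case class PingRequest(numRetries: Short)
  case class OffsetInfo(topic: String, partition: String, offset: Long)

  // returns the offset info to put into the PingResponse
  def handlePingRequest(request: PingRequest,
                        autoCommit: Boolean,
                        topicEntry: Map[(String, String), Long]): Seq[OffsetInfo] = {
    if (!autoCommit) Seq.empty                                     // 1.1 respond with no num_partition / offset_info
    else topicEntry.map { case ((topic, partition), offset) =>     // 1.2 report the current offsets
      OffsetInfo(topic, partition, offset)
    }.toSeq
  }
}
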
StopRequest

When the coordinator decides to rebalance a group, it will first send a StopRequest to every consumer in the group to let them stop consuming.

Code Block
{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/Stop/Start requests
}

...

StopResponse
Code Block
{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}
Handling StopRequest
Code Block

handleStopRequest (request: StopRequest):

1.1. If fetcherStopped is true then do nothing

1.2. Otherwise close all fetchers and clean their corresponding queues, set fetcherStopped to true

2. Send the StopResponse to the coordinator
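
The same steps, sketched in Scala with placeholder types for the fetchers and their queues; this is illustrative only, not the actual consumer code.

Code Block

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

object StopHandlerSketch {
  trait Fetcher { def close(): Unit }

  class StopHandler(fetchers: Seq[Fetcher], queues: Seq[LinkedBlockingQueue[AnyRef]]) {
    private val fetcherStopped = new AtomicBoolean(false)

    def handleStopRequest(): Unit = {
      if (!fetcherStopped.get()) {        // 1.1 if already stopped, do nothing
        fetchers.foreach(_.close())       // 1.2 close all fetchers
        queues.foreach(_.clear())         //     and clean their corresponding queues
        fetcherStopped.set(true)
      }
      // 2. send the StopResponse back to the coordinator (omitted in this sketch)
    }
  }
}
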
StartRequest

After the StopRequest, the coordinator will send every consumer in the group a StartRequest with its owned partition info.

Code Block

{
  size: int32                              // the size of this request
  request_type_id: int16                   // the type of the request, distinguish Ping/Stop/Start requests
  num_partitions: int16                    // the number of partitions assigned to the consumer
  assigned_parts: [<partition_struct>]     // the detailed info of assigned partitions, along with the starting offset
}

partition_struct =>
{
  topic: string
  partition: string                        // note that this partition string already contains the broker id
  offset: int64
}
StartResponse
Code Block

{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}
Handling StartRequest

On Consumer Failover

When a consumer fails, the coordinator will eventually detect its failure through PingRequest timeouts. It will then issue a rebalance request for the group of the failed consumer.

When the consumer comes back, it will rerun the consumerStartup process and reconnect to the coordinator. Once a new socket channel is accepted and the RegisterRequest is received, the coordinator will issue a rebalance request for the group of the newly added consumer.

Details of the coordinator rebalance logic are introduced in the coordinator section.

Open Problems

  1. If all the brokers listed as known brokers in the properties file are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error, since the migration of the broker cluster should be incremental and the consumer properties files should be updated accordingly.
  2. The sizes of the rebalance thread pool and the ping thread pool must be pre-specified, while the number of consumers can grow during operation. Hence each thread must handle more and more consumers as new consumers are added, which will increase the CPU burden.
  3. Since consumers no longer register themselves in ZooKeeper, when a new coordinator stands up it needs to wait for all the consumers to reconnect to it instead of reading the consumer info from ZK; this may increase the latency of the coordinator failover process.