
...

  1. Watch for new topics and topic partition changes in Zookeeper
  2. Accept and maintain socket channels requested by newly added consumers
  3. Watch for consumer failures by periodically pinging them via the socket channels
  4. Rebalance the affected groups in response to topic partition changes (through ZK watchers) and group consumer changes (through ping requests)
  5. Communicate the rebalance results to consumers through the socket channels

...

The consumer, upon startup, will consult known brokers for the current coordinator. The known broker list is put in the consumer properties file. The consumer will then try to create a socket channel to the coordinator; once accepted, it will keep trying to read new requests from the coordinator and respond, but never proactively send requests to the coordinator. If the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost and try to reconnect to the (possibly new) coordinator by restarting the consulting process.
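The following is a minimal Scala sketch of this consult-connect-read loop. The discovery and framing helpers (consultForCoordinator, readRequest, respond) are illustrative stand-ins, not part of this design; only the connect / block-read / reconnect-on-silence behavior is from the text above.

Code Block

import java.net.{InetSocketAddress, Socket, SocketTimeoutException}
import java.nio.ByteBuffer

class CoordinatorLink(knownBrokers: Seq[(String, Int)], readTimeoutMs: Int) {

  def run(): Unit =
    while (true) {
      val (host, port) = consultForCoordinator()       // ask any known broker who coordinates
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port))
        socket.setSoTimeout(readTimeoutMs)             // cap how long a read may block
        while (true) {
          val request = readRequest(socket)            // wait for the coordinator's next request
          respond(socket, request)                     // only ever answer, never initiate
        }
      } catch {
        case _: SocketTimeoutException => ()           // silence: assume the coordinator moved
        case _: java.io.IOException    => ()           // connection dropped
      } finally {
        socket.close()                                 // rediscover the (possibly new) coordinator
      }
    }

  // --- stand-ins, not part of the design doc ---
  private def consultForCoordinator(): (String, Int) = knownBrokers.head
  private def readRequest(s: Socket): Array[Byte] = {
    val header = new Array[Byte](4)                    // size prefix, as in the wire formats below
    if (s.getInputStream.read(header) < 4) throw new java.io.IOException("channel closed")
    val body = new Array[Byte](ByteBuffer.wrap(header).getInt)
    s.getInputStream.read(body)                        // sketch: ignores short reads
    body
  }
  private def respond(s: Socket, request: Array[Byte]): Unit = ()
}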

Paths Stored in ZooKeeper

...

Most of the original ZK path storage is kept, in addition to the coordinator path (which stores the current coordinator info):

Code Block
/consumers/coordinator --> broker_id (ephemeral; created by coordinator)

In addition, some of the original ZK paths are removed, including:

Code Block

/consumers/groups/ids

And some of the ephemeral ZK paths are changed to persistent:

Coordinator:

Code Block

/consumers/groups/owners

On Coordinator Startup

Every server will create a coordinator instance as its member upon startup. The consumer coordinator keeps the following fields (all the request usages and formats will be introduced later in this page):

Code Block

coordinatorElector : ZkElection                     // A ZK based elector using the coordinator path mentioned above

groupsBeingRebalanced : Map[String, AtomicBoolean]  // For each group, a bit indicating if the group is under rebalancing

consumerGroupsPerTopic : Map[String, Set[String]]   // For each topic, the consumer groups that are interested in the topic

groupsWithWildcardTopics : Set[String]              // Groups that have wildcard interests for topics

rebalanceRequestQ : List[BlockingQueue[String]]     // Blocking queues storing all the rebalance requests; each request contains just the group name

requestHandler : List[RebalanceRequestHandler]      // A list of threads handling all the rebalance requests read from the rebalanceRequestQ

socketServer : SocketServer                         // A socket server maintaining socket channels with all consumers; it writes Stop/Start/Ping requests and reads the Ping/Start responses
                                                    // It contains an acceptor and a list of processors, and each processor contains a request queue

pingScheduler : KafkaDelayedScheduler               // A scheduled thread which maintains and processes a timestamp based priority queue of ping requests
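To make the queueing scheme concrete, here is a small Scala sketch of the sharded rebalanceRequestQ together with the compare-and-set guard that later sections refer to. The class and method names are illustrative, and since group is a string, the mod hashing is done on the group name's hash code.

Code Block

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import java.util.concurrent.atomic.AtomicBoolean

class RebalanceQueues(numHandlers: Int) {
  val groupsBeingRebalanced = new ConcurrentHashMap[String, AtomicBoolean]()
  val rebalanceRequestQ: IndexedSeq[LinkedBlockingQueue[String]] =
    IndexedSeq.fill(numHandlers)(new LinkedBlockingQueue[String]())

  // "Try to request rebalance": enqueue only if the group is not already being rebalanced,
  // routing the group to its handler's queue by mod hashing on the group name.
  def tryRequestRebalance(group: String): Unit = {
    groupsBeingRebalanced.putIfAbsent(group, new AtomicBoolean(false))
    if (groupsBeingRebalanced.get(group).compareAndSet(false, true))
      rebalanceRequestQ(Math.floorMod(group.hashCode, numHandlers)).put(group)
  }
}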

The coordinator's constructor will only initialize the coordinatorElector, passing in a callback function called coordinatorStartup.

The coordinatorElector, upon initialization, will immediately try to become the leader. If someone else has become the leader, it will listen to the coordinator path for data change, and try to re-elect whenever the current elector resigns (i.e. the data on the path is deleted).

Whenever it elects to become the leader, it will trigger the callback function that is provided by its caller, i.e. the coordinator.
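A minimal election sketch under this description, assuming the I0Itec ZkClient that Kafka uses; the ZkElection name follows the field declaration above, while the constructor shape and callback signature are assumptions.

Code Block

import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.exception.ZkNodeExistsException

class ZkElection(zkClient: ZkClient, brokerId: Int, onBecomingLeader: () => Unit) {
  private val path = "/consumers/coordinator"

  // Watch the path so that when the current coordinator resigns (node deleted) we re-elect.
  zkClient.subscribeDataChanges(path, new IZkDataListener {
    def handleDataChange(dataPath: String, data: Object): Unit = ()
    def handleDataDeleted(dataPath: String): Unit = elect()
  })

  def elect(): Boolean =
    try {
      zkClient.createEphemeral(path, brokerId.toString)   // node vanishes if our session dies
      onBecomingLeader()                                  // triggers coordinatorStartup in this design
      true
    } catch {
      case _: ZkNodeExistsException => false              // someone else won; keep watching
    }
}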

Code Block

coordinatorStartup :

1. Read all the topics from ZK and initialize consumerGroupsPerTopic

2. Read all the consumer groups from ZK

2.1 Get the current interested topics of each group, update consumerGroupsPerTopic by adding the group to each topic's interested group list

2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics

2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and putting the group into the rebalanceRequestQ

3. Register listeners for topics and their partition changes

3.1 Subscribe TopicChangeListener to /brokers/topics

3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

4. Start up the socketServer and pingScheduler

5. Register session expiration listener

6. Initialize and start the requestHandler threads

When a processor in the socketServer sends a StopRequest over a consumer's channel to tell it to stop consuming and wait for the newly assigned partitions due to rebalance, it needs to initialize an expire watcher for the request, and when it receives the corresponding StartResponse from the channel, it needs to clear the watcher. For a detailed description of the request expire/satisfy purgatory, please read here.
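A simplified stand-in for that expire/satisfy behavior (not the actual purgatory implementation): a timer is armed per consumer when the StopRequest is written, and cancelled when its StartResponse arrives.

Code Block

import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledFuture, TimeUnit}

class RequestWatchers(timeoutMs: Long, onExpired: String => Unit) {
  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val pending = new ConcurrentHashMap[String, ScheduledFuture[_]]()

  // Called when the StopRequest is written to the consumer's channel.
  def watch(consumerId: String): Unit = {
    val expiry = timer.schedule(new Runnable {
      def run(): Unit = { pending.remove(consumerId); onExpired(consumerId) }
    }, timeoutMs, TimeUnit.MILLISECONDS)
    pending.put(consumerId, expiry)
  }

  // Called when the corresponding StartResponse is read from the channel.
  def satisfy(consumerId: String): Unit = {
    val expiry = pending.remove(consumerId)
    if (expiry != null) expiry.cancel(false)
  }
}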

The pingScheduler is used to send PingRequests to all consumers, checking their liveness based on each consumer's ping_interval_ms.

Code Block

KafkaDelayedScheduler.run() :

While isRunning

  1. Peek the head consumer from the priority queue

  2. If current_time() >= consumer.scheduledTime, try to send the PingRequest

  2.1 If the consumer's channel is not held by the socketServer's processor for rebalance, send the PingRequest and set the timeout watcher for the consumer

  2.2 Otherwise do nothing

  3. Remove the consumer from the head of the queue and put the consumer back into the queue with consumer.scheduledTime += consumer.ping_interval_ms

For a detailed design of the scheduled thread, please read here.
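As a concrete illustration of the loop above, here is a runnable Scala sketch backed by a priority queue ordered by scheduledTime. ConsumerEntry and sendPing are illustrative names, and the sketch takes the head entry off the queue before mutating it rather than peeking.

Code Block

import java.util.concurrent.PriorityBlockingQueue

case class ConsumerEntry(consumerId: String, pingIntervalMs: Long, var scheduledTime: Long)

class PingScheduler(sendPing: ConsumerEntry => Unit) extends Runnable {
  private val queue = new PriorityBlockingQueue[ConsumerEntry](
    64, Ordering.by[ConsumerEntry, Long](_.scheduledTime))
  @volatile private var isRunning = true

  def add(consumer: ConsumerEntry): Unit = queue.put(consumer)
  def shutdown(): Unit = isRunning = false

  override def run(): Unit =
    while (isRunning) {
      val head = queue.take()                        // earliest-scheduled consumer (removed, not peeked)
      val delay = head.scheduledTime - System.currentTimeMillis()
      if (delay > 0) Thread.sleep(delay)             // wait until the ping is due
      sendPing(head)                                 // expected to no-op if the channel is held for rebalance
      head.scheduledTime += head.pingIntervalMs      // schedule the next ping
      queue.put(head)
    }
}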

On ZK Watcher Fires For Topic Partition Changes

When the ZK watchers are fired notifying topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need rebalancing.

Handle Topic Change
Code Block

TopicChangeListener.handleChildChange :

1. Get the newly added topics (since /brokers/topics are persistent nodes, no topics should be deleted even if they are no longer being consumed)

2. For each newly added topic:

2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/topic

2.2 Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group*

* By trying to request rebalance, we do the following:

if (groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ[group % requestHandler.size].put(group)

Handle Topic Partition Change
Code Block

TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group


On Consumer Startup

Upon creation, the consumer will get a list of

Code Block

  {brokerId : (host, port)}

It can then consult any one of the known brokers to get the full list of current brokers and the ID of the coordinator.

Once the consumer finds out the address of the coordinator, it will try to connect to the coordinator. When the connection is set up, it will send a single RegisterRequest to the coordinator.

After that, the consumer will keep trying to read new requests from the coordinator. Hence the consumer does not need to maintain a socket server but just a SocketChannel.

Code Block

consumerStartup (initBrokers : Map[Int, (String, String)]):

1. Randomly pick a broker from initBrokers and create a socket channel with that broker

1.1. If the socket channel cannot be created, try another broker in initBrokers

1.2. If none of the brokers in initBrokers can be connected, throw an AllKnownBrokersNotAvailable exception and return

2. Send a ConsultRequest to the broker and get a ConsultResponse from the broker

3. From the ConsultResponse, update serverCluster and curCoordinator

4. Set up a socket channel with the current coordinator and send a RegisterRequest

5. Keep block-reading from the channel
ConsultRequest

The ConsultRequest sent by the consumer can be received by any broker, and is handled by the broker's socketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

Note that in this design we are still using the wire format of 0.7, although a new wire format has already been introduced in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.

Code Block

{
  size: int32                // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16     // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
                             // do not need any data for the consult request
}
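For illustration, encoding such a request is just the size prefix followed by the type id. The concrete type id value below is a placeholder, since this page does not assign one.

Code Block

import java.nio.ByteBuffer

val ConsultRequestTypeId: Short = 4   // placeholder id, not assigned by this page

def encodeConsultRequest(): ByteBuffer = {
  val buffer = ByteBuffer.allocate(4 + 2)
  buffer.putInt(2)                    // size: bytes after this field (just the int16 type id)
  buffer.putShort(ConsultRequestTypeId)
  buffer.flip()                       // ready for channel.write(buffer)
  buffer
}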
ConsultResponse
Code Block

{
  size: int32                        // the size of this response, not including the first 4 bytes for this field
  error_code: int16                  // global error code for this response, if any
  coordinator_id: int16              // broker id of the coordinator
  brokers_info: [<broker_struct>]    // current broker list
}

broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}
RegisterRequest

The consumer does not need a response for the RegisterRequest, since once it sends out the request, it will start expecting the first Stop or Ping request within ping_interval_ms.

Code Block

{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: ConnectRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  topic_count: string                // interested topics and number of streams, can be wildcard
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // indicator of what to do if an offset is out of range, currently either smallest or largest
  ping_interval_ms: int32            // ping interval in milliseconds; the consumer is expected to respond within the interval as well
  max_ping_retries: int16            // maximum number of allowed ping timeouts before the consumer is treated as failed
}
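The string fields above would be framed with the 0.7-style length-prefixed string encoding; the int16-length-plus-UTF-8 framing below is an assumption for illustration.

Code Block

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

def putShortString(buffer: ByteBuffer, s: String): Unit = {
  val bytes = s.getBytes(StandardCharsets.UTF_8)
  buffer.putShort(bytes.length.toShort)   // int16 length prefix
  buffer.put(bytes)                       // payload bytes
}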

On Consumer Add/Delete

When a new connection is received from a consumer by the coordinator's socketServer, it will wait for the consumer's RegisterRequest. Once the request is received, the coordinator will issue a rebalance request for the group of the newly added consumer. When a new group id is seen by the coordinator (hence this group does not exist yet), it needs to handle the newly added group.

When a consumer fails, the coordinator will eventually learn of its failure through PingRequest timeouts. It will then issue a rebalance request for the group of the failed consumer.

Handle New Group
Code Block

handleNewGroup :

1. Read all the topics this group is interested in from topic_count; for each topic:

1.1. If the topic already exists in consumerGroupsPerTopic, update its list by adding this group

1.2. If the topic is not in consumerGroupsPerTopic yet, add the entry (topic -> Set(group))

2. If the interested topics are in the wildcard form, add this group to groupsWithWildcardTopics

3.1. If the group is already interested in some existing topics, put (group -> new AtomicBoolean(true)) into groupsBeingRebalanced, and put the group into rebalanceRequestQ[group % requestHandler.size]

3.2. Otherwise just put (group -> new AtomicBoolean(false)) into groupsBeingRebalanced
Handle Group Member Change

Note that here we assume that all the consumers in the same group have the same set of interested topics, so adding/deleting consumers from the group will not change the group's interests. We think this is a fair assumption: though in the current implementation consumers within a group can have different topic interests, users seldom do that.

Code Block

handleGroupMemberChange :

1. Determine which consumer has been deleted or newly added

2.1. If the group no longer contains any consumer, do nothing

2.2. Otherwise, if groupsBeingRebalanced(group).compareAndSet(false, true) succeeds, put the group into rebalanceRequestQ[group % requestHandler.size].

On Rebalancing

The requestHandler threads keep block-reading from their corresponding rebalanceRequestQ; each handles the rebalancing tasks for a non-overlapping subset of groups (e.g. through mod hashing).

For each rebalance request for a specific group, the handler calls the rebalance function; a sketch of the handler loop follows, and the rebalance function itself is listed after it.

If the rebalance succeeds it will reset groupsBeingRebalanced(group); otherwise it will retry the rebalance.

If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will throw a ConsumerRebalanceFailedException.
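A sketch of one such handler thread under the description above; rebalance, maxRebalanceRetries and groupsBeingRebalanced mirror the names used in this page, while the exception is a stand-in for ConsumerRebalanceFailedException.

Code Block

import java.util.concurrent.BlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

class RebalanceRequestHandler(queue: BlockingQueue[String],
                              groupsBeingRebalanced: String => AtomicBoolean,
                              maxRebalanceRetries: Int,
                              rebalance: String => Boolean) extends Runnable {
  override def run(): Unit =
    while (true) {
      val group = queue.take()                     // block until a rebalance request arrives
      var attempts = 0
      var succeeded = false
      while (!succeeded && attempts < maxRebalanceRetries) {
        succeeded = rebalance(group)               // retry on failure, up to the configured limit
        attempts += 1
      }
      if (!succeeded)
        throw new IllegalStateException(           // stand-in for ConsumerRebalanceFailedException
          "Rebalance for group " + group + " failed after " + maxRebalanceRetries + " retries")
      groupsBeingRebalanced(group).set(false)      // clear the in-progress flag on success
    }
}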

Code Block

rebalance (group) :

1. Get the topics that the group is interested in. For each topic:

1.1. Get the number of partitions by reading from ZK

1.2. Get the number of threads for each topic from the alive consumers connected to it

1.3. Compute the new ownership assignment for the topic

2. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

2.1 If there is no registered ownership info in ZK, rebalance is necessary

2.2 If some partitions are not owned by any threads, rebalance is necessary

2.3 If some partitions registered in the ownership map no longer exist, rebalance is necessary

2.4 If the ownership map does not match the new one, rebalance is necessary

2.5 Otherwise rebalance is not necessary

3. If rebalance is necessary, do the following:

3.1 For each consumer in the group, send the StopRequest to the socketServer's corresponding processor's queue

3.2 Then for each consumer in the group, send the StartRequest with the newly computed ownership specific to the consumer to the socketServer's corresponding processor's queue

3.3 Then wait until the socketServer has reported that all the StartResponses have been received

4. If waiting has timed out, return false; otherwise write the new ownership info to ZK and return true.
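Step 1.3 does not spell out the assignment computation; a sketch following the existing consumer's range-style algorithm (an assumption here, not mandated by this design) would be:

Code Block

// Maps each partition to its owner thread: sorted partitions are divided evenly among
// sorted thread ids, with the first (partitions % threads) threads owning one extra.
// Assumes threads is non-empty.
def assignPartitions(partitions: Seq[Int], threads: Seq[String]): Map[Int, String] = {
  val ts = threads.sorted                    // deterministic order across all consumers
  val ps = partitions.sorted
  val share = ps.size / ts.size              // even share per thread
  val extra = ps.size % ts.size              // first `extra` threads own one more partition
  ts.zipWithIndex.flatMap { case (thread, i) =>
    val start = i * share + math.min(i, extra)
    val count = share + (if (i < extra) 1 else 0)
    ps.slice(start, start + count).map(partition => partition -> thread)
  }.toMap
}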

On Coordinator Failover

Whenever the current coordinator's hosted server dies, the electors of the other coordinators will realize it through the ZK listener and will try to re-elect; whoever wins will trigger the callback function coordinatorStartup.

When the dead server comes back, its zkClient will automatically reconnect and trigger the handleNewSession function.

Code Block

handleNewSession :

1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc

2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)

3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.


...

Handling Coordinator Requests

There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request the consumer is expected to respond within ping_interval_ms.
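A sketch of the consumer-side dispatch once the size and request_type_id header has been read; the concrete id values are placeholders, since this page does not assign them.

Code Block

import java.nio.ByteBuffer

object CoordinatorRequestIds {          // placeholder ids, not assigned by this page
  val Ping: Short = 0
  val Stop: Short = 1
  val Start: Short = 2
}

def dispatch(requestTypeId: Short, payload: ByteBuffer,
             onPing: ByteBuffer => Unit,
             onStop: ByteBuffer => Unit,
             onStart: ByteBuffer => Unit): Unit =
  requestTypeId match {
    case CoordinatorRequestIds.Ping  => onPing(payload)    // must answer within ping_interval_ms
    case CoordinatorRequestIds.Stop  => onStop(payload)
    case CoordinatorRequestIds.Start => onStart(payload)
    case other => throw new IllegalArgumentException("unknown request type id: " + other)
  }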

...

Code Block
{
  size: int32                     // the size of this response
  num_partition: int16            // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>]  // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                  // if auto_commit is set to false the last two fields should not exist
}

offset_struct =>
{
  topic: string
  partition: string
  offset: int64
}
Handling PingRequest
Code Block

handlePingRequest (request: PingRequest):

1.1. If auto_commit is not enabled, just send a PingResponse with no num_partition and offset_info

1.2. Otherwise read topicRegistry and send a PingResponse with the created num_partitions and offset_info
StopRequest

When the coordinator decides to rebalance a group, it will first send a StopRequest to every consumer in the group to let them stop consuming.

Code Block
{
  size: int32                     // the size of this request
  request_type_id: int16          // the type of the request, distinguish Ping/Stop/Start requests
}
Handling StopRequest

The consumer does not need to respond to the StopRequest, since the coordinator only needs to synchronize with consumers after the StartRequests are sent.

Code Block
handleStopRequest (request: StopRequest):

1.1. If fetcherStopped is true then do nothing

1.2. Otherwise close all fetchers and clean their corresponding queues, and set fetcherStopped to true
StartRequest

After the StopRequests, the coordinator will send every consumer in the group a StartRequest with its owned partition info.

...

Code Block
{
  size: int32                     // the size of this response
  error_code: int16               // global error code for this request, if any
}
Handling StartRequest
Code Block

handleStartRequest (request: StartRequest):

1. If the consumer's topic interests are wildcard, re-construct topicThreadIdAndQueues and KafkaMessageAndMetadataStreams

2. Read the assigned partitions along with offsets from assigned_parts

3. If fetcherStopped is false and the newly assigned partition map does not match topicRegistry, throw IllegalStateException

4. Update topicRegistry with the newly assigned partition map

5. Start fetchers

6. Set fetcherStopped to false

...

On Consumer Failover

When a consumer fails, the coordinator will eventually learn of its failure through PingRequest timeouts. It will then issue a rebalance request for the group of the failed consumer.

When the consumer comes back, it will rerun the consumerStartup process and reconnect to the coordinator. Once a new socket channel is accepted and the RegisterRequest is received, the coordinator will issue a rebalance request for the group of the newly added consumer.

Open Problems

  1. When all the brokers listed in the properties file as known brokers are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error since the migration of broker cluster should be incremental and adapt to consumer properties file.
  2. The rebalance thread pool and the ping socketServer's processor thread pool's sizes must be pre-specified, however the size of the consumers can be scaled during the operation. Hence more and more consumers must be handled by each thread as new consumers are added, which will increase the CPU burden.
  3. Since consumers no longer register themselves in Zookeeper, when a new coordinator stands up, it needs to wait for all the consumer to re-connect to it instead of reading the consumer info from the ZK, this may increase the latency of coordinator failover process