...

                             (record metadata)       -- (confirm) --> Consumer (registration succeeds; blocks reading coordinator requests such as ping, stop-fetcher, start-fetcher)

Coordinator       (record metadata)       -- (confirm) --> Consumer (same as above)

...

                                     -- (ping) -->     Consumer (failed)

----------------------------------------------------------------------------------------------

...

Initiate rebalance operation and communicate new partition ownership to consumers. When a coordinator initiates a rebalance operation for a consumer group, it first sends a stop-fetcher request to each consumer in the group and waits for all of them to reply with their last consumed offset for each partition. By the time a consumer replies, it should have stopped its fetchers. The coordinator then sends a start-fetcher request to each consumer in the group with the newly computed partition assignment along with the starting offsets. The coordinator finishes the rebalance once all the consumers have restarted their fetchers and responded. If any reply is not received before the timeout, the coordinator marks that consumer as failed and retries the rebalance operation, sending the stop-fetcher request to all the remaining consumers. If the rebalance cannot finish within a configured number of rebalance retries, the coordinator logs a FATAL error and stops rebalancing that consumer group.
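The two-phase rebalance with retries can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the coordinator's actual code: `stop_fetcher` and `start_fetcher` are hypothetical callbacks standing in for the request/response round trips (returning `None` or `False` when the reply times out), and `round_robin_assign` is an assumed assignment strategy, since this section does not specify one.

```python
import logging
from typing import Callable, Dict, List, Optional

Offsets = Dict[str, int]  # partition name -> offset

def round_robin_assign(consumers: List[str], partitions: List[str],
                       offsets: Offsets) -> Dict[str, Offsets]:
    """Hypothetical strategy: deal sorted partitions to sorted consumers round-robin.
    A partition with no reported offset gets -1 (assumed: apply auto_offset_reset)."""
    owners = sorted(consumers)
    assignment: Dict[str, Offsets] = {c: {} for c in owners}
    for i, partition in enumerate(sorted(partitions)):
        assignment[owners[i % len(owners)]][partition] = offsets.get(partition, -1)
    return assignment

def rebalance_group(group: str, consumers: List[str], partitions: List[str],
                    stop_fetcher: Callable[[str], Optional[Offsets]],
                    start_fetcher: Callable[[str, Offsets], bool],
                    max_retries: int) -> Optional[Dict[str, Offsets]]:
    """Returns the final assignment, or None if the rebalance gave up (FATAL)."""
    alive = list(consumers)
    for _ in range(max_retries):
        failed: Optional[str] = None
        offsets: Offsets = {}
        # Phase 1: stop every fetcher and collect the last consumed offsets.
        for consumer in alive:
            reply = stop_fetcher(consumer)
            if reply is None:
                failed = consumer
                break
            offsets.update(reply)
        if failed is None:
            # Phase 2: hand out the new assignment together with starting offsets.
            assignment = round_robin_assign(alive, partitions, offsets) if alive else {}
            for consumer in alive:
                if not start_fetcher(consumer, assignment[consumer]):
                    failed = consumer
                    break
            if failed is None:
                return assignment
        # A reply timed out: mark that consumer as failed and retry with everybody else.
        alive.remove(failed)
    logging.critical("FATAL: rebalance of group %s did not finish in %d retries",
                     group, max_retries)
    return None
```

Restarting from phase 1 after every failure mirrors the description above: consumers that had already stopped simply receive another stop-fetcher request, so the sketch assumes that request is idempotent.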

...

                                     -- (stop-fetcher) -->     Consumer (alive)

----------------------------------------------------------------------------------------------

...

                                                                                       -- (start-fetcher) -->     Consumer (become failed)

...

(retry rebalance)  time out...        Consumer (failed)

When a coordinator dies, the other peer brokers will notice and try to get elected as the new coordinator. The new coordinator will not try to connect to the consumers; instead, it waits for the consumers to realize that the previous coordinator is dead and reconnect to the new one.

Upon startup, the consumer queries the brokers in the cluster for the current coordinator. The known broker list is specified in the consumer properties file. The consumer then establishes a socket channel to the coordinator and registers itself through that channel; once accepted, it keeps reading new requests from the coordinator and responding to them, but never proactively sends requests to the coordinator. If the consumer does not receive any request within a configurable amount of time, it treats the connection as lost, stops its fetchers, and tries to reconnect to the (possibly new) coordinator by restarting the discovery process. Its workflow logic can be found here.
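A minimal sketch of the consumer side of this channel, assuming plain blocking sockets. `read_next_request` and `serve_coordinator` are hypothetical names, and message framing (the size-prefixed requests described later on this page) is deliberately left out, so each `recv` is treated as one request.

```python
import socket
from typing import Callable, Optional

def read_next_request(sock: socket.socket, timeout_s: float) -> Optional[bytes]:
    """Blocking read of the next coordinator request.
    Returns None if nothing arrives within timeout_s or the coordinator closed
    the channel; the consumer treats both cases as a lost connection."""
    sock.settimeout(timeout_s)
    try:
        data = sock.recv(4096)
    except (socket.timeout, ConnectionError):
        return None
    return data or None  # b"" means the peer closed the channel

def serve_coordinator(sock: socket.socket, timeout_s: float,
                      handle: Callable[[bytes], bytes],
                      stop_fetchers: Callable[[], None]) -> int:
    """Read and answer requests until the channel goes quiet, then stop the
    fetchers so the caller can rediscover the coordinator. Returns the number
    of requests handled."""
    handled = 0
    while True:
        request = read_next_request(sock, timeout_s)
        if request is None:
            stop_fetchers()
            return handled
        sock.sendall(handle(request))  # the consumer only ever replies
        handled += 1
```

Because the consumer only reads and replies, one blocking socket with a read timeout is enough; no selector or server socket is involved.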

...


The main motivation for not having the consumer send heartbeats to the coordinator is to keep the consumer design simple. Implementing blocking socket channels is significantly simpler than implementing a full-fledged socket server with non-blocking socket channels, which in turn makes it easier to rewrite the consumer in other languages.

3. Configuration

Config Options to Brokers/Consumers

The following config options are added to brokers; they are only used by a broker once it is elected as the coordinator:

  1. Rebalancing threadpool size
  2. Communication (socket server) threadpool size
  3. Minimum viable value for consumer session timeout
  4. Maximum number of failed (timed-out) requests to consumers
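One way these four options might be loaded and validated, sketched in Python. The property keys and default values below are placeholders invented for illustration; this page does not name them.

```python
from dataclasses import dataclass
from typing import Dict

# Hypothetical property keys and placeholder defaults (not defined on this page).
PROPERTY_KEYS = {
    "rebalance_threads": "coordinator.rebalance.threads",
    "socket_server_threads": "coordinator.socket.server.threads",
    "min_session_timeout_ms": "coordinator.min.session.timeout.ms",
    "max_failed_requests": "coordinator.max.failed.requests",
}
DEFAULTS = {
    "rebalance_threads": 4,
    "socket_server_threads": 4,
    "min_session_timeout_ms": 6000,
    "max_failed_requests": 3,
}

@dataclass(frozen=True)
class CoordinatorConfig:
    rebalance_threads: int        # 1. rebalancing threadpool size
    socket_server_threads: int    # 2. communication (socket server) threadpool size
    min_session_timeout_ms: int   # 3. minimum viable consumer session timeout
    max_failed_requests: int      # 4. failed requests tolerated before marking a consumer failed

    def __post_init__(self) -> None:
        for name, key in PROPERTY_KEYS.items():
            if getattr(self, name) <= 0:
                raise ValueError(f"{key} must be positive")

    @classmethod
    def from_properties(cls, props: Dict[str, str]) -> "CoordinatorConfig":
        # Missing keys fall back to the placeholder defaults.
        values = {name: int(props.get(key, DEFAULTS[name]))
                  for name, key in PROPERTY_KEYS.items()}
        return cls(**values)
```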

The following config options are added to consumers

...

4. On Coordinator Startup

Every server creates a coordinator object as its member upon startup. The consumer coordinator keeps the following fields (all the request formats and their usage are introduced later on this page):

  1. A ZK-based elector using the coordinator path mentioned above
  2. For each group, a bit indicating whether the group is being rebalanced
  3. For each topic, the consumer groups that are interested in the topic
  4. For each consumer, its config options, such as session timeout value, topic count, offset commit option, etc.
  5. A list of groups that have wildcard interests in topics
  6. A configurable, fixed-size list of blocking queues storing all the rebalance requests
  7. A threadpool with each thread handling all the rebalance requests read from one of the rebalance request queues
  8. A socket server for maintaining socket channels with all consumers. It contains a fixed number of processor threads for writing RestartFetcherRequest and reading PingResponse and RestartFetcherResponse
  9. A scheduler thread for sending PingRequest to all the consumers
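The in-memory fields above (items 2 to 6) could be held in a structure like the following Python sketch. Field names, the `ConsumerMeta` contents and the CRC32-based queue mapping are assumptions for illustration; the ZK elector, threadpool, socket server and ping scheduler (items 1 and 7 to 9) are omitted.

```python
import queue
import zlib
from dataclasses import dataclass, field
from typing import Dict, List, Set

@dataclass
class ConsumerMeta:
    """Item 4: per-consumer config options (the field set is illustrative)."""
    session_timeout_ms: int
    topic_count: str
    auto_commit: bool

@dataclass
class CoordinatorState:
    num_rebalance_queues: int
    rebalance_bit: Dict[str, bool] = field(default_factory=dict)            # 2. group -> under rebalancing?
    groups_by_topic: Dict[str, Set[str]] = field(default_factory=dict)      # 3. topic -> interested groups
    consumers: Dict[str, Dict[str, ConsumerMeta]] = field(default_factory=dict)  # 4. group -> id -> meta
    wildcard_groups: Set[str] = field(default_factory=set)                  # 5. groups with wildcard interests
    rebalance_queues: List[queue.Queue] = field(default_factory=list, init=False)  # 6. one queue per thread

    def __post_init__(self) -> None:
        if self.num_rebalance_queues < 1:
            raise ValueError("need at least one rebalance queue")
        self.rebalance_queues = [queue.Queue() for _ in range(self.num_rebalance_queues)]

    def queue_index(self, group: str) -> int:
        # Deterministic hash-based mapping from a group to one rebalance queue.
        return zlib.crc32(group.encode("utf-8")) % self.num_rebalance_queues
```

Any deterministic hash works here; what matters is that every rebalance request for a given group lands on the same queue, so a single thread handles that group's rebalances in order.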

...

Code Block
coordinatorStartup :

1. Read all the topics from ZK and initialize the interested groups for each topic and the list of groups interested in wildcard topics.

2. Register listeners for topics and their partition changes

2.1 Subscribe TopicChangeListener to /brokers/topics

2.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

3. Start up the socket server and the scheduler for ping requests

4. Register session expiration listener

5. Initialize the threadpool of rebalancing handlers

...

When the ZK watchers fire to notify topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need rebalancing.

Handle Topic Change
Code Block
TopicChangeListener.handleChildChange :

1. Get the newly added topics (since /brokers/topics are persistent nodes, no topics should be deleted even if there are no longer any consumers in the group)

2. For each newly added topic:

2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/[topic]

2.2 For groups that have wildcard interests, check whether the group is interested in the newly added topic; if yes, add the group to the interested group list of this topic

2.3 Get the set of groups that are interested in this topic, and for each group whose rebalance bit is not set, set it and try to rebalance the group

* By "try to rebalance the group" we mean creating a new rebalance request and sending it to the corresponding rebalance thread's queue. The mapping from groups to rebalance threads can be, for example, hash based.
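A Python sketch of `TopicChangeListener.handleChildChange` under a few stated assumptions: wildcard interests are represented as regular expressions matched with `re.fullmatch`, a rebalance request is just the group name, and the ZK subscription of step 2.1 is omitted. The class and method names are hypothetical.

```python
import queue
import re
import zlib
from typing import Dict, List, Set

class TopicChangeHandler:
    def __init__(self, num_queues: int, wildcard_patterns: Dict[str, str]):
        self.groups_by_topic: Dict[str, Set[str]] = {}   # topic -> interested groups
        self.wildcard_patterns = wildcard_patterns         # group -> topic regex (assumed form)
        self.rebalance_bit: Dict[str, bool] = {}           # group -> under rebalancing?
        self.known_topics: Set[str] = set()
        self.queues = [queue.Queue() for _ in range(num_queues)]

    def try_rebalance(self, group: str) -> None:
        # Set the bit and enqueue only if no rebalance is already pending for the group.
        if not self.rebalance_bit.get(group, False):
            self.rebalance_bit[group] = True
            self.queues[zlib.crc32(group.encode("utf-8")) % len(self.queues)].put(group)

    def handle_child_change(self, current_topics: List[str]) -> None:
        # 1. Topics are never deleted, so the new ones are a plain set difference.
        new_topics = set(current_topics) - self.known_topics
        self.known_topics |= new_topics
        for topic in sorted(new_topics):
            # 2.1 (subscribing TopicPartitionChangeListener in ZK) is omitted here.
            interested = self.groups_by_topic.setdefault(topic, set())
            # 2.2 Wildcard groups whose pattern matches the new topic become interested.
            for group, pattern in self.wildcard_patterns.items():
                if re.fullmatch(pattern, topic):
                    interested.add(group)
            # 2.3 Try to rebalance every interested group.
            for group in sorted(interested):
                self.try_rebalance(group)
```

Clearing the rebalance bit is assumed to be the job of the rebalance thread once it finishes, which is not shown.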
Handle Topic Partition Change

...

as part of the config info from its properties file. It can then query any one of the known brokers to get the current host/port of all brokers and the ID of the coordinator.

Once the consumer finds out the address of the coordinator, it tries to connect to it. When the connection is set up, it sends a single RegisterConsumerRequest to the coordinator.

After that, the consumer keeps reading new requests from the coordinator. Hence the consumer does not need to embed a socket server.

Code Block
consumerStartup (initBrokers : Map[Int, (String, String)]):

1. In a round robin fashion, pick a broker in the initBrokers, create a socket channel with that broker

1.1. If the socket channel cannot be established, log an error and try the next broker in the initBrokers list

1.2. If none of the brokers in initBrokers can be connected, log an AllKnownBrokersNotAvailable error event and go back to the first broker

1.3. The consumer keeps retrying the brokers in this round-robin fashion until it is shut down.

2. Send a ClusterMetadataRequest to the broker and receive a ClusterMetadataResponse

3. From the response, update the locally cached cluster info and the ID of the current coordinator

4. Set up a socket channel with the current coordinator, send a RegisterConsumerRequest and receive a RegisterConsumerResponse

5. If the RegisterConsumerResponse indicates that the registration is successful, keep block-reading requests from the channel; otherwise go back to step 1
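Steps 1 to 3 can be sketched in Python as below. `fetch_metadata` is a hypothetical callback that opens a socket channel to one broker, exchanges ClusterMetadataRequest/ClusterMetadataResponse and returns `None` if the broker cannot be reached; `is_shut_down` stands in for the consumer's shutdown flag. Brokers are modeled as `(host, port)` tuples, and registration (steps 4 and 5) is left out.

```python
import itertools
import logging
from typing import Callable, Dict, Optional, Tuple

Address = Tuple[str, int]                  # (host, port)
Cluster = Tuple[int, Dict[int, Address]]   # (coordinator id, broker id -> address)

def find_coordinator(init_brokers: Dict[int, Address],
                     fetch_metadata: Callable[[Address], Optional[Cluster]],
                     is_shut_down: Callable[[], bool]
                     ) -> Optional[Tuple[Address, Dict[int, Address]]]:
    """Cycle through the known brokers until one answers, then return the
    coordinator's address and the full broker list (None on shutdown)."""
    ordered = sorted(init_brokers.items())
    failures = 0
    for broker_id, address in itertools.cycle(ordered):   # 1. round robin, forever
        if is_shut_down():
            return None
        cluster = fetch_metadata(address)                  # 2. ClusterMetadataRequest
        if cluster is None:
            logging.error("cannot reach broker %d at %s:%d", broker_id, *address)  # 1.1
            failures += 1
            if failures % len(ordered) == 0:                # 1.2 a full pass failed
                logging.error("AllKnownBrokersNotAvailable; restarting from the first broker")
            continue
        coordinator_id, brokers = cluster                  # 3. update cluster info
        return brokers[coordinator_id], brokers
    return None  # only reached when init_brokers is empty
```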

...

The ClusterMetadataRequest can be sent by the consumer to any broker in the cluster, and is handled by the broker's socket server. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

Note that this design still follows the 0.7 request format, although a new wire format is already used in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.

...

Code Block
{
  size: int32                                             // the size of this response, not including the first 4 bytes for this field
  error_code: int16                                       // global error code for this request, if any
  coordinator_id: int32                                   // broker id of the coordinator
  num_brokers: int16                                      // number of brokers in the following list
  brokers_info: [<broker_struct>]                         // current broker list in the cluster
}

broker_struct =>
{
  id: int32
  host: string                                            // we adopt the protocol of 0.8 for strings, where the size of the string is stored at the beginning, followed by the bytes
  port: int32
}
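To make the layout concrete, here is a Python sketch that encodes and decodes the response above with the standard `struct` module. It assumes big-endian (network) byte order and, following the comment on `host`, an int16 length prefix for strings as in 0.8; the function names are hypothetical.

```python
import struct
from typing import List, Tuple

Broker = Tuple[int, str, int]  # (id, host, port)

def encode_string(s: str) -> bytes:
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw      # int16 length, then the bytes

def encode_cluster_metadata_response(error_code: int, coordinator_id: int,
                                     brokers: List[Broker]) -> bytes:
    body = struct.pack(">hih", error_code, coordinator_id, len(brokers))
    for broker_id, host, port in brokers:
        body += struct.pack(">i", broker_id) + encode_string(host) + struct.pack(">i", port)
    # size counts everything after the 4-byte size field itself
    return struct.pack(">i", len(body)) + body

def decode_cluster_metadata_response(buf: bytes) -> Tuple[int, int, List[Broker]]:
    (size,) = struct.unpack_from(">i", buf, 0)
    if size != len(buf) - 4:
        raise ValueError(f"size field says {size} bytes, got {len(buf) - 4}")
    error_code, coordinator_id, num_brokers = struct.unpack_from(">hih", buf, 4)
    pos = 4 + 8  # past size (4) and error_code + coordinator_id + num_brokers (8)
    brokers: List[Broker] = []
    for _ in range(num_brokers):
        broker_id, host_len = struct.unpack_from(">ih", buf, pos)
        pos += 6
        host = buf[pos:pos + host_len].decode("utf-8")
        pos += host_len
        (port,) = struct.unpack_from(">i", buf, pos)
        pos += 4
        brokers.append((broker_id, host, port))
    return error_code, coordinator_id, brokers
```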
RegisterConsumerRequest

...

Code Block
{
  size: int32                        // the size of this request
  request_type_id: int16             // the type of the request, currently only one type is allowed: RegisterConsumerRequest
  group_id: string                   // group that the consumer belongs to
  consumer_id: string                // consumer id
  topic_count: string                // interested topic and number of streams, can be wildcard
  auto_commit: boolean               // indicator of whether autocommit is enabled
  auto_offset_reset: string          // indicator of what to do if an offset is out of range, currently either smallest or largest
  session_timeout_ms: int32          // session timeout in milliseconds
}
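A matching Python sketch for the RegisterConsumerRequest. Several details are assumptions because the format above leaves them open: the numeric `request_type_id` (a placeholder constant here), a `boolean` encoded as one byte, `size` excluding its own 4 bytes as in the response format, big-endian integers and int16-prefixed strings.

```python
import struct
from dataclasses import dataclass

REGISTER_CONSUMER_REQUEST_TYPE = 1  # hypothetical id; not assigned on this page

def _pack_str(s: str) -> bytes:
    raw = s.encode("utf-8")
    return struct.pack(">h", len(raw)) + raw

@dataclass
class RegisterConsumerRequest:
    group_id: str
    consumer_id: str
    topic_count: str
    auto_commit: bool
    auto_offset_reset: str
    session_timeout_ms: int

    def encode(self) -> bytes:
        body = struct.pack(">h", REGISTER_CONSUMER_REQUEST_TYPE)
        body += _pack_str(self.group_id) + _pack_str(self.consumer_id) + _pack_str(self.topic_count)
        body += struct.pack(">b", 1 if self.auto_commit else 0)  # boolean as one byte (assumed)
        body += _pack_str(self.auto_offset_reset)
        body += struct.pack(">i", self.session_timeout_ms)
        return struct.pack(">i", len(body)) + body               # size excludes itself (assumed)

    @classmethod
    def decode(cls, buf: bytes) -> "RegisterConsumerRequest":
        size, type_id = struct.unpack_from(">ih", buf, 0)
        if size != len(buf) - 4 or type_id != REGISTER_CONSUMER_REQUEST_TYPE:
            raise ValueError("not a well-formed RegisterConsumerRequest")
        pos = 6

        def read_str() -> str:
            nonlocal pos
            (n,) = struct.unpack_from(">h", buf, pos)
            value = buf[pos + 2:pos + 2 + n].decode("utf-8")
            pos += 2 + n
            return value

        group_id, consumer_id, topic_count = read_str(), read_str(), read_str()
        (auto_commit,) = struct.unpack_from(">b", buf, pos)
        pos += 1
        auto_offset_reset = read_str()
        (session_timeout_ms,) = struct.unpack_from(">i", buf, pos)
        return cls(group_id, consumer_id, topic_count, auto_commit != 0,
                   auto_offset_reset, session_timeout_ms)
```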

...

Upon receiving the RegisterConsumerRequest, the coordinator will check the following:

  1. Consumer's ID is unique within the same group
  2. Consumer's specified session_timeout is larger than the coordinator's minimum allowed session timeout value

If all the checks are successful, the coordinator accepts the registration by recording the consumer's config data in its internal data structures and sends a RegisterConsumerResponse with no error; otherwise it includes an appropriate error code in the RegisterConsumerResponse and closes the connection.

Code Block
{
  size: int32                        // the size of this response
  error_code: int16                  // global error code for this request, if any
                                     // no data needed
}
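The registration checks and the response can be sketched in Python as follows. The error-code names and values are hypothetical (this page does not define them), registered consumers are tracked only as a `group -> {consumer ids}` map, `size` is assumed to exclude its own 4 bytes, and closing the connection on error is left to the caller.

```python
import struct
from typing import Dict, Set

# Hypothetical error codes; this page does not assign numeric values.
NO_ERROR = 0
DUPLICATE_CONSUMER_ID = 1
SESSION_TIMEOUT_TOO_SMALL = 2

def register_consumer(registered: Dict[str, Set[str]], group_id: str, consumer_id: str,
                      session_timeout_ms: int, min_session_timeout_ms: int) -> int:
    """Runs both checks; records the consumer and returns NO_ERROR only if both pass."""
    if consumer_id in registered.get(group_id, set()):
        return DUPLICATE_CONSUMER_ID        # check 1: ID must be unique within the group
    if session_timeout_ms <= min_session_timeout_ms:
        return SESSION_TIMEOUT_TOO_SMALL    # check 2: timeout must exceed the minimum
    registered.setdefault(group_id, set()).add(consumer_id)
    return NO_ERROR

def encode_register_consumer_response(error_code: int) -> bytes:
    # size = 2 (just the int16 error_code), followed by error_code; no other data
    return struct.pack(">ih", 2, error_code)
```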

...

Code Block
handleNewGroup :

1. Read all the topics this group is interested in from topic_count, and for each topic:

1.1. If the topic already exists in coordinator's memory, update its list by adding this group

1.2. Otherwise, add this topic with the new group as its only interested group

2. If the interested topics are in the wildcard format, add this group to the list of wildcard-interested groups

3.1. If the group is interested in some existing topics, create its rebalance bit initialized as true and try to rebalance the group

3.2. Otherwise just create its rebalance bit initialized as false
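A Python sketch of `handleNewGroup`, assuming the group's interested topics have already been parsed out of `topic_count` (for a wildcard group, resolved against the known topics), and that `existing_topics` is the set of topics currently registered in ZK. The rebalance queue is simplified to a list; class and parameter names are hypothetical.

```python
from typing import Dict, List, Set

class GroupRegistry:
    def __init__(self) -> None:
        self.groups_by_topic: Dict[str, Set[str]] = {}
        self.wildcard_groups: Set[str] = set()
        self.rebalance_bit: Dict[str, bool] = {}
        self.rebalance_requests: List[str] = []  # stand-in for the hashed rebalance queues

    def handle_new_group(self, group: str, topics: List[str], is_wildcard: bool,
                         existing_topics: Set[str]) -> None:
        # 1.1 / 1.2: add the group to each topic's list, creating the entry if needed.
        for topic in topics:
            self.groups_by_topic.setdefault(topic, set()).add(group)
        # 2. Remember wildcard groups so new topics can be matched against them later.
        if is_wildcard:
            self.wildcard_groups.add(group)
        # 3.1 / 3.2: rebalance right away only if some interested topic already exists.
        if any(topic in existing_topics for topic in topics):
            self.rebalance_bit[group] = True
            self.rebalance_requests.append(group)
        else:
            self.rebalance_bit[group] = False
```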

...

  1. The scheduler sends a PingRequest to each consumer every 1/3 * consumer.session_timeout and sets a timeout watcher that waits for 2/3 * consumer.session_timeout.
  2. Similarly, when the processor threads send stop/start fetcher requests to a consumer, they also expect to receive a response within 2/3 * consumer.session_timeout.
  3. If the watcher for a PingRequest expires, the coordinator marks that ping request as failed.
  4. If the PingRequest has failed more than the maximum number of ping retries, the coordinator marks the consumer as failed.
    1. It removes the consumer's registry information from its memory and closes the connection.
    2. It issues a rebalance request for the group of the failed consumer.
  5. Even if the consumer is going through a soft failure (e.g., a long GC pause), it will disconnect from the coordinator within 1/3 * session_timeout and stop its fetchers until it has re-registered with the current coordinator.
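The timing rules in items 1, 3 and 4 can be captured in a small Python sketch. `PingTracker` is a hypothetical helper; it assumes that the failure count refers to consecutive failed pings and resets after a timely reply, which the text does not state explicitly.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PingTracker:
    session_timeout_ms: int
    max_ping_failures: int
    failures: int = 0

    @property
    def ping_interval_ms(self) -> float:
        # 1. A PingRequest goes out every 1/3 of the session timeout ...
        return self.session_timeout_ms / 3

    @property
    def response_timeout_ms(self) -> float:
        # ... and its watcher waits 2/3 of the session timeout for the reply.
        return 2 * self.session_timeout_ms / 3

    def on_ping_result(self, reply_latency_ms: Optional[float]) -> bool:
        """Record one ping outcome (None = no reply at all).
        Returns True once the consumer should be marked as failed."""
        if reply_latency_ms is not None and reply_latency_ms <= self.response_timeout_ms:
            self.failures = 0                            # assumption: a timely reply resets the count
            return False
        self.failures += 1                               # 3. the watcher expired: this ping failed
        return self.failures > self.max_ping_failures    # 4. more than the allowed retries
```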
Handle Consumer Failure
Code Block
handleConsumerFailure :

1. Decide if the group still has some consumers; if not, do nothing.

2. If the rebalance bit is not set, set it and try to rebalance the group.
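A Python sketch of `handleConsumerFailure`, folding in the removal from item 4.1 above. The data structures and the CRC32 group-to-queue mapping are assumptions for illustration.

```python
import queue
import zlib
from typing import Dict, List, Set

def handle_consumer_failure(group: str, consumer_id: str,
                            consumers_by_group: Dict[str, Set[str]],
                            rebalance_bit: Dict[str, bool],
                            queues: List[queue.Queue]) -> bool:
    """Returns True if a rebalance request was enqueued for the group."""
    members = consumers_by_group.get(group, set())
    members.discard(consumer_id)  # drop the failed consumer's registration
    # 1. Nothing to rebalance if the group has no consumers left.
    if not members:
        return False
    # 2. If a rebalance is already in progress, it is expected to detect the
    #    failure through its own timeouts and retry; otherwise set the bit and enqueue.
    if rebalance_bit.get(group, False):
        return False
    rebalance_bit[group] = True
    queues[zlib.crc32(group.encode("utf-8")) % len(queues)].put(group)
    return True
```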

...