The following is a draft design that uses a highly available consumer coordinator at the broker side to handle consumer rebalance. By migrating the rebalance logic from the consumer to the coordinator we can resolve the consumer split-brain problem and help thin the consumer client.

Overview:

One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:

  1. Watch for new topics and topic partition changes in Zookeeper
  2. Accept and maintain socket channel connections from newly added consumers
  3. Watch for consumer failures by periodically heartbeating them via the socket channels
  4. Rebalance for affected groups in response to topic partition (through ZK watchers) and group consumer (through heartbeating) changes
  5. Communicate the rebalance results to consumers through the socket channels

When the coordinator decides that a rebalance is needed for a certain group, it will first send the stop-fetcher command to each consumer in the group, and then send the start-fetcher command to each consumer with its assigned partitions. Each consumer will only receive the partition info of the partitions that are assigned to itself. The coordinator will finish the rebalance by waiting for all the consumers to finish starting the fetchers and respond.

The consumer, upon startup, will consult the known brokers for the current coordinator. The known broker list is put in the consumer properties file. The consumer will then try to create a socket channel to the coordinator, and once accepted, it will keep trying to read new requests from the coordinator and respond, but will never proactively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost, and will try to reconnect to the (possibly new) coordinator by restarting the consultation process.
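
A minimal sketch of this connect-consult-reconnect loop is given below; the consult and serve functions are illustrative stand-ins for the consultation process and the command-handling loop, not part of the design:

Code Block

// Sketch of the consumer-side connection loop described above (Scala).
// The consumer only ever answers coordinator requests; prolonged silence
// on the channel is treated as a lost connection.
def coordinatorLoop(
    knownBrokers: Seq[(String, Int)],                 // (host, port) pairs from the properties file
    consult: Seq[(String, Int)] => (String, Int),     // ask any known broker for the current coordinator
    serve: java.net.Socket => Unit,                   // block-read commands and respond until the channel dies
    readTimeoutMs: Int): Unit = {
  while (true) {
    val (host, port) = consult(knownBrokers)
    val socket = new java.net.Socket(host, port)
    socket.setSoTimeout(readTimeoutMs)                // no request within the timeout => connection lost
    try serve(socket)
    catch { case _: java.io.IOException => () }       // includes the read timeout
    finally socket.close()                            // fall through and re-consult for the new coordinator
  }
}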

Paths Stored in ZK:

Most of the original ZK paths are kept, with the addition of the coordinator path (which stores the current coordinator info):

Code Block

/consumers/coordinator --> brokerId (ephemeral; created by coordinator)
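
For illustration, claiming this path could look as follows (a sketch assuming the I0Itec ZkClient used by Kafka; the ephemeral node vanishes with the owner's ZK session, which is what triggers re-election):

Code Block

import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException

// Sketch: try to claim the coordinator path with an ephemeral node; the node
// is deleted automatically if the owning broker's ZK session expires.
def tryBecomeCoordinator(zkClient: ZkClient, brokerId: Int): Boolean =
  try {
    zkClient.createEphemeral("/consumers/coordinator", brokerId.toString)
    true                                    // won the election
  } catch {
    case _: ZkNodeExistsException => false  // someone else is already the coordinator
  }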

Some of the original ZK paths are removed as well.

Coordinator:

The consumer coordinator keeps the following fields:


Code Block

coordinatorElector : ZkElection                     // A ZK-based elector using the coordinator path mentioned above

groupsBeingRebalanced : Map[String, AtomicBoolean]  // For each group, a bit indicating whether the group is under rebalancing

consumerGroupsPerTopic : Map[String, Set[String]]   // For each topic, the consumer groups that are interested in the topic

groupsWithWildcardTopics : Set[String]              // Groups that have wildcard interests for topics

rebalanceRequestQ : LinkedBlockingQueue[String]     // A blocking queue storing all the rebalance requests; each request just contains the group name

requestHandler : RebalanceRequestHandler            // A thread handling all the rebalance requests read from the rebalanceRequestQ
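
The construction is sketched below; ZkElection here stands for the design's ZK-based elector (not an existing library class), and the factory parameter is illustrative:

Code Block

// Minimal sketch of the coordinator construction described in section A.
trait ZkElection {
  def elect(): Boolean                 // try to become the leader now
}

class ConsumerCoordinator(newElection: (() => Unit) => ZkElection) {
  // The constructor only initializes the elector, with coordinatorStartup as
  // the callback to run whenever this broker wins the election; the elector
  // itself immediately tries to become the leader upon initialization.
  private val coordinatorElector: ZkElection = newElection(() => coordinatorStartup())

  private def coordinatorStartup(): Unit = {
    // steps 1-6 of the coordinatorStartup pseudocode in section A
  }
}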



A. On Coordinator Startup

Every server will create a coordinator instance as its member, whose constructor will only initialize the coordinatorElector by passing in a callback function called coordinatorStartup. The coordinatorElector, upon initialization, will immediately try to become the leader. If someone else has become the leader, it will listen to the coordinator path for data changes, and will try to re-elect whenever the current leader resigns (i.e. the data on the path is deleted). Whenever it is elected leader, it will trigger the callback function provided by its caller, i.e. the coordinator.

Code Block

coordinatorStartup :

1. Read all the topics from ZK and initialize consumerGroupsPerTopic

2. Read all the consumer groups from ZK

2.1 Get the currently interested topics of each group, and update consumerGroupsPerTopic by adding the group to each topic's interested-group list

2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics

2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and putting the group into rebalanceRequestQ

3. Register listeners for topics and their partition changes

3.1 Subscribe TopicChangeListener to /brokers/topics

3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]

4. Register listeners for consumer groups and their member changes

4.1 Subscribe GroupChangeListener to /consumers/groups/

4.2 Subscribe GroupMemberChangeListener to each /consumers/groups/[group]/ids

5. Register the session expiration listener

6. Initialize and start the requestHandler thread
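
As an illustration of steps 3 and 4, listener registration could look like the following sketch (assuming the I0Itec ZkClient API; the handler bodies stand in for the listener logic of section C):

Code Block

import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

// Sketch of steps 3-4 above: watch for new topics and for membership changes
// of each known group, so that ZK changes turn into rebalance requests.
def registerWatchers(zkClient: ZkClient, groups: Iterable[String]): Unit = {
  zkClient.subscribeChildChanges("/brokers/topics", new IZkChildListener {
    override def handleChildChange(parent: String, children: java.util.List[String]): Unit = {
      // TopicChangeListener logic from section C
    }
  })
  for (group <- groups) {
    zkClient.subscribeChildChanges("/consumers/groups/" + group + "/ids", new IZkChildListener {
      override def handleChildChange(parent: String, children: java.util.List[String]): Unit = {
        // GroupMemberChangeListener logic from section C
      }
    })
  }
}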



B. On Coordinator Change/Failover

Whenever the server hosting the current coordinator dies, the electors of the other coordinators will realize that through the ZK listener and will try to re-elect to be the leader; whoever wins will trigger the callback function coordinatorStartup. When the dead server comes back, its zkClient will automatically reconnect and trigger the handleNewSession function.

Code Block

handleNewSession :

1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc.

2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)

3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.

C. On ZK Watcher Fires

Handle group change:

Code Block

GroupChangeListener.handleChildChange :

1. Get the newly added group (since /consumers/groups are persistent nodes, no groups should be deleted even if there are no consumers any more inside the group)

2. Subscribe the GroupMemberChangeListener on /consumers/groups/[group]

3. Read all the topics this group is interested in, and for each topic:

3.1 If the topic already exists in consumerGroupsPerTopic, update its list by adding this group

3.2 If the topic is not in consumerGroupsPerTopic yet, add the entry (topic -> Set(group))

4. If some of this group's consumers have wildcard interests, add the group to groupsWithWildcardTopics

5. If the group is already interested in some existing topics, put (group -> new AtomicBoolean(true)) to groupsBeingRebalanced, and put the group into rebalanceRequestQ;

   Otherwise just put (group -> new AtomicBoolean(false)) to groupsBeingRebalanced

Handle group member change:

Code Block

GroupMemberChangeListener.handleChildChange :

1. If some topics are no longer interesting due to the deletion of some consumer, update consumerGroupsPerTopic by removing the group from those topics' lists

2. If the group no longer contains any consumer, do nothing;

   Otherwise, if groupsBeingRebalanced(group).compareAndSet(false, true) succeeds, put the group into rebalanceRequestQ.

Handle topic change:

Code Block

TopicChangeListener.handleChildChange :

1. Get the newly added topics (since /brokers/topics are persistent nodes, no topics should be deleted even if they are no longer consumed)

2. For each newly added topic:

2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/[topic]

2.2 Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group*

* By trying to request rebalance, we do the following:

if (groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ.put(group)

Handle topic partition change:

Code Block

TopicPartitionChangeListener.handleChildChange :

Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group

D. On Rebalance Handling

The requestHandler thread keeps block-reading from rebalanceRequestQ, and for each rebalance request for a specific group it calls the rebalance function. If the rebalance succeeds it will reset groupsBeingRebalanced(group); otherwise it will retry the rebalance. If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will throw a ConsumerRebalanceFailedException.

Code Block

rebalance (group) :

1. Get the topics that the group is interested in.

2. Compute the new ownership assignment after reading from ZK the number of partitions and the number of threads for each topic.

3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.

3.1 If there is no registered ownership info in ZK, rebalance is necessary

3.2 If some partitions are not owned by any threads, rebalance is necessary

3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary

3.4 If the ownership map does not match the new one, rebalance is necessary

3.5 Otherwise rebalance is not necessary

4. If rebalance is necessary, do the following:

4.1 For each consumer in the group, send the "stop-fetcher" command (details of the communication are introduced later)

4.2 Then for each consumer in the group, send the "start-fetcher" command with the part of the newly computed ownership specific to that consumer

4.3 Then wait until all the consumers have finished starting the fetchers (details of the waiting are introduced later)

5. If waiting has timed out, return false; otherwise return true.
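
The retry loop around this function could look like the following sketch (illustrative only; the rebalance function and maxRebalanceRetries config are the ones described above):

Code Block

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

class ConsumerRebalanceFailedException(group: String)
  extends RuntimeException("Cannot rebalance group " + group)

// Sketch of the requestHandler thread: block on the queue, rebalance with
// bounded retries, and clear the group's in-flight bit only on success.
class RebalanceRequestHandler(
    rebalanceRequestQ: LinkedBlockingQueue[String],
    groupsBeingRebalanced: collection.Map[String, AtomicBoolean],
    rebalance: String => Boolean,            // returns false if waiting for consumers timed out
    maxRebalanceRetries: Int) extends Runnable {

  override def run(): Unit = {
    while (true) {
      val group = rebalanceRequestQ.take()   // blocks until a rebalance request arrives
      val succeeded = (1 to maxRebalanceRetries).exists(_ => rebalance(group))
      if (!succeeded) throw new ConsumerRebalanceFailedException(group)
      groupsBeingRebalanced(group).set(false)
    }
  }
}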

Consumer:

Upon creation, the consumer will get a list of

Code Block

 {brokerId : (host, port)}

It can then consult any one of the known brokers to get the full map of {brokerId : (host, port)} and the current coordinator.

A. On Consumer Startup (and Connection to Coordinator)

Code Block

consumerStartup (initBrokers : Map[Int, (String, Int)]):

1. Randomly pick a broker in initBrokers, and create a socket channel with that broker

1.1 If the socket channel cannot be created, try another broker in initBrokers

1.2 If none of the brokers in initBrokers can be connected, throw an AllKnownBrokersNotAvailable exception

2. Send a consult-request to the broker and get a consult-response from the broker.

3. From the consult-response, update serverCluster and curCoordinator.

4. Set up a socket channel with the current coordinator.

5. Keep block-reading from the channel.

B. On Consumer Failover


When a consumer fails, the coordinator will eventually realize this through the failure of its heartbeat requests to get a response. It will then issue a rebalance request for the failed consumer's group.

When the consumer comes back, it will retry the consultation process and then reconnect to the coordinator. Once the new socket channel is accepted, the coordinator will issue a rebalance request for the group of the newly added consumer.

Details of the coordinator's rebalance logic are introduced in the coordinator section.
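
For illustration, the heartbeat-based failure detection could be sketched like this (sendPing is a stand-in for a PingRequest round-trip over the consumer's channel; all names here are illustrative):

Code Block

// Sketch: one heartbeat pass over all consumer channels; any consumer that
// fails to answer a ping is dropped and its group is queued for rebalance.
def heartbeatOnce(
    channels: Map[String, java.net.Socket],   // consumerId -> socket channel
    groupOf: String => String,                // consumerId -> group
    sendPing: java.net.Socket => Boolean,     // false if no response within the timeout
    requestRebalance: String => Unit): Unit = {
  for ((consumerId, channel) <- channels if !sendPing(channel)) {
    channel.close()                           // drop the dead consumer's channel
    requestRebalance(groupOf(consumerId))     // same compare-and-set idiom as in section C
  }
}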

Coordinator-Consumer Communication

The request sent by the consumer can be received by any broker, and is handled by the broker's ???SocketHandler. Hence its request format should be compatible with the ProduceRequest and FetchRequest.

The request sent by the coordinator is received by the consumer client, and is handled by the consumer's ???SocketHandler.

Consumer Request Format :

field

type

order

description

size

int32

1

The size of this request (not counting this 4 byte size). This is mandatory and required by the network layer.

request_type_id

int16

2

An id for the API being called (e.g., ProduceRequest, FetchRequest and ConsultRequest, etc).

version_id

int16

3

A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API.

client_id

string

4

An user-defined identifier for the client which is used for logging and statistics purposes.

Consumer Response Format (fields in wire order):

  • size (int32): The size of this response (not counting this 4-byte size). This is mandatory and required by the network layer.
  • correlation_id (int32): An id that can be set by the client and will be returned untouched by the server in the response.
  • version_id (int16): A version number for the response format.
  • error (int16): The id of the (request-level) error, if any occurred.
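
For illustration, the common header fields above could be laid out on the wire as in the following sketch (assuming big-endian primitives and strings encoded as an int16 length followed by UTF-8 bytes, as in the existing request formats):

Code Block

import java.nio.ByteBuffer

// Sketch: serialize the common request header (size, request_type_id,
// version_id, client_id) into a buffer ready to be written to the channel.
def writeHeader(requestTypeId: Short, versionId: Short, clientId: String): ByteBuffer = {
  val clientIdBytes = clientId.getBytes("UTF-8")
  val payloadSize = 2 + 2 + 2 + clientIdBytes.length   // type + version + string length + bytes
  val buffer = ByteBuffer.allocate(4 + payloadSize)
  buffer.putInt(payloadSize)                           // size, not counting these 4 bytes
  buffer.putShort(requestTypeId)                       // request_type_id
  buffer.putShort(versionId)                           // version_id
  buffer.putShort(clientIdBytes.length.toShort)        // client_id, length-prefixed
  buffer.put(clientIdBytes)
  buffer.flip()
  buffer
}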

ConsultRequest
Code Block

{
  size: int32                // the size of this request
  request_type_id: int16     // the request id
  version_id: int16          // the version of this request
  correlation_id: int32      // an id set by the client that will be returned untouched in the response
  client_id: string          // an optional non-machine-specific identifier for this client
  required_acks: int8        // the number of acknowledgements required from the brokers before a response can be made
  ack_timeout: int32         // the time in ms to wait for acknowledgement from replicas
  data: null                 // no data is needed for the consult request
}
ConsultResponse
Code Block

{
  size: int32                                        // the size of this response
  correlation_id: int32                              // the id set by the client, returned untouched
  version_id: int16                                  // the version of this response
  error: int16                                       // the id of the (request-level) error, if any occurred
  data: {coordinatorId: int16, [<broker_struct>]}    // the actual data requested (in the same format as defined for the produce request)
}

broker_struct =>
{
  id:
  creatorId:
  host:
  port:
}
Coordinator Request Format (fields in wire order):

  • size (int32): The size of this request (not counting this 4-byte size). This is mandatory and required by the network layer.
  • request_type_id (int16): An id for the API being called (e.g., PingRequest, StartRequest, StopRequest, etc.).
  • version_id (int16): A version number for the request format. This number starts at 0 and increases every time a protocol change is made for this API.
  • client_id (string): A user-defined identifier for the client, used for logging and statistics purposes.

Coordinator Response Format (fields in wire order):

  • size (int32): The size of this response (not counting this 4-byte size). This is mandatory and required by the network layer.
  • request_type_id (int16): An id for the API being called (e.g., PingRequest, StartRequest, StopRequest, etc.).
  • version_id (int16): A version number for the response format. This number starts at 0 and increases every time a protocol change is made for this API.
  • client_id (string): A user-defined identifier for the client, used for logging and statistics purposes.

PingRequest
Code Block

{

}
PingResponse
Code Block

{

}
StopRequest
Code Block

{

}
StopResponse
Code Block

{

}
StartRequest
Code Block

{

}
StartResponse
Code Block

{

}

Open Problems

  1. If all the brokers listed in the properties file as known brokers are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error, since the migration of the broker cluster should be incremental and the consumer properties file should be kept up to date.
  2. The sizes of the rebalance thread pool and the heartbeating thread pool must be pre-specified, while the number of consumers can grow during operation. Hence each thread must handle more and more consumers as new consumers are added, which will increase the CPU burden.
  3. Since consumers no longer register themselves in Zookeeper, when a new coordinator stands up it needs to wait for all the consumers to re-connect to it instead of reading the consumer info from ZK; this may increase the latency of the coordinator failover process.