...

The

...

following

...

is

...

a

...

draft

...

design

...

that

...

uses

...

a

...

controller

...

for

...

leader

...

election

...

and

...

other

...

admin

...

related

...

tasks.

...

Major changes compared with the v2 proposal:
  • Leadership changes are now made by a controller.
  • The controller detects broker failures and elects a new leader for each affected partition.
  • Each leadership change is communicated by the controller to each affected broker.

Overview:

One of the brokers is elected as the controller for the whole cluster. It will be responsible for:

  1. Leadership change of a partition (each leader can independently update ISR)
  2. New topics; deleted topics
  3. Replica reassignment

After the controller makes a decision, it publishes the decision permanently in ZK and also communicates it to the affected brokers through ZKQueue. The published decisions are the source of truth: they are used by clients (for request routing) and by brokers during startup (to bring all replicas assigned to a broker to the right state). After a broker is started, it picks up new decisions made by the controller from ZKQueue.

Potential benefits:

  1. Easier debugging since leadership changes are made in a central place.
  2. ZK reads/writes needed for leadership changes can be batched (also easier to exploit ZK multi) and thus reduce end-to-end latency during failover.
  3. Fewer ZK watchers.

Potential downside:

  1. Need controller failover.

Paths:

  1. Stores the information of all live brokers.
    Code Block
    /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin) 

  2. Stores the replication assignment for all partitions in a topic. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker; therefore, the broker id can be used as the replica id.
    Code Block
    /brokers/topics/[topic] --> {part1: [broker1, broker2], part2: [broker2, broker3] ...}  (created by admin)

  3. Stores the leader and ISR of a partition.
    Code Block
    /brokers/topics/[topic]/[partition_id]/leaderAndISR --> {leader: broker_id, ISR: {broker1, broker2}} (updated by controller or current leader)

  4. This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
    Code Block
    /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin)


  5. ZKQueue: used to communicate state change information from the controller to each broker (a sketch of how such a queue could work follows the list).
    Code Block
    /brokers/state/[broker_id]/[i] --> { state change requests ... } (created by controller)
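
To make the ZKQueue mechanics concrete, here is a minimal sketch in Java using the plain ZooKeeper client. The sequential item naming under /brokers/state/[broker_id] and the string payloads are illustrative assumptions, not a fixed part of this design.

Code Block
// Sketch only: illustrates the ZKQueue idea with the stock ZooKeeper Java client.
// The item-naming scheme and payload format are assumptions based on the path above.
import org.apache.zookeeper.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class ZkQueueSketch {
    private final ZooKeeper zk;

    public ZkQueueSketch(ZooKeeper zk) { this.zk = zk; }

    // Controller side: append one state change request to a broker's queue.
    // PERSISTENT_SEQUENTIAL children give the monotonically increasing [i] suffix.
    public String publish(int brokerId, String stateChangeJson) throws Exception {
        String queuePath = "/brokers/state/" + brokerId;
        return zk.create(queuePath + "/item-",
                stateChangeJson.getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);
    }

    // Broker side: read all pending items in order and delete each one once processed.
    public void drain(int brokerId, Watcher queueWatcher) throws Exception {
        String queuePath = "/brokers/state/" + brokerId;
        List<String> items = zk.getChildren(queuePath, queueWatcher); // also re-registers the watch
        Collections.sort(items);                                      // process in sequence order
        for (String item : items) {
            byte[] data = zk.getData(queuePath + "/" + item, false, null);
            // ... apply the state change in "data" (become leader/follower, stop replica, etc.) ...
            zk.delete(queuePath + "/" + item, -1);                    // -1 = any version
        }
    }
}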

A. Failover during broker failure

Controller watches child changes of the /brokers/ids path. When the watcher gets triggered, it calls on_broker_change().

Code Block
on_broker_change():
The controller keeps in memory, for every partition, its leader and AR.
1. call change_leaders() on the current list of partitions

change_leaders():
Input: a list of partitions and their leader, AR
1. Read the current live broker list
2. Determine the set of partitions whose leader is not in the live broker list.
3. For each such partition P
3.1 Read the current ISR of P from ZK
3.2 Determine the new leader and the new ISR of P:
    If ISR has at least 1 broker in the live broker list, select one of those brokers as the new leader. The new ISR includes all brokers in the current ISR that are alive.
    Otherwise, select one of the live brokers in AR as the new leader and set that broker as the new ISR (potential data loss in this case).
    Question A.1, what happens if none of the brokers in AR is alive?
4. For each such partition P, write the new leader and ISR in /brokers/topics/[topic]/[partition_id]/leaderAndISR
5. Publish the new leader/ISR for each affected partition to the ZKQueue of the affected brokers. For efficiency, the controller can write the decision for all affected partitions in 1 path in ZKQueue.
(Ideally we want to use ZK multi to conditionally write 4 and 5 in 1 transaction for better latency and correctness.)
Question A.2, should the controller publish decisions to the brokers that are currently down? If not, how do we guarantee that no decision changes are lost?
Question A.3, is broker down the only failure scenario that we worry about? Do we worry about leader failure at individual partition level?
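
To illustrate how this trigger could be wired up, below is a minimal Java sketch using the ZooKeeper client. ZooKeeper watches are one-shot, so the watch on /brokers/ids is re-registered every time it fires; onBrokerChange() is a hypothetical stand-in for the on_broker_change()/change_leaders() pseudocode above.

Code Block
// Sketch only: a self-renewing children watch on /brokers/ids.
import org.apache.zookeeper.*;
import java.util.List;

public class BrokerFailureWatcher implements Watcher {
    private final ZooKeeper zk;

    public BrokerFailureWatcher(ZooKeeper zk) { this.zk = zk; }

    // Register (or re-register) the watch and return the current live broker ids.
    public List<String> watchLiveBrokers() throws KeeperException, InterruptedException {
        return zk.getChildren("/brokers/ids", this);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            try {
                List<String> liveBrokers = watchLiveBrokers(); // one-shot watch: re-register
                onBrokerChange(liveBrokers);
            } catch (Exception e) {
                // A real controller would retry or resign here.
            }
        }
    }

    private void onBrokerChange(List<String> liveBrokers) {
        // Compare liveBrokers against the in-memory leader/AR map and run change_leaders()
        // for every partition whose leader is no longer alive (steps 1-5 above).
    }
}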

B. Broker acts on leadership change

Each broker registers a child watcher on its ZKQueue. When the watcher gets triggered, it calls on_leader_assignment_change().

Code Block
on_leader_assignment_change():
1. Read from this broker's ZKQueue the list of partitions whose leader/ISR has changed.
2. For each such partition P
2.1 If this broker is the new leader,
2.1.1 stop the fetcher to the current leader
2.1.2 become the leader (this is critical: the leader can only update the ISR in the future if the ISR hasn't been changed by the controller)
2.2 If this broker is following a new leader
2.2.1 stop the fetcher to the current leader
2.2.2 become a follower
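
The per-partition dispatch in steps 2.1 and 2.2 could look roughly like the sketch below. The LeaderAndIsr record and the replica-manager methods are hypothetical placeholders; the point is only the leader-vs-follower decision based on the published leader id.

Code Block
// Sketch only: broker-side reaction to one leader/ISR decision read from its ZKQueue.
record LeaderAndIsr(String topic, int partition, int leader, java.util.List<Integer> isr) {}

class LeadershipChangeHandler {
    private final int thisBrokerId;

    LeadershipChangeHandler(int thisBrokerId) { this.thisBrokerId = thisBrokerId; }

    void onLeaderAssignmentChange(LeaderAndIsr decision) {
        stopFetcher(decision.topic(), decision.partition()); // 2.1.1 / 2.2.1: stop fetching from the old leader
        if (decision.leader() == thisBrokerId) {
            becomeLeader(decision);                          // 2.1.2: start serving as the leader
        } else {
            becomeFollower(decision);                        // 2.2.2: start fetching from the new leader
        }
    }

    private void stopFetcher(String topic, int partition) { /* elided */ }
    private void becomeLeader(LeaderAndIsr decision) { /* elided */ }
    private void becomeFollower(LeaderAndIsr decision) { /* elided */ }
}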

C. Creating/deleting topics

Controller watches child changes of /brokers/topics. When the watcher gets triggered, it calls on_topic_change().

Code Block
on_topic_change():
The controller keeps in memory a list of existing topics.
1. If a new topic is created, read the topic's replica assignment.
1.1. call init_leaders() on all newly created partitions.
2. If a topic is deleted, send the stopReplica state change to all affected brokers.

Question C1. How to deal with repeated topic deletion/creation? A broker can be down for a long time, during which a topic can be deleted and recreated. When the broker comes up, the topic it has locally may not match the content of the newly created topic. There are a couple of ways of dealing with this. (1) Simply let the broker with the outdated topic become a follower and figure out the right offset from which it can sync up with the leader. (2) Keep a version ID for each topic/partition. Delete a partition on broker startup if the partition version is outdated.

Code Block
init_leaders():
Input: a list of partitions and their AR
0. Read the current live broker list
1. For each partition P
1.1 Select one of the live brokers in AR as the new leader and set all live brokers in AR as the new ISR.
2. For each such partition P, write the new leader and ISR in /brokers/topics/[topic]/[partition_id]/leaderAndISR
3. Publish the new leader/ISR for each affected partition to the ZKQueue of the affected brokers. Again, for efficiency, the controller can write the decision for all affected partitions in 1 path in ZKQueue.

(Ideally we want to use ZK multi to conditionally write 2 and 3 in 1 transaction for better latency and correctness.)
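
Both change_leaders() and init_leaders() call out ZK multi. A minimal Java sketch of such a batched, conditional write with the ZooKeeper 3.4+ client is shown below; the JSON payloads and the ZKQueue item naming are placeholders, and error handling is omitted.

Code Block
// Sketch only: one multi() that conditionally updates leaderAndISR and appends the
// corresponding decision to a broker's ZKQueue, so both happen or neither does.
import org.apache.zookeeper.*;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class LeaderAndIsrBatchWriter {
    private final ZooKeeper zk;

    public LeaderAndIsrBatchWriter(ZooKeeper zk) { this.zk = zk; }

    public void commitDecision(String topic, int partition, int expectedVersion,
                               String newLeaderAndIsrJson, int brokerId, String stateChangeJson)
            throws KeeperException, InterruptedException {
        String leaderAndIsrPath = "/brokers/topics/" + topic + "/" + partition + "/leaderAndISR";

        List<Op> ops = Arrays.asList(
                // Conditional write: fails with BadVersionException if anyone else
                // (e.g. the current leader shrinking the ISR) changed the node meanwhile.
                Op.setData(leaderAndIsrPath,
                        newLeaderAndIsrJson.getBytes(StandardCharsets.UTF_8),
                        expectedVersion),
                // Publish the same decision to the affected broker's ZKQueue.
                Op.create("/brokers/state/" + brokerId + "/item-",
                        stateChangeJson.getBytes(StandardCharsets.UTF_8),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL));

        zk.multi(ops); // all-or-nothing
    }
}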

D. Handling controller failure

Each broker sets an exists watch on /controller (ephemeral). When the watcher gets triggered, it calls on_controller_failover(). Basically, the new controller needs to inform each of the brokers of all decisions that it has made in the history (since it's not sure whether any decision was lost during the controller failover). A broker can ignore decisions that it has followed already.

Code Block
on_controller_failover():
1. create /controller --> this broker id
2. if successful
2.1 write all published decisions (leader/ISR for each partition) to the ZKQueue of every broker.
2.2 change_leaders()
2.3 for the list of partitions without a leader, call init_leaders().
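
A minimal Java sketch of the election step and the retry on controller death follows; onControllerFailover() stands in for steps 2.1-2.3 above.

Code Block
// Sketch only: controller election via an ephemeral /controller znode. Whoever creates
// the node first becomes the controller; everyone else watches it and re-runs the
// election when it disappears.
import org.apache.zookeeper.*;
import java.nio.charset.StandardCharsets;

public class ControllerElection implements Watcher {
    private final ZooKeeper zk;
    private final int brokerId;

    public ControllerElection(ZooKeeper zk, int brokerId) {
        this.zk = zk;
        this.brokerId = brokerId;
    }

    public void tryBecomeController() throws KeeperException, InterruptedException {
        try {
            zk.create("/controller",
                    String.valueOf(brokerId).getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL);      // step 1: create /controller --> this broker id
            onControllerFailover();             // step 2: we won the election
        } catch (KeeperException.NodeExistsException e) {
            zk.exists("/controller", this);     // lost: watch for the current controller to go away
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            try {
                tryBecomeController();          // previous controller died; run the election again
            } catch (Exception ignored) {
            }
        }
    }

    private void onControllerFailover() {
        // Steps 2.1-2.3 above: replay published decisions, change_leaders(), init_leaders().
    }
}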

E. Broker startup

When a broker starts up, it calls on_broker_startup(). Basically, the broker needs to first read all published decisions about each partition.

Code Block
on_broker_startup():
1. read all /brokers/topics/[topic]
2. read /broker/leader_assignment
3. for each replica assigned to this broker
3.1 start replica
3.2 if this broker is the leader of this partition, become leader. (shouldn't happen in general)
3.3 if this broker is a follower of this partition, become follower.
4. subscribe to changes in ZKQueue for this broker.
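
A Java sketch of how a starting broker could read the published leader/ISR decisions (steps 1 and 3) before subscribing to its ZKQueue; parsing of the payloads is elided since their exact format is not fixed here.

Code Block
// Sketch only: walk /brokers/topics and read each partition's leaderAndISR decision.
import org.apache.zookeeper.ZooKeeper;
import java.util.List;

public class BrokerStartup {
    private final ZooKeeper zk;

    public BrokerStartup(ZooKeeper zk) { this.zk = zk; }

    public void readPublishedDecisions() throws Exception {
        List<String> topics = zk.getChildren("/brokers/topics", false);
        for (String topic : topics) {
            List<String> partitions = zk.getChildren("/brokers/topics/" + topic, false);
            for (String partition : partitions) {
                byte[] leaderAndIsr = zk.getData(
                        "/brokers/topics/" + topic + "/" + partition + "/leaderAndISR",
                        false, null);
                // Parse leaderAndIsr, start the replica, and become leader or follower
                // depending on whether this broker id matches the published leader (step 3).
            }
        }
    }
}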

F. Replica reassignment

Controller watches child changes in /brokers/partitions_reassigned/[topic]. When the watcher gets triggered, it calls on_partitions_reassigned().

Code Block
on_partitions_reassigned():
1. read /brokers/partitions_reassigned/[topic]
2. issue the StartReplica command to the right brokers.
3. periodically check the ISR of affected partitions
3.1 if ISR == AR+RAR, update the ISR (??? need to do that in the individual ISR and leader path), and send the StopReplica command to the right brokers.
3.2 update /brokers/topics/[topic] to change AR to the new replica set
3.3 delete /brokers/partitions_reassigned/[topic]
(An alternative approach to 3 is to set watches on ISR and do the check only when ISR is changed.)
4. inform the current leader of the ISR change by writing an ISRState change to ZKQueue
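
Step 3's periodic check could be as simple as the sketch below: compare the ISR read from ZK against the union of the old and new replica sets and finish the reassignment once they match. The helper names are hypothetical and the ISR read/parse is left abstract.

Code Block
// Sketch only: periodic completion check for a reassigned partition (step 3 above).
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ReassignmentChecker {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void scheduleCheck(String topic, int partition,
                              Set<Integer> oldReplicas, Set<Integer> newReplicas) {
        scheduler.scheduleAtFixedRate(() -> {
            Set<Integer> target = new HashSet<>(oldReplicas); // AR + RAR
            target.addAll(newReplicas);
            if (isrOf(topic, partition).equals(target)) {
                finishReassignment(topic, partition, newReplicas); // steps 3.1-3.3
            }
        }, 10, 10, TimeUnit.SECONDS);
    }

    private Set<Integer> isrOf(String topic, int partition) {
        // Read /brokers/topics/[topic]/[partition]/leaderAndISR and parse the ISR (elided).
        return new HashSet<>();
    }

    private void finishReassignment(String topic, int partition, Set<Integer> newReplicas) {
        // 3.1 send StopReplica to brokers that are no longer replicas
        // 3.2 update /brokers/topics/[topic] with the new replica set
        // 3.3 delete /brokers/partitions_reassigned/[topic]/[partition]
    }
}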

Discussions:

1. End-to-end latency during a broker failure:

  1. broker shutdown (after closing the socket server, need to close the request handler, close the log)
  2. broker watcher gets triggered in the controller
  3. make the leadership change and publish the new leader/ISR in ZK (1 ZK write per affected partition)
  4. inform each broker of the leadership change by writing to its ZKQueue (1 ZK write per broker)
  5. leader waits for followers in ISR to connect (Kafka RPC)
  6. follower truncates its log first (a potential I/O) and then starts fetching from the leader

In the critical path, the most time consuming operation is step 3, where we need to write 1 ZK path per partition. Assuming that during a broker failover we need to change the leader for 10K partitions and each ZK write takes 2ms, this could take 20 secs. One possibility is to use the multi() support in ZK 3.4 to batch those writes in 1 ZK operation.

2. ZKQueue: Communicating between the controller and the brokers via ZK is not efficient. Each communication requires 2 ZK writes (each costs roughly 2 RPCs), 1 watcher firing and 1 ZK read. These add up to roughly 6 RPCs per communication. An alternative is to implement an admin RPC in the broker for direct communication between the controller and the brokers. Then each communication costs only 1 RPC. The admin RPC could specify a timeout, during which it expects the admin command to be completed.

3. Dealing with multiple leaders in transition: Occasionally, it's possible for multiple brokers to simultaneously assume that they are the leader of a partition. For example, broker A is the initial leader of a partition and the ISR of that partition is {A,B,C}. Then, broker A goes into GC and loses its ZK registration. The controller assumes that broker A is dead, assigns the leader to broker B and sets the new ISR in ZK to {B,C}. Broker B becomes the leader and, at the same time, broker A wakes up from GC but hasn't acted on the leadership change decision made by the controller. Now, both broker A and broker B think they are the leader. It would be bad if we allowed both broker A and broker B to commit new messages, since the data among replicas would be out of sync. In fact, our current design prevents broker A from committing any new message in this situation. Here is why.

The claim is that after broker B becomes the new leader, broker A can no longer commit new messages. For broker A to commit a message m, it needs every replica in ISR to receive m. At the moment, broker A still thinks the ISR is {A,B,C} (its local copy, although the ISR in ZK has changed). Broker B will never receive message m. This is because, by becoming the new leader, it must have first stopped fetching data from the previous leader. Therefore broker A can't commit message m without shrinking the ISR first. In order to shrink the ISR, broker A has to write the new ISR to ZK. However, it can't do that, because it will realize that the leaderAndISR node in ZK is not at the version that it assumes it to be (since the node has already been changed by the controller). At this moment, broker A will realize that it's no longer the leader.
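
The version check that trips up broker A is just a conditional write on the leaderAndISR znode. Below is a minimal Java sketch of a leader attempting to shrink the ISR and discovering it has been deposed; the JSON payload is a placeholder.

Code Block
// Sketch only: a leader shrinks the ISR with a conditional setData(). If the controller
// has already rewritten leaderAndISR (e.g. elected a new leader), the cached version no
// longer matches, the write fails, and this broker learns it is no longer the leader.
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import java.nio.charset.StandardCharsets;

public class IsrShrinker {
    private final ZooKeeper zk;
    private int cachedLeaderAndIsrVersion;   // znode version from the last read

    public IsrShrinker(ZooKeeper zk, int cachedVersion) {
        this.zk = zk;
        this.cachedLeaderAndIsrVersion = cachedVersion;
    }

    /** Returns true if the shrink succeeded, false if this broker has been deposed. */
    public boolean tryShrinkIsr(String topic, int partition, String newLeaderAndIsrJson)
            throws KeeperException, InterruptedException {
        String path = "/brokers/topics/" + topic + "/" + partition + "/leaderAndISR";
        try {
            // Conditional write: only succeeds if no one (i.e. the controller) changed the node.
            cachedLeaderAndIsrVersion = zk.setData(path,
                    newLeaderAndIsrJson.getBytes(StandardCharsets.UTF_8),
                    cachedLeaderAndIsrVersion).getVersion();
            return true;
        } catch (KeeperException.BadVersionException e) {
            // The controller moved leadership; stop acting as the leader for this partition.
            return false;
        }
    }
}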