Kafka Replication High-level Design
The purpose of adding replication in Kafka is for stronger durability and higher availability. We want to guarantee that any successfully published message will not be lost and can be consumed, even when there are server failures. Such failures can be caused by machine error, program error, or more commonly, software upgrades. We have the following high-level goals:
- Configurable durability guarantees: For example, an application with critical data can choose stronger durability, with increased write latency, and another application generating a large volume of soft-state data can choose weaker durability but better write response time.
- Automated replica management: We want to simplify the assignment of replicas to broker servers and be able to grow the cluster incrementally.
There are mainly two problems that we need to solve here:
- How to assign replicas of a partition to broker servers evenly?
- For a given partition, how to propagate every message to all replicas?
Replica placements
Initial placement
(only create topic, make decision based on current live brokers (manual create topic command); rebalance command (recomputed assignment based on new cluster topology, every partition is moved) and make the move)
We first use an administrative API to create the initial set of brokers:
```
create cluster with brokers broker-0, broker-1, broker-2
```
We then use another administrative API to create a new topic:
```
create topic topicX with 100 partitions
```
After that, the following information will be registered in Zookeeper:
- a list of brokers;
- a list of topics and for each topic, a list of partitions.
For better load balancing, we want to over-partition a topic. Typically, there will be many more partitions than servers. For each topic, we want to divide the partitions evenly among all the brokers. We sort the list of brokers and the list of partitions. If there are n brokers, we assign the ith partition to the (i mod n)th broker. The first replica of this partition will reside on this assigned broker and is referred to as the preferred replica of this partition. We want to place the other replicas in such a way that if a broker is down, its load is spread evenly to all surviving brokers, instead of to a single one. In order to achieve that, suppose there are m partitions assigned to a broker i. The jth replica of partition k will be assigned to broker (i + j + k) mod n. The following figure illustrates the replica assignments for partitions p0 to p14 on brokers broker-0 to broker-4. In this example, if broker-0 goes down, partitions p0, p1, and p2 can be served from all remaining 4 brokers. We store the information about the replica assignment for each partition in Zookeeper.
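The placement rule above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `assign_replicas` is hypothetical, and k is read as the position of a partition among the partitions whose preferred replica is on broker i. Because the sketch follows the (i mod n) rule literally, the grouping of partitions per broker may differ from the figure.

```python
from collections import defaultdict


def assign_replicas(num_partitions, num_brokers, replication_factor):
    """Return {partition: [broker, ...]}; the first broker holds the preferred replica."""
    # Assumption: k is the index of a partition among those assigned to the same broker i.
    local_index = defaultdict(int)
    assignment = {}
    for p in range(num_partitions):
        i = p % num_brokers            # preferred broker: the (p mod n)th broker
        k = local_index[i]
        local_index[i] += 1
        replicas = [i]
        for j in range(1, replication_factor):
            # jth replica of the kth partition on broker i goes to (i + j + k) mod n.
            # Note: the formula as stated can land back on broker i when j + k is a
            # multiple of n; this sketch does not handle that case.
            replicas.append((i + j + k) % num_brokers)
        assignment[p] = replicas
    return assignment


assignment = assign_replicas(num_partitions=15, num_brokers=5, replication_factor=3)
# Brokers that would take over if broker 0 went down.
followers_of_broker_0 = sorted(
    {b for reps in assignment.values() if reps[0] == 0 for b in reps[1:]}
)
print(followers_of_broker_0)
```

With 15 partitions, 5 brokers and 3 replicas, the followers of broker 0's partitions cover every other broker, which is the load-spreading property the paragraph asks for.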
Incrementally add brokers online
We'd like to be able to incrementally grow the set of brokers using an administrative command like the following.
```
alter cluster add brokers broker-3, broker-4
```
When a new broker is added, we will automatically move some partitions from existing brokers to the new one. Our goal is to minimize the amount of data movement while maintaining a balanced load on each broker. We use a standalone coordinator process to do the rebalance and the algorithm is given below.
Data replication
We'd like to allow a client to choose either asynchronous or synchronous replication. In the former case, a message to be published is acknowledged as soon as it reaches 1 replica. In the latter case, we will make our best effort to make sure that a message is only acknowledged after it reaches multiple replicas. When a client tries to publish a message to a partition of a topic, we need to propagate the message to all replicas. We have to decide:
- how to propagate a message;
- how many replicas receive the message before we acknowledge to the client;
- what happens when a replica goes down;
- what happens when a failed replica comes back again.
We introduce existing replication strategies in Section 2.1. We then describe our synchronous and asynchronous replication in Section 2.2 and Section 2.3, respectively.
Related work
There are two common strategies for keeping replicas in sync, primary-backup replication and quorum-based replication. In both cases, one replica is designated as the leader and the rest of the replicas are called followers. All write requests go through the leader and the leader propagates the writes to the followers.
In primary-backup replication, the leader waits until the write completes on every replica in the group before acknowledging the client. If one of the replicas is down, the leader drops it from the current group and continues to write to the remaining replicas. A failed replica is allowed to rejoin the group if it comes back and catches up with the leader. With f replicas, primary-backup replication can tolerate f-1 failures.
In the quorum-based approach, the leader waits until a write completes on a majority of the replicas. The size of the replica group doesn't change even when some replicas are down. If there are 2f+1 replicas, quorum-based replication can tolerate f replica failures. If the leader fails, it needs at least f+1 replicas to elect a new leader.
There are tradeoffs between the 2 approaches:
- The quorum-based approach has better write latency than the primary-backup one. A delay (e.g., long GC) in any replica increases the write latency in the latter, but not the former.
- Given the same number of replicas, the primary-backup approach tolerates more concurrent failures.
- A replication factor of 2 works well with the primary-backup approach. In quorum-based replication, both replicas have to be up for the system to be available.
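The failure-tolerance arithmetic behind these tradeoffs can be checked with a small sketch. The two function names are hypothetical and exist only for this illustration; the formulas are the ones stated above (all but one replica for primary-backup, a minority for quorum).

```python
def primary_backup_tolerated_failures(replicas):
    # Any single surviving replica still holds every acknowledged write.
    return replicas - 1


def quorum_tolerated_failures(replicas):
    # A majority must stay up, so with 2f+1 replicas at most f may fail.
    return (replicas - 1) // 2


for n in (2, 3, 5):
    print(n, primary_backup_tolerated_failures(n), quorum_tolerated_failures(n))
```

For a replication factor of 2, the quorum formula yields zero tolerated failures, which matches the observation that both replicas have to be up for a quorum-based system to stay available.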
We choose the primary-backup replication in Kafka since it tolerates more failures and works well with 2 replicas. A hiccup can happen when a replica is down or becomes slow. However, those are relatively rare events and the hiccup time can be reduced by tuning various timeout parameters.
Synchronous replication
Our synchronous replication follows the typical primary-backup approach. Each partition has n replicas and can tolerate n-1 replica failures. One of the replicas is elected as the leader and the rest of the replicas are followers. The leader maintains a set of in-sync replicas (ISR): the set of replicas that have fully caught up with the leader. For each partition, we store in Zookeeper the current leader and the current ISR.
Each replica stores messages in a local log and maintains a few important offset positions in the log (depicted in Figure 1). The log end offset (LEO) represents the tail of the log. The high watermark (HW) is the offset of the last committed message. Each log is periodically synced to disks. Data before the flushed offset is guaranteed to be persisted on disks. As we will see, the flush offset can be before or after HW.
Writes
To publish a message to a partition, the client first finds the leader of the partition from Zookeeper and sends the message to the leader. The leader writes the message to its local log. Each follower constantly pulls new messages from the leader using a single socket channel. That way, the follower receives all messages in the same order as written in the leader. The follower writes each received message to its own log and sends an acknowledgment back to the leader. Once the leader receives the acknowledgment from all replicas in ISR, the message is committed. The leader advances the HW and sends an acknowledgment to the client.
For better performance, each follower sends an acknowledgment after the message is written to memory. So, for each committed message, we guarantee that the message is stored in multiple replicas in memory. However, there is no guarantee that any replica has persisted the committed message to disk. Given that correlated failures are relatively rare, this approach gives us a good balance between response time and durability. In the future, we may consider adding options that provide even stronger guarantees.
The leader also periodically broadcasts the HW to all followers. The broadcasting can be piggybacked on the return value of the fetch requests from the followers. From time to time, each replica checkpoints its HW to its disk.
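The write path can be illustrated with a small in-memory model. This is a sketch under simplifying assumptions, not the Kafka implementation: the class and function names are hypothetical, logs are plain Python lists, a completed fetch doubles as the follower's acknowledgment, and the HW is modeled as the offset of the last committed message (-1 when nothing is committed).

```python
class Replica:
    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.log = []      # in-memory log; list index is the message offset
        self.hw = -1       # offset of the last committed message

    @property
    def leo(self):
        return len(self.log)   # next offset to be written


class Leader(Replica):
    def __init__(self, broker_id, followers):
        super().__init__(broker_id)
        self.isr = list(followers)

    def publish(self, message):
        self.log.append(message)
        return self.leo - 1    # offset assigned to the message

    def on_ack(self):
        # A message is committed once the leader and every ISR member have it.
        committed = min([self.leo] + [f.leo for f in self.isr]) - 1
        self.hw = max(self.hw, committed)

    def is_committed(self, offset):
        return offset <= self.hw


def fetch(follower, leader):
    """One pull by a follower: copy missing messages, learn the HW, acknowledge."""
    follower.log.extend(leader.log[follower.leo:])
    follower.hw = min(leader.hw, follower.leo - 1)   # HW piggybacked on the response
    leader.on_ack()


f1, f2 = Replica(1), Replica(2)
leader = Leader(0, [f1, f2])
offset = leader.publish("m0")
fetch(f1, leader)
committed_after_one_ack = leader.is_committed(offset)
fetch(f2, leader)
committed_after_all_acks = leader.is_committed(offset)
print(committed_after_one_ack, committed_after_all_acks)
```

In this model a follower learns the new HW only on its next fetch, which mirrors the piggybacking described above.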
Reads
For simplicity, reads are always served from the leader. Only messages up to the HW are exposed to the reader.
Failure scenarios
Follower failure
After a configured timeout period, the leader will drop the failed follower from its ISR and writes will continue on the remaining replicas in ISR. If the failed follower comes back, it first truncates its log to the last checkpointed HW. It then starts to catch up on all messages after its HW from the leader. When the follower fully catches up, the leader will add it back to the current ISR.
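A minimal sketch of the recovery step, assuming logs are Python lists indexed by offset and the HW is the offset of the last committed message. The function name `recover_follower` and the sample messages are made up for illustration.

```python
def recover_follower(follower_log, checkpointed_hw, leader_log):
    """Return the follower's log after truncating to its HW and catching up."""
    # Keep offsets 0..checkpointed_hw; later entries may never have been committed.
    truncated = follower_log[:checkpointed_hw + 1]
    # Re-fetch everything after the HW from the leader.
    return truncated + leader_log[len(truncated):]


leader_log = ["m0", "m1", "m2-new", "m3"]
stale_follower_log = ["m0", "m1", "m2-old"]   # m2-old was never committed
recovered = recover_follower(stale_follower_log, checkpointed_hw=1, leader_log=leader_log)
print(recovered == leader_log)
```

Truncating first is what discards the uncommitted "m2-old" entry, so the follower ends up with the same sequence as the leader rather than a diverged log.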
Leader failure
There are 3 cases of leader failure to consider:
- The leader crashes before writing the messages to its local log. In this case, the client will time out and resend the message to the new leader.
- The leader crashes after writing the messages to its local log, but before sending the response back to the client.
  - Atomicity has to be guaranteed: either all the replicas wrote the messages or none of them did.
  - The client will retry sending the message. In this scenario, the system should ideally ensure that the messages are not written twice. It is possible that one of the replicas had already written the message to its local log and committed it, and is then elected as the new leader.
- The leader crashes after sending the response. In this case, a new leader will be elected and start receiving requests.
When this happens, we need to perform the following steps to elect a new leader.
- Each surviving replica in ISR registers itself in Zookeeper.
- The replica that registers first becomes the new leader. The new leader chooses its LEO as the new HW.
- Each replica registers a listener in Zookeeper so that it will be informed of any leader change. Every time a replica is notified about a new leader:
  - If the replica is not the new leader (it must be a follower), it truncates its log to its HW and then starts to catch up from the new leader.
- The leader waits until all surviving replicas in ISR have caught up or a configured time has passed. The leader writes the current ISR to Zookeeper and opens itself up for both reads and writes.
(Note, during the initial startup when ISR is empty, any replica can become the leader.)
Asynchronous replication
To support asynchronous replication, the leader can acknowledge the client as soon as it finishes writing the message to its local log. The only caveat is that during the catchup phase, the follower may have to truncate the data before its HW. Since the replication is asynchronous, there is no guarantee that a committed message can survive any broker failure.
Open Issues
- How can atomicity be guaranteed in the 2nd type of leader failure?
- If the brokers are in multiple racks, how to guarantee that at least one replica goes to a different rack?
Kafka Replication Detailed Design
The document describes key data structures and algorithms in Kafka replication.
Paths stored in Zookeeper
Notation: When an element in a path is denoted [xyz], that means that the value of xyz is not fixed and there is in fact a znode for each possible value of xyz. For example, /topics/[topic] would be a directory named /topics containing a directory for each topic name. An arrow --> is used to indicate the contents of a znode. For example, /hello --> world would indicate a znode /hello containing the value "world". A path is persistent unless it's marked as ephemeral.
We store the following paths in Zookeeper:
- Stores the information of all live brokers.
  /brokers/ids/[broker_id] --> host:port (ephemeral; created by admin)
- Stores for each partition, a list of the currently assigned replicas. For each replica, we store the id of the broker to which the replica is assigned. The first replica is the preferred replica. Note that for a given partition, there is at most 1 replica on a broker. Therefore, the broker id can be used as the replica id.
  /brokers/topics/[topic]/[partition_id]/replicas --> {broker_id …} (created by admin)
- Stores the id of the replica that's the current leader of this partition.
  /brokers/topics/[topic]/[partition_id]/leader --> broker_id (ephemeral) (created by leader)
- Stores the id of the set of replicas that are in-sync with the leader.
  /brokers/topics/[topic]/[partition_id]/ISR --> {broker_id, …} (created by leader)
- This path is used when we want to reassign some partitions to a different set of brokers. For each partition to be reassigned, it stores a list of new replicas and their corresponding assigned brokers. This path is created by an administrative process and is automatically removed once the partition has been moved successfully.
  /brokers/partitions_reassigned/[topic]/[partition_id] --> {broker_id …} (created by admin)
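To make the path layout concrete, the following sketch builds the paths listed above with plain string formatting. The helper names are hypothetical, and the topic name, partition id, broker ids and host:port value are made-up sample data; no Zookeeper client is involved.

```python
def broker_path(broker_id):
    """Znode holding host:port for a live broker."""
    return f"/brokers/ids/{broker_id}"


def partition_path(topic, partition_id, leaf):
    """Znode for one of the per-partition entries: replicas, leader or ISR."""
    if leaf not in ("replicas", "leader", "ISR"):
        raise ValueError(f"unknown per-partition znode: {leaf}")
    return f"/brokers/topics/{topic}/{partition_id}/{leaf}"


def reassignment_path(topic, partition_id):
    """Znode listing the new replicas of a partition being reassigned."""
    return f"/brokers/partitions_reassigned/{topic}/{partition_id}"


# Sample contents for one partition, using the "path --> value" notation.
znodes = {
    broker_path(0): "host0:9092",
    partition_path("topicX", 7, "replicas"): [0, 1, 2],
    partition_path("topicX", 7, "leader"): 0,
    partition_path("topicX", 7, "ISR"): [0, 1],
}
for path in sorted(znodes):
    print(path, "-->", znodes[path])
```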
Key data structures
Every broker stores a list of partitions and replicas assigned to it. The current leader of a partition further maintains 4 sets: AR, ISR, CUR and RAR, which correspond to the set of replicas that are assigned to the partition, in-sync with the leader, catching up with the leader, and being reassigned to other brokers. Normally, ISR ⊆ AR and AR = ISR + CUR. The leader of a partition maintains a commitQ and uses it to buffer all produce requests to be committed. For each replica assigned to a broker, the broker periodically stores its HW in a checkpoint file.
```
Replica {                // a replica of a partition
  broker_id : int
  partition : Partition
  isLocal   : Boolean    // is this replica local to this broker
  log       : Log        // local log associated with this replica
  hw        : long       // offset of the last committed message
  leo       : long       // log end offset
}

Partition {              // a partition in a topic
  topic        : string
  partition_id : int
  leader       : Replica       // the leader replica of this partition
  commitQ      : Queue         // produce requests pending commit at the leader
  AR           : Set[Replica]  // replicas assigned to this partition
  ISR          : Set[Replica]  // In-sync replica set, maintained at the leader
  CUR          : Set[Replica]  // Catch-up replica set, maintained at the leader
  RAR          : Set[Replica]  // Reassigned replica set, maintained at the leader
}
```
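The relationship between the replica sets can be expressed with Python sets. This sketch assumes that "AR = ISR + CUR" means a disjoint union, and it uses broker ids in place of replicas (the document notes that a broker id can serve as the replica id). The function names `check_invariants` and `promote` are hypothetical.

```python
ar = {0, 1, 2}     # replicas assigned to the partition
isr = {0, 1}       # replicas in sync with the leader
cur = ar - isr     # replicas still catching up


def check_invariants(ar, isr, cur):
    """ISR is a subset of AR, and AR is the disjoint union of ISR and CUR."""
    return isr <= ar and (isr | cur) == ar and not (isr & cur)


def promote(replica, isr, cur):
    """Move a caught-up replica from CUR to ISR and return the new sets."""
    if replica not in cur:
        raise ValueError(f"replica {replica} is not in CUR")
    return isr | {replica}, cur - {replica}


invariants_before = check_invariants(ar, isr, cur)
isr, cur = promote(2, isr, cur)
invariants_after = check_invariants(ar, isr, cur)
print(invariants_before, invariants_after, sorted(isr), sorted(cur))
```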
Key algorithms
Zookeeper listeners ONLY on the leader
- Replica-change listener:
  - child change on /brokers/topics (new topic registered)
  - child change on /brokers/topics/[topic] (new partition registered)
  - value change on /brokers/topics/[topic]/[partition_id]/replicas (new replica assigned)
- Partition-reassigned listener:
  - child change on /brokers/partitions_reassigned
  - child change on /brokers/partitions_reassigned/[topic]
Zookeeper listeners on all brokers
- Leader-change listener: value change on /brokers/topics/[topic]/[partition_id]/leader
- State-change listener: child change on /brokers/topics/[topic]/[partition_id]/state/[broker_id]
Configuration parameters
- LeaderElectionWaitTime: controls the maximum amount of time that we wait during leader election.
- KeepInSyncTime: controls the maximum amount of time that a leader waits before dropping a follower from the in-sync replica set.
Broker startup
Each time a broker starts up, it calls brokerStartup() and the algorithms are described below
```
brokerStartup()
{
  register its broker_id in /brokers/ids/[broker_id] in ZK
  get replica info from ZK and compute AR, a list of replicas assigned to this broker
  for each r in AR
  {
    register its broker_id in /brokers/topics/[topic]/[partition_id]/state/[broker_id] in ZK
    register stateChangeListener() on /brokers/topics/[topic]/[partition_id]/state/[broker_id]
  }
}
```
State change listener
Each broker has a ZK path that it listens to for state change requests from the leader
```
stateChangeListener(r) { // listens to state change requests issued by the leader and acts on those
  read next state change request
  if(becomeFollowerRequest) becomeFollower(r)
  if(replicaUnassignedRequest) closeReplica(r)
  if(deleteReplicaRequest) deleteReplica(r)
  if(replicaAssignedRequest) replicaStateChange(r)
}
```
State change operations
Replica state change
This state change is requested by the leader for a new replica assignment
```
replicaStateChange(r: Replica) {
  if( r's log is not already started) {
    do local recovery of r's log
    r.hw = min(last checkpointed HW for r, r.leo)
    register a leader-change listener on partition r.partition.partition_id
  }
  if( a leader does not exist for partition r.partition.partition_id in ZK)
    leaderElection(r)
}
```
Close replica
This state change is requested by the leader when a topic is deleted
```
closeReplica(r: Replica)
{
  stop the fetcher associated with r, if one exists
  close and delete r
}
```
Become follower
This state change is requested by the leader when the leader for a replica changes
```
becomeFollower(r: Replica)
{
  stop the current ReplicaFetcherThread, if any
  truncate the log to r.hw
  start a new ReplicaFetcherThread to the current leader of r, from offset r.leo
  start HW checkpoint thread for r
}
```
Become leader
This state change is done by the new leader
```
becomeLeader(r: Replica, ISR: Set[Replica], AR: Set[Replica])
  stop HW checkpoint thread for r
  r.hw = r.leo // TODO: check if this should actually be r.hw = last checkpointed HW for r
  wait until every live replica in AR catches up (i.e. its leo == r.hw) or a KeepInSyncTime has passed
  r.partition.ISR = the current set of replicas in sync with r
  r.partition.CUR = AR - ISR
  write r.partition.ISR & r.partition.CUR in ZK
  r.partition.RAR = replicas in /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
  r.partition.leader = r // this enables reads/writes to this partition on this broker
  start a commit thread on r.partition
  start HW checkpoint thread for r
  get replica info from ZK and compute CUR, a list of replicas assigned to this broker
  for each r in CUR
    send become follower request to r
```
Leader election
```
leaderElection(r: Replica)
  read the current ISR and AR for r.partition.partition_id from ZK
  if( (r in AR) && (ISR is empty || r in ISR) ) {
    get r.hw for all r in ISR
    if(r.hw < max(hw) for r in ISR)
      wait for PreferredReplicaTime if r is not the preferred replica
    if(successfully write r as the current leader of r.partition in ZK)
      becomeLeader(r, ISR, CUR)
  } else {
    // some other replica will become leader
  }
```
Produce requests
Produce request handler on the leader
```
produceRequestHandler(pr: ProduceRequest)
{
  if( the request partition pr.partition doesn't have leader replica on this broker)
    throw NotLeaderException
  log = pr.partition.leader.log
  append pr.messages to log
  pr.offset = log.LEO
  add pr to pr.partition.commitQ
}
```
HW checkpoint thread for partition p
```
while(true) {
  r = p.leader
  flush r.hw & r.log to disk
}
```
Committer thread for partition p
```
while(true) {
  pr = commitQ.dequeue
  canCommit = false
  while(!canCommit) {
    canCommit = true
    for each r in ISR
      if(!offsetReached(r, pr.offset)) {
        canCommit = false
        break
      }
    if(!canCommit) { // r is the replica that failed to reach pr.offset in time
      p.CUR.add(r)
      p.ISR.delete(r)
      write p.ISR to ZK
    }
  }
  for each c in CUR
    if(c.leo >= pr.offset) {
      p.ISR.add(c); p.CUR.delete(c); write p.ISR to ZK
    }
  checkReassignedReplicas(pr, p.RAR, p.ISR)
  checkLoadBalancing()
  p.leader.hw = pr.offset // increment the HW to indicate that pr is committed
  send ACK to the client that pr is committed
}

offsetReached(r: Replica, offset: Long) {
  if(r.leo becomes equal or larger than offset within KeepInSyncTime) return true
  return false
}

checkLoadBalancing() { // see if we need to switch the leader to the preferred replica
  if(leader replica is not the preferred one & the preferred replica is in ISR) {
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK
    stop this commit thread
    stop the HW checkpoint thread
  }
}

checkReassignedReplicas(pr: ProduceRequest, RAR: Set[Replica], ISR: Set[Replica])
{
  // see if all reassigned replicas have fully caught up, if so, switch to those replicas
  // optimization, do the check periodically
  if(every replica in RAR has its leo >= pr.offset) {
    // newly assigned replicas are in-sync, switch over to the new replicas
    write (RAR + ISR) as the new ISR in ZK // need (RAR + ISR) in case we fail right after here
    update /brokers/topics/[topic]/[partition_id]/replicas in ZK with the new replicas in RAR
    delete /brokers/topics/[topic]/[partition_id]/leader in ZK // triggers leader election
    delete /brokers/partitions_reassigned/[topic]/[partition_id] in ZK
    stop this commit thread
  }
}
```
Follower fetching from leader
A follower keeps sending ReplicaFetchRequests to the leader. The processes at the leader and the follower are described below.
```
ReplicaFetchRequest {
  topic: String
  partition_id: Int
  replica_id: Int
  offset: Long
}

ReplicaFetchResponse {
  hw: Long              // the offset of the last message committed at the leader
  messages: MessageSet  // fetched messages
}
```
At the leader
```
replicaFetch (f: ReplicaFetchRequest) { // handler for ReplicaFetchRequest at leader
  leader = getLeaderReplica(f.topic, f.partition_id)
  if(leader == null) throw NotLeaderException
  response = new ReplicaFetchResponse
  getReplica(f.topic, f.partition_id, f.replica_id).leo = f.offset
  response.messages = fetch messages starting from f.offset from leader.log
  response.hw = leader.hw
  send response back
}
```
At the follower
```
ReplicaFetcherThread for Replica r:
while(true) {
  send ReplicaFetchRequest to leader and get response:ReplicaFetchResponse back
  append response.messages to r's log
  r.hw = response.hw
  advance offset in ReplicaFetchRequest
}
```