The following is a draft design that uses a highly available consumer coordinator on the broker side to handle consumer rebalance. By migrating the rebalance logic from the consumer to the coordinator, we can resolve the consumer split-brain problem and make the consumer client thinner.
Overview:
One of the brokers is elected as the coordinator for all the consumer groups. It will be responsible for:
- Watching for new topics and topic partition changes in Zookeeper
- Accepting socket channel requests from newly added consumers and maintaining those channels
- Watching for consumer failures by periodically pinging consumers via the socket channels
- Rebalancing affected groups in response to topic partition changes (through ZK watchers) and group membership changes (through ping requests)
- Communicating the rebalance results to consumers through the socket channels
When the coordinator decides that a rebalance is needed for a certain group, it first sends the stop-fetcher command to each consumer in the group, and then sends the start-fetcher command to each consumer with its assigned partitions. Each consumer only receives the partition info of the partitions assigned to itself. The coordinator finishes the rebalance by waiting for all the consumers to finish starting their fetchers and respond.
The consumer, upon startup, will consult known brokers for the current coordinator. The known broker list is put in the consumer properties file. The consumer will then try to create a socket channel to the coordinator, and once accepted, it will keep trying to read new requests from the coordinator and respond, but never proactively send requests to the coordinator. When the consumer does not receive any request within a configurable amount of time, it will treat the connection as lost and try to reconnect to the (possibly new) coordinator by restarting the consulting process.
Paths Stored in ZooKeeper
Most of the original ZK path storage is kept, with the addition of the coordinator path (which stores the current coordinator info):
{code}
/consumers/coordinator --> broker_id (ephemeral; created by coordinator)
{code}
Besides, some of the original ZK paths are removed, including:
{code}
/consumers/groups/ids
{code}
And some of the ephemeral ZK paths are changed to persistent:
{code}
/consumers/groups/owners
{code}
On Coordinator Startup
Every server will create a coordinator instance as its member upon startup. The consumer coordinator keeps the following fields (all the request usage and formats will be introduced later in this page):
{code}
coordinatorElector : ZkElection                        // A ZK based elector using the coordinator path mentioned above
groupsBeingRebalanced : Map[String, AtomicBoolean]     // For each group, a bit indicating if the group is under rebalancing
consumerGroupsPerTopic : Map[String, Set[String]]      // For each topic, the consumer groups that are interested in the topic
groupsWithWildcardTopics : Set[String]                 // Groups that have wildcard interests for topics
rebalanceRequestQ : List[BlockingQueue[String]]        // Blocking queues storing all the rebalance requests; a request just contains the group name
requestHandler : List[RebalanceRequestHandler]         // A list of threads handling all the rebalance requests read from the rebalanceRequestQ
socketServer : SocketServer                            // A socket server maintaining socket channels with all consumers; it writes Stop/StartRequests and reads Ping/StartResponses
                                                       // It contains an acceptor and a list of processors, and each processor has a request queue
pingScheduler : KafkaDelayedScheduler                  // A scheduled thread which maintains and processes a timestamp based priority queue of ping requests
{code}
The coordinator's constructor only initializes the coordinatorElector, passing it a callback function called coordinatorStartup. The coordinatorElector, upon initialization, will immediately try to become the leader. If someone else has become the leader, it will listen to the coordinator path for data changes, and try to re-elect whenever the current leader resigns (i.e. the data on the path is deleted). Whenever it is elected as the leader, it will trigger the callback function provided by its caller, i.e. the coordinator.
{code}
coordinatorStartup :
1. Read all the topics from ZK and initialize consumerGroupsPerTopic
2. Read all the consumer groups from ZK
   2.1 Get the current interested topics of each group, update consumerGroupsPerTopic by adding the group to each topic's interested group list
   2.2 If the group has some consumer specifying wildcard topic-counts, then add the group to groupsWithWildcardTopics
   2.3 Always try to rebalance every group by adding (group -> new AtomicBoolean(true)) to groupsBeingRebalanced and putting the group to rebalanceRequestQ
3. Register listeners for topics and their partition changes
   3.1 Subscribe TopicChangeListener to /brokers/topics
   3.2 Subscribe TopicPartitionChangeListener to each /brokers/topics/[topic]
4. Start up socketServer and pingScheduler
5. Register session expiration listener
6. Initialize and start the requestHandler threads
{code}
When a processor in the socketServer sends a StopRequest to a consumer's channel, informing it to stop consuming and wait for the newly assigned partitions due to a rebalance, it needs to initialize an expire watcher for the request. When it receives the corresponding StartResponse from the channel, it needs to clear the watcher. For a detailed description of the request expire/satisfy purgatory, please read ... .
The pingScheduler is used to send PingRequests to all consumers checking their liveness based on the consumers' ping_interval_ms.
{code}
KafkaDelayedScheduler.run() :
While isRunning
1. Peek the head consumer from the priority queue
2. If the consumer.scheduledTime <= current_time() try to send the PingRequest
   2.1 If the consumer's channel is not held by the socketServer's processor for rebalance, send the PingRequest and set the timeout watcher for the consumer
   2.2 Otherwise do nothing
3. Remove the consumer from the head of the queue and put the consumer with consumer.scheduledTime += consumer.ping_interval_ms back to the queue
{code}
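The scheduling loop can be sketched as one pass over a timestamp-ordered heap. This is only an illustration under stated assumptions, not the coordinator's implementation: `PingScheduler`, `send_ping` and `is_held_for_rebalance` are hypothetical names, a ping is treated as due once its scheduled time is no later than the current time, and `ping_interval_ms` is assumed to be positive.

```python
import heapq


class PingScheduler:
    """Hypothetical sketch of a timestamp-ordered ping queue."""

    def __init__(self, send_ping, is_held_for_rebalance):
        # Heap entries are (scheduled_time_ms, consumer_id, ping_interval_ms).
        self._heap = []
        self._send_ping = send_ping            # assumed callback: consumer_id -> None
        self._is_held = is_held_for_rebalance  # assumed callback: consumer_id -> bool

    def add(self, consumer_id, ping_interval_ms, now_ms):
        heapq.heappush(
            self._heap, (now_ms + ping_interval_ms, consumer_id, ping_interval_ms)
        )

    def run_once(self, now_ms):
        """Process every consumer whose ping is due; return the ids actually pinged."""
        pinged = []
        while self._heap and self._heap[0][0] <= now_ms:
            scheduled, consumer_id, interval = heapq.heappop(self._heap)
            # Skip the ping while the channel is busy with a rebalance.
            if not self._is_held(consumer_id):
                self._send_ping(consumer_id)
                pinged.append(consumer_id)
            # Re-queue for the next interval whether or not a ping was sent.
            heapq.heappush(self._heap, (scheduled + interval, consumer_id, interval))
        return pinged
```

A real scheduler thread would sleep until the head entry is due instead of being driven by explicit `run_once` calls; the explicit clock argument simply keeps the sketch deterministic.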
For a detailed design of the scheduled thread, please read [here|https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Delayed+Scheduling+Problem+%28moved+to+script%29].
On ZK Watcher Fires For Topic Partition Changes
When the ZK watchers fire to notify topic partition changes, the coordinator needs to decide which consumer groups are affected by the change and hence need a rebalance.
Handle Topic Change
{code}
TopicChangeListener.handleChildChange :
1. Get the newly added topics (since /brokers/topics are persistent nodes, no topics should be deleted even if no consumers are left inside the group)
2. For each newly added topic:
   2.1 Subscribe TopicPartitionChangeListener to /brokers/topics/topic
   2.2 Get the set of groups that are interested in this topic from both consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group*
* By trying to request rebalance, we do the following:
  if (groupsBeingRebalanced(group).compareAndSet(false, true)) rebalanceRequestQ[group % requestHandler.size].put(group)
{code}
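The "try to request rebalance" step is a test-and-set guard followed by routing the group to one handler queue. A minimal sketch follows, assuming a lock in place of `AtomicBoolean.compareAndSet` and `zlib.crc32` as a stable hash of the group name; `RebalanceRequests`, `try_request` and `mark_done` are hypothetical names, not part of the design.

```python
import queue
import threading
import zlib


class RebalanceRequests:
    """Hypothetical sketch: at most one pending rebalance request per group."""

    def __init__(self, num_handlers):
        self._lock = threading.Lock()
        self._being_rebalanced = {}  # group -> bool
        self.queues = [queue.Queue() for _ in range(num_handlers)]

    def try_request(self, group):
        """Enqueue the group unless a rebalance is already pending; True if enqueued."""
        with self._lock:
            if self._being_rebalanced.get(group, False):
                return False
            self._being_rebalanced[group] = True
        # Assumed routing: a stable hash keeps each group on the same handler.
        index = zlib.crc32(group.encode("utf-8")) % len(self.queues)
        self.queues[index].put(group)
        return True

    def mark_done(self, group):
        """Called by a handler after a successful rebalance to allow new requests."""
        with self._lock:
            self._being_rebalanced[group] = False
```

Because every request for a given group lands on the same queue, a single handler thread sees all rebalances of that group in order.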
Handle Topic Partition Change
{code}
TopicPartitionChangeListener.handleChildChange :
Get the set of groups that are interested in this topic from consumerGroupsPerTopic(topic) and groupsWithWildcardTopics (filtered by wildcard pattern regex), and try to request rebalance for each group
{code}
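Finding the affected groups combines an exact lookup with a regex filter over the wildcard groups. The sketch below assumes each wildcard group is stored with a single regex string that must match the whole topic name; the draft does not say how patterns are kept, and `groups_interested_in` is a hypothetical helper.

```python
import re


def groups_interested_in(topic, consumer_groups_per_topic, wildcard_patterns):
    """Return the groups to rebalance when `topic` changes.

    consumer_groups_per_topic: dict of topic -> set of group names
    wildcard_patterns: dict of group name -> regex string (assumed representation)
    """
    groups = set(consumer_groups_per_topic.get(topic, set()))
    for group, pattern in wildcard_patterns.items():
        if re.fullmatch(pattern, topic):
            groups.add(group)
    return groups
```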
On Consumer Startup
Upon creation, the consumer will get a list of
{code}
{brokerId : (host, port)}
{code}
It can then consult any one of the known brokers to get the full list of current brokers and the ID of the coordinator. Once the consumer finds out the address of the coordinator, it will try to connect to the coordinator. When the connection is set up, it will send its one and only RegisterRequest to the coordinator. After that, the consumer will keep trying to read new requests from the coordinator. Hence the consumer does not need to maintain a socket server but just a SocketChannel.
{code}
consumerStartup (initBrokers : Map[Int, (String, String)]):
1. Randomly pick a broker in initBrokers, create a socket channel with that broker
   1.1. If the socket channel cannot be created, try another broker in initBrokers
   1.2. If none of the brokers in initBrokers can be connected, throw an AllKnownBrokersNotAvailable exception and return
2. Send a ConsultRequest to the broker and get a ConsultResponse from the broker
3. From the ConsultResponse update serverCluster and curCoordinator
4. Set up a socket channel with the current coordinator and send a RegisterRequest
5. Keep block-reading from the channel
{code}
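Steps 1 to 3 amount to trying the known brokers in random order until one answers the consult call. The sketch below injects the network call as a function so the control flow can be read on its own: `find_coordinator` and `consult` are hypothetical, and `consult(host, port)` is assumed to return `(coordinator_id, brokers)` or raise `OSError` when the broker is unreachable.

```python
import random


class AllKnownBrokersNotAvailable(Exception):
    """Raised when no broker from the initial list can be reached."""


def find_coordinator(init_brokers, consult, rng=random):
    """Return the (host, port) of the current coordinator.

    init_brokers: dict of broker_id -> (host, port) from the properties file
    consult: assumed callable (host, port) -> (coordinator_id, brokers dict)
    """
    candidates = list(init_brokers.items())
    rng.shuffle(candidates)  # random pick, then fall through to the others
    for _broker_id, (host, port) in candidates:
        try:
            coordinator_id, brokers = consult(host, port)
        except OSError:
            continue  # this broker is unreachable; try another known broker
        return brokers[coordinator_id]
    raise AllKnownBrokersNotAvailable("no known broker could be reached")
```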
ConsultRequest
The ConsultRequest sent by the consumer can be received by any broker, and is handled by the broker's socketServer. Hence its request format should be compatible with the ProduceRequest and FetchRequest.
Note that in this design we are still considering the format of 0.7, although the [new wire format|https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal] has already been used in 0.8. Our implementation will keep this in mind and make it easy to adapt to the new format.
{code}
{
  size: int32             // the size of this request, not including the first 4 bytes for this field
  request_type_id: int16  // the request type id, distinguish Fetch/Produce/Offset/Consult/etc requests
                          // do not need any data for the consult request
}
{code}
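Since the ConsultRequest carries no payload, the whole frame is a 4-byte size followed by a 2-byte type id. A minimal encoding sketch follows, assuming big-endian (network) byte order; the value `5` for the request type id is purely hypothetical, since the draft does not assign one.

```python
import struct

CONSULT_REQUEST_TYPE_ID = 5  # hypothetical value; the draft does not assign an id


def encode_consult_request(request_type_id=CONSULT_REQUEST_TYPE_ID):
    """Build the frame: int32 size (excluding itself) followed by int16 type id."""
    body = struct.pack(">h", request_type_id)   # int16, big-endian (assumed)
    return struct.pack(">i", len(body)) + body  # size counts only the body bytes


def decode_request_header(data):
    """Read back (size, request_type_id) from the start of a frame."""
    (size,) = struct.unpack_from(">i", data, 0)
    (request_type_id,) = struct.unpack_from(">h", data, 4)
    return size, request_type_id
```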
ConsultResponse
{code}
{
  size: int32                     // the size of this response, not including the first 4 bytes for this field
  error_code: int16               // global error code for this request, if any
  coordinator_id: int16           // broker id of the coordinator
  brokers_info: [<broker_struct>] // current broker list
}
broker_struct =>
{
  creator_id: string
  id: int32
  host: string
  port: int32
}
{code}
RegisterRequest
The consumer does not need a response for the RegisterRequest: once it has sent the request, it starts expecting the first StopRequest or PingRequest within ping_interval_ms.
{code}
{
  size: int32               // the size of this request
  request_type_id: int16    // the type of the request, currently only one type is allowed: ConnectRequest
  group_id: string          // group that the consumer belongs to
  consumer_id: string       // consumer id
  topic_count: string       // interested topic and number of streams, can be wildcard
  auto_commit: boolean      // indicator of whether autocommit is enabled
  auto_offset_reset: string // indicator of what to do if an offset is out of range, currently either smallest or largest
  ping_interval_ms: int32   // ping interval in milliseconds; the consumer is also expected to respond within this interval
  max_ping_retries: int16   // maximum number of allowed ping timeouts before the consumer is treated as failed
}
{code}
On Consumer Add/Delete
When the socketServer of the coordinator receives a new connection from a consumer, it will wait for the consumer's RegisterRequest. Once the request is received, the coordinator will issue a rebalance request for the group of the newly added consumer. When the coordinator sees a new group id (hence the group does not exist yet), it needs to handle the newly added group. When a consumer fails, the coordinator will eventually learn of the failure through the timeout of a PingRequest. It will then issue a rebalance request for the group of the failed consumer.
Handle New Group
{code}
handleNewGroup :
1. Read all the topics this group is interested in from topic_count, for each topic:
   1.1. If the topic already exists in consumerGroupsPerTopic, update its list by adding this group
   1.2. If the topic is not in consumerGroupsPerTopic yet, add the entry (topic -> Set(group))
2. If the interested topics are in the wildcard form, add this group to groupsWithWildcardTopics
3.1. If some of the group's interested topics already exist, put (group -> new AtomicBoolean(true)) to groupsBeingRebalanced, and put the group to rebalanceRequestQ[group % requestHandler.size]
3.2. Otherwise just put (group -> new AtomicBoolean(false)) to groupsBeingRebalanced
{code}
Handle Group Member Change
Note that here we assume that all the consumers in the same group have the same set of interested topics, so adding/deleting consumers from the group will not change the group's interests. We think this is a fair assumption: although in the current implementation consumers within a group can have different interested topics, users seldom do that.
{code}
handleGroupMemberChange :
1. Decide the deleted consumer or the newly added consumer
2.1. If the group no longer contains any consumer, do nothing
2.2. Otherwise if groupsBeingRebalanced(group).compareAndSet(false, true) succeeds, put the group to rebalanceRequestQ[group % requestHandler.size].
{code}
On Rebalancing
The requestHandler threads keep block-reading from their corresponding rebalanceRequestQ; each handles the rebalancing task for a non-overlapping subset of groups (e.g. through mod hashing). For each rebalance request for a specific group, the handler calls the rebalance function. If the rebalance succeeds it will reset groupsBeingRebalanced(group); otherwise it will retry the rebalance. If the handler cannot finish the rebalance successfully within config.maxRebalanceRetries retries, it will throw a ConsumerRebalanceFailedException.
{code}
rebalance (group) :
1. Get the topics that the group is interested in. For each topic:
   1.1. Get the number of partitions by reading from ZK
   1.2. Get the number of threads for each topic from the alive consumers connecting to it
   1.3. Compute the new ownership assignment for the topic
3. Check if a rebalance is necessary by trying to get the current ownership from ZK for each topic.
   3.1 If there is no registered ownership info in ZK, rebalance is necessary
   3.2 If some partitions are not owned by any threads, rebalance is necessary
   3.3 If some partitions registered in the ownership map do not exist any longer, rebalance is necessary
   3.4 If the ownership map does not match the new one, rebalance is necessary
   3.5 Otherwise rebalance is not necessary
4. If rebalance is necessary, do the following:
   4.1 For each consumer in the group, send the StopRequest to the socketServer's corresponding processor's queue
   4.2 Then for each consumer in the group, send the StartRequest with the newly computed ownership specific to the consumer to the socketServer's corresponding processor's queue
   4.3 Then wait until socketServer has reported that all the StartResponses have been received
5. If waiting has timed out, return false; otherwise write the new ownership info to ZK and return true.
{code}
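Steps 1.3 and 3 can be illustrated with a small sketch. The draft does not fix the assignment algorithm, so the range-style split below (sorted partitions divided into contiguous chunks over sorted consumer threads) is an assumption, and `assign_partitions` and `rebalance_needed` are hypothetical helpers.

```python
def assign_partitions(partitions, threads):
    """Range-style assignment of partitions to consumer threads (assumed algorithm).

    Returns a dict of partition -> owning thread.
    """
    partitions = sorted(partitions)
    threads = sorted(threads)
    if not threads:
        return {}
    per_thread, extra = divmod(len(partitions), len(threads))
    ownership = {}
    start = 0
    for i, thread in enumerate(threads):
        # The first `extra` threads each take one additional partition.
        count = per_thread + (1 if i < extra else 0)
        for partition in partitions[start:start + count]:
            ownership[partition] = thread
        start += count
    return ownership


def rebalance_needed(current, proposed):
    """current: ownership read from ZK, or None/empty if nothing is registered.

    Unowned partitions, stale partitions and changed owners all show up
    as a difference between the two maps.
    """
    if not current:
        return True
    return current != proposed
```

With five partitions and two threads, the first thread owns three partitions and the second owns two.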
On Coordinator Failover
Whenever the server hosting the current coordinator dies, the electors of the other coordinators will notice through the ZK listener and try to get themselves elected as the leader; whoever wins will trigger the callback function coordinatorStartup. When the dead server comes back, the zkClient will automatically reconnect to it and trigger the handleNewSession function.
{code}
handleNewSession :
1. Reset its state by clearing consumerGroupsPerTopic, groupsWithWildcardTopics and rebalanceRequestQ, etc
2. Re-register the session expiration listener (this is because ZkClient does not re-register itself once fired)
3. Try to re-elect to be the coordinator by directly calling the elect function of its coordinatorElector.
{code}
On Consumer Handling Coordinator Request
There are three types of requests that a consumer can receive from the coordinator: PingRequest, StopRequest and StartRequest. For each type of request the consumer is expected to respond within ping_interval_ms.
PingRequest
The coordinator uses the PingRequest to check whether the consumer is still alive, and the consumer needs to respond with the current offset of each partition it is consuming if auto_commit is set.
{code}
{
  size: int32             // the size of this request
  request_type_id: int16  // the type of the request, distinguish Ping/Stop/Start requests
  num_retries: int16      // indicate this is the #th ping retry
}
{code}
PingResponse
{code}
{
  size: int32                    // the size of this response
  num_partition: int16           // number of partitions that the consumer is consuming
  offset_info: [<offset_struct>] // the offset info for each partition consumed by this consumer, its size should be exactly num_partition
                                 // if auto_commit is set to false the last two fields should not exist
}
offset_struct =>
{
  topic: string
  partition: string
  offset: int64
}
{code}
Handling PingRequest
{code}
handlePingRequest (request: PingRequest):
1.1. If auto_commit is not enabled, just send a PingResponse with no num_partition and offset_info
1.2. Otherwise read topicRegistry and send a PingResponse with the created num_partition and offset_info
{code}
StopRequest
When the coordinator decides to rebalance a group, it will first send a StopRequest to every consumer in the group to let them stop consuming.
{code}
{
  size: int32             // the size of this request
  request_type_id: int16  // the type of the request, distinguish Ping/Stop/Start requests
}
{code}
Handling StopRequest
The consumer does not need to respond to the StopRequest, since the coordinator only needs to synchronize with consumers after the StartRequests are sent.
{code}
handleStopRequest (request: StopRequest):
1.1. If fetcherStopped is true then do nothing
1.2. Otherwise close all fetchers and clean their corresponding queues, set fetcherStopped to true
{code}
StartRequest
The coordinator will send every consumer in a group StartRequest with the owned partition info after the StopRequest.
{code}
{
  size: int32                          // the size of this request
  request_type_id: int16               // the type of the request, distinguish Ping/Stop/Start requests
  num_partitions: int16                // the number of partitions assigned to the consumer
  assigned_parts: [<partition_struct>] // the detailed info of assigned partitions, along with the starting offset
}
partition_struct =>
{
  topic: string
  partition: string // note that this partition string already contains the broker id
  offset: int64
}
{code}
StartResponse
{code}
{
  size: int32       // the size of this response
  error_code: int16 // global error code for this request, if any
}
{code}
Handling StartRequest
{code}
handleStartRequest (request: StartRequest):
1. If the consumer's topic interests are wildcard, re-construct topicThreadIdAndQueues and KafkaMessageAndMetadataStreams
2. Read the assigned partitions along with offsets from assigned_parts
3. If fetcherStopped is false and the newly assigned partition map does not match topicRegistry, throw IllegalStateException
4. Update topicRegistry with the newly assigned partition map
5. Start fetchers
6. Set fetcherStopped to false
{code}
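The stop and start handlers together form a small two-state machine around fetcherStopped. The sketch below is one reading of those steps, under assumptions: the fetchers themselves are omitted, a freshly started consumer is assumed to have its fetchers stopped, "does not match topicRegistry" is interpreted as a different set of partitions (offsets are allowed to differ), and `RuntimeError` stands in for IllegalStateException. `ConsumerFetcherState` is a hypothetical name.

```python
class ConsumerFetcherState:
    """Hypothetical sketch of the consumer-side Stop/Start handling."""

    def __init__(self):
        self.fetcher_stopped = True  # assumed initial state: no fetchers running
        self.topic_registry = {}     # (topic, partition) -> offset

    def handle_stop_request(self):
        if self.fetcher_stopped:
            return  # already stopped, nothing to do
        # Closing fetchers and clearing their queues would happen here.
        self.fetcher_stopped = True

    def handle_start_request(self, assigned_parts):
        """assigned_parts: iterable of (topic, partition, offset) tuples."""
        new_registry = {(t, p): o for t, p, o in assigned_parts}
        running = not self.fetcher_stopped
        if running and set(new_registry) != set(self.topic_registry):
            # A new assignment must be preceded by a StopRequest.
            raise RuntimeError("StartRequest changes ownership while fetchers run")
        self.topic_registry = new_registry
        # Starting the fetchers would happen here.
        self.fetcher_stopped = False
```

Rejecting a changed assignment while fetchers are still running guards against a StartRequest that arrives without its preceding StopRequest.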
Open Problems
- When all the brokers listed in the properties file as known brokers are gone when a consumer starts/resumes, the consumer cannot find the coordinator and thus cannot be added to the group to start consuming. This rare case should be treated as an operational error, since the migration of a broker cluster should be incremental and kept in step with the consumer properties file.
- The sizes of the rebalance thread pool and the socketServer's processor thread pool must be pre-specified, whereas the number of consumers can scale during operation. Hence each thread must handle more and more consumers as new consumers are added, which will increase the CPU burden.
- Since consumers no longer register themselves in Zookeeper, a newly elected coordinator needs to wait for all the consumers to re-connect to it instead of reading the consumer info from ZK. This may increase the latency of the coordinator failover process.
Possible Improvements
- Batch Updating ZK Offset: when it receives a PingResponse with the current offset info, the processor does not need to reflect it to ZK immediately but can just update an in-memory {(topic, partition) -> offset} map; another daemon thread can then periodically write the updated map to ZK. With newer versions of ZK that support group writes, this can reduce the number of writes to ZK.
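The batching idea can be sketched as a buffer that keeps only the latest offset per partition and hands the accumulated changes to a writer on each flush. This is an illustration only: `OffsetBuffer` is a hypothetical name and `write_batch` stands in for whatever grouped ZK write the deployment provides.

```python
import threading


class OffsetBuffer:
    """Hypothetical sketch: coalesce offset updates between periodic flushes."""

    def __init__(self, write_batch):
        self._lock = threading.Lock()
        self._dirty = {}                 # (topic, partition) -> latest offset
        self._write_batch = write_batch  # assumed callable: dict -> None

    def update(self, topic, partition, offset):
        """Called by a processor for each offset in a PingResponse."""
        with self._lock:
            self._dirty[(topic, partition)] = offset

    def flush(self):
        """Write offsets changed since the last flush; return how many were written."""
        with self._lock:
            batch, self._dirty = self._dirty, {}
        if batch:
            self._write_batch(batch)
        return len(batch)
```

A daemon thread would call `flush()` on a fixed period; repeated updates to the same partition between flushes cost a single write.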