Public-Facing Changes
Update the Javadocs
API signatures will remain exactly the same
Exceptions thrown by the API may differ slightly. See the Wakeup() and WakeupException section.
...
Code complexity: the current rebalance protocol is quite heavy on the client side. Over the years, patches and hotfixes have also hurt the readability of the code. For example, coordinator communication can happen in several different places, which makes the code path hard to follow. There are also many small patches for bugs and edge cases, and these may themselves need further fixes.
Complex Coordinator Communication: because both the polling thread and the heartbeat thread talk to the coordinator, the code is more complex and prone to concurrency issues.
Asynchronous Background Thread: one of our goals here is to make the rebalance process happen asynchronously with respect to poll. First, this simplifies the polling thread design, because all coordinator communication moves to the background. Second, it prevents consumer poll from blocking other consumer operations, such as network requests.
...
We will move network communication, including heartbeats, to the background thread, which allows the consumer to operate more asynchronously. In particular, the scope of this work is as follows:
Deprecate HeartbeatThread
Refactor the Coordinator classes, i.e., AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.
Refactor the KafkaConsumer API internals to adopt this asynchronous design.
Introduce events and communication channels to facilitate communication between the polling and background threads.
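To make the events-and-channels item above more concrete, here is a minimal sketch of two one-directional channels built on java.util.concurrent queues. The names ApplicationEvent, BackgroundEvent, and EventChannels, along with their event types, are hypothetical illustrations and not existing consumer classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event sent from the polling thread to the background thread.
final class ApplicationEvent {
    enum Type { COMMIT, FETCH_OFFSETS, CLOSE }
    final Type type;
    ApplicationEvent(Type type) { this.type = type; }
}

// Hypothetical event sent from the background thread to the polling thread,
// e.g. a request to run a rebalance callback or a propagated error.
final class BackgroundEvent {
    enum Type { REVOKE_PARTITIONS, ASSIGN_PARTITIONS, ERROR }
    final Type type;
    BackgroundEvent(Type type) { this.type = type; }
}

// Two FIFO queues shared by both threads, one per direction.
final class EventChannels {
    final BlockingQueue<ApplicationEvent> toBackground = new LinkedBlockingQueue<>();
    final BlockingQueue<BackgroundEvent> toPolling = new LinkedBlockingQueue<>();

    // Polling thread: hand an event (e.g. a user commit) to the background thread.
    void submit(ApplicationEvent event) { toBackground.add(event); }

    // Background thread: take the next event, or null if none; never blocks.
    ApplicationEvent nextForBackground() { return toBackground.poll(); }

    // Background thread: hand work back to the polling thread.
    void publish(BackgroundEvent event) { toPolling.add(event); }

    // Polling thread, inside poll(): take the next event, or null if none; never blocks.
    BackgroundEvent nextForPolling() { return toPolling.poll(); }
}
```

Using the non-blocking poll() on both sides means neither thread stalls waiting on the other. A FIFO queue also preserves the order in which events were submitted.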
Definitions
Polling Thread
...
The active process in the background handles network communication, including coordinator communication and heartbeats. This article will provide more implementation details.
...
A note on the subscriptionState: Its reference will be shared by both the polling and background threads.
...
The coordinator is automatically discovered (when requested) in our implementation. Therefore, we use a state machine to represent the different states of the coordinator connection and to perform rediscovery after a disconnection, as long as the background thread isn’t terminated. The background thread can be in one of four states: down, initialized, discovering (coordinator discovery), and stable.
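The coordinator-connection state machine can be sketched as follows. This is a minimal illustration that assumes one FindCoordinator request per discovery attempt. The names CoordinatorState, FindCoordinatorResult, and CoordinatorStateMachine are hypothetical, and treating a disconnect as a return to the initialized state is an assumption.

```java
// Hypothetical states mirroring the four described above.
enum CoordinatorState { DOWN, INITIALIZED, DISCOVERING, STABLE }

// Outcome of the in-flight FindCoordinator request, as seen by one loop iteration.
enum FindCoordinatorResult { PENDING, FAILED, FOUND }

final class CoordinatorStateMachine {
    private CoordinatorState state = CoordinatorState.DOWN;

    CoordinatorState state() { return state; }

    // The polling thread has started the background thread.
    void start() {
        if (state == CoordinatorState.DOWN) state = CoordinatorState.INITIALIZED;
    }

    // An event needs the coordinator but none is known yet: begin discovery.
    void requireCoordinator() {
        if (state == CoordinatorState.INITIALIZED) state = CoordinatorState.DISCOVERING;
    }

    // Apply the FindCoordinator result; only meaningful while discovering.
    void onFindCoordinator(FindCoordinatorResult result) {
        if (state != CoordinatorState.DISCOVERING) return;
        switch (result) {
            case PENDING: break;                                        // stay in DISCOVERING
            case FAILED:  state = CoordinatorState.INITIALIZED; break;  // retry on a later event
            case FOUND:   state = CoordinatorState.STABLE; break;
        }
    }

    // Assumption: a lost connection falls back so the next request triggers rediscovery.
    void onDisconnect() {
        if (state == CoordinatorState.STABLE) state = CoordinatorState.INITIALIZED;
    }
}
```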
...
1. Upon initialization of the consumer, the coordinator is down.
2. The polling thread starts the background thread; the background thread moves to the initialized state.
3. The background thread loop:
   1. Poll for new events (for example, a commit event) from the queue, if any.
   2. Check the background thread state (connected to a coordinator or not):
      1. If the event requires a coordinator, move the coordinator to the discovering (coordinator discovery) state.
      2. If not, execute the event.
   3. Check the coordinator connection. If the FindCoordinator request hasn’t completed, stay in the discovering state. If the request fails, transition to the initialized state. Otherwise, the coordinator has been found; transition to the stable state.
   4. Poll ConsumerCoordinator.
   5. Poll networkClient.
   6. Go back to step 3 (repeat the loop).
4. If close() is received:
   1. Set close to true so that the loop exits.
   2. Poll the coordinator and network client.
   3. Commit.
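Here is a rough Java sketch of the loop steps above, written so that a test can drive runOnce() directly. LoopEvent, BackgroundLoop, and the trace and iterations fields are hypothetical. The coordinator lookup is reduced to a BooleanSupplier. Parking coordinator-bound events until discovery succeeds is an assumption, because the steps do not say what happens to such an event while discovery is in progress.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BooleanSupplier;

// Hypothetical event: a name plus whether it has to go to the group coordinator.
final class LoopEvent {
    final String name;
    final boolean needsCoordinator;

    LoopEvent(String name, boolean needsCoordinator) {
        this.name = name;
        this.needsCoordinator = needsCoordinator;
    }
}

final class BackgroundLoop implements Runnable {
    // Filled by the polling thread, drained by the background thread.
    final Queue<LoopEvent> events = new ConcurrentLinkedQueue<>();
    // Record of executed events and the final commit, for illustration only.
    final List<String> trace = new ArrayList<>();
    int iterations = 0;

    // Assumption: events needing a coordinator wait here until discovery succeeds.
    private final Deque<LoopEvent> awaitingCoordinator = new ArrayDeque<>();
    // Stands in for "has the FindCoordinator request completed successfully?".
    private final BooleanSupplier coordinatorFound;
    private volatile boolean closed = false;

    BackgroundLoop(BooleanSupplier coordinatorFound) {
        this.coordinatorFound = coordinatorFound;
    }

    // Called by the polling thread when close() is received.
    void close() { closed = true; }

    @Override
    public void run() {
        while (!closed) {
            runOnce();
        }
        pollCoordinatorAndNetwork();   // one last poll before exiting
        trace.add("commit");
    }

    // One pass of the loop described above.
    void runOnce() {
        LoopEvent event = events.poll();             // poll for a new event, if any
        if (event != null) {
            if (event.needsCoordinator) {
                awaitingCoordinator.add(event);      // needs a coordinator: wait for discovery
            } else {
                trace.add("execute " + event.name);  // otherwise execute right away
            }
        }
        // Coordinator connection check: release waiting events, in order, once found.
        if (!awaitingCoordinator.isEmpty() && coordinatorFound.getAsBoolean()) {
            while (!awaitingCoordinator.isEmpty()) {
                trace.add("execute " + awaitingCoordinator.poll().name);
            }
        }
        pollCoordinatorAndNetwork();
    }

    // Placeholder for polling ConsumerCoordinator and then the network client.
    private void pollCoordinatorAndNetwork() { iterations++; }
}
```

In this sketch, an event that does not need a coordinator can overtake one that is waiting for discovery. A real implementation would need to decide whether that reordering is acceptable.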
Consumer Poll Changes
Currently, consumer.poll() does two things: it polls the coordinator (for assignment updates, auto-commits, and rebalances), and it fetches records.
...
Refactor the HeartbeatThread out of the existing implementation. We will no longer start the heartbeat thread; instead, the heartbeat-sending mechanism will piggyback on the background thread.
Right now, the coordinator has the states UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, and STABLE. Because executing rebalance callbacks is complex in the new implementation, we will:
Add new states for partition assignment and revocation:
Add the “REVOKE_PARTITION” state to handle onPartitionsRevoked
Add the “ASSIGN_PARTITION” state to handle onPartitionsAssigned
These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to make progress:
The background thread will automatically rediscover the coordinator after a disconnection, using the state machine
We already call networkClient.poll() in the background thread loop
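To piggyback heartbeats on the background loop, each iteration could check whether a heartbeat is due and cap the network poll timeout at the time remaining until the next one. HeartbeatScheduler below is a hypothetical sketch of that check. Times are passed in as milliseconds so the logic can be tested deterministically, and intervalMs would presumably come from heartbeat.interval.ms.

```java
// Hypothetical helper, not the consumer's real Heartbeat class. Times are in ms and
// passed in explicitly so that the logic is deterministic and easy to test.
final class HeartbeatScheduler {
    private final long intervalMs;   // presumably taken from heartbeat.interval.ms
    private long lastSentMs;
    private boolean inFlight = false;

    HeartbeatScheduler(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = nowMs;
    }

    // Checked once per background-loop iteration: due, and no response outstanding.
    boolean shouldSend(long nowMs) {
        return !inFlight && nowMs - lastSentMs >= intervalMs;
    }

    void onSent(long nowMs) {
        lastSentMs = nowMs;
        inFlight = true;
    }

    void onResponse() {
        inFlight = false;
    }

    // Upper bound for the network poll timeout so the loop wakes up in time.
    long timeToNextMs(long nowMs) {
        return Math.max(0, intervalMs - (nowMs - lastSentMs));
    }
}
```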
...
The existing coordinator implementation already has a state machine in place. We will add four states to facilitate callback execution.
After onJoinPrepare, transition to the REVOKING_PARTITIONS state. It will do a few things:
Send an event to the polling thread to execute the onPartitionsRevoked callback
Wait for the partition revoked event and advance the state to PARTITIONS_REVOKED
In onJoinComplete:
We will first exit the COMPLETING_JOIN state
Enter ASSIGNING_PARTITIONS and send a partition assignment event. Return.
Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED
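The callback states above can be sketched as a small, non-blocking state machine. Each step either emits an event for the polling thread or advances when the polling thread reports that a callback has finished. The enum, the class, the onJoinResponse() hook, and the string events are hypothetical. PREPARING_REBALANCE is borrowed from the existing member states and serves only as the starting point here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical states; only the last five are named in the steps above.
enum RebalanceState {
    PREPARING_REBALANCE,
    REVOKING_PARTITIONS, PARTITIONS_REVOKED,
    COMPLETING_JOIN,
    ASSIGNING_PARTITIONS, PARTITIONS_ASSIGNED
}

final class CallbackStateMachine {
    // Stands in for the channel to the polling thread, which runs the user callbacks.
    final Queue<String> toPollingThread = new ArrayDeque<>();
    private RebalanceState state = RebalanceState.PREPARING_REBALANCE;

    RebalanceState state() { return state; }

    // After onJoinPrepare: ask the polling thread to run onPartitionsRevoked.
    void afterJoinPrepare() {
        state = RebalanceState.REVOKING_PARTITIONS;
        toPollingThread.add("REVOKE");
    }

    // The polling thread reported that the revocation callback finished.
    void onRevocationDone() {
        if (state == RebalanceState.REVOKING_PARTITIONS) state = RebalanceState.PARTITIONS_REVOKED;
    }

    // Hypothetical hook: the join/sync round trip finished and onJoinComplete is next.
    void onJoinResponse() {
        if (state == RebalanceState.PARTITIONS_REVOKED) state = RebalanceState.COMPLETING_JOIN;
    }

    // onJoinComplete: leave COMPLETING_JOIN, request onPartitionsAssigned, and return
    // without waiting; the loop picks up the completion on a later iteration.
    void onJoinComplete() {
        if (state != RebalanceState.COMPLETING_JOIN) return;
        state = RebalanceState.ASSIGNING_PARTITIONS;
        toPollingThread.add("ASSIGN");
    }

    // The polling thread reported that the assignment callback finished.
    void onAssignmentDone() {
        if (state == RebalanceState.ASSIGNING_PARTITIONS) state = RebalanceState.PARTITIONS_ASSIGNED;
    }
}
```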
...
Wakeup() and WakeupException
TBD. I’m not sure how WakeupException plays out in the new design.
...
Coordinator discovery timeout: discovery currently uses the user-provided timeout passed to consumer.poll(). Maybe we should use request.timeout.ms instead, and re-attempt in the next loop iteration if discovery fails.
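If each discovery attempt were bounded by request.timeout.ms instead of the poll() timeout, the background loop would only need to track a per-attempt deadline and start a new attempt on a later iteration once that deadline passes. DiscoveryAttempt is a hypothetical sketch of that bookkeeping, not existing consumer code.

```java
// Hypothetical helper: bounds each FindCoordinator attempt by a fixed request timeout
// (e.g. request.timeout.ms) and lets a later loop iteration start a fresh attempt.
final class DiscoveryAttempt {
    private final long requestTimeoutMs;
    private long deadlineMs = -1;   // -1 means no attempt in flight

    DiscoveryAttempt(long requestTimeoutMs) { this.requestTimeoutMs = requestTimeoutMs; }

    // Start an attempt if none is in flight; true means "send FindCoordinator now".
    boolean maybeStart(long nowMs) {
        if (deadlineMs >= 0) return false;
        deadlineMs = nowMs + requestTimeoutMs;
        return true;
    }

    // Called every loop iteration; on expiry the attempt is cleared so that the
    // next iteration can start a new one instead of blocking poll().
    boolean checkExpired(long nowMs) {
        if (deadlineMs >= 0 && nowMs >= deadlineMs) {
            deadlineMs = -1;
            return true;
        }
        return false;
    }

    // A response (success or failure) arrived before the deadline.
    void onResponse() { deadlineMs = -1; }
}
```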
...
We need to make sure that 1. coordinator discovery and 2. joinGroup operations are performed at the correct time.
We will also ensure that the heartbeat interval (heartbeat.interval.ms) and the poll interval (max.poll.interval.ms) are respected.
We also need to make sure that no events are lost and that events happen in the correct sequence.
...
Fetcher
Should we consider a larger or configurable prefetch buffer? Ideally, we could tune this parameter to improve performance.
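A configurable prefetch buffer could be as simple as a bounded queue that the background thread keeps filling only while the queue is below a capacity limit. PrefetchBuffer and its capacity parameter are hypothetical. The maxRecords argument to drain plays a role similar to max.poll.records.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical bounded prefetch buffer shared by the background and polling threads.
final class PrefetchBuffer<T> {
    private final int capacity;   // the tunable parameter in question
    private final Deque<T> records = new ArrayDeque<>();

    PrefetchBuffer(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
    }

    // Background thread: only send another fetch while there is room.
    synchronized boolean shouldFetch() { return records.size() < capacity; }

    // Background thread: buffer a fetched batch (may overshoot capacity by one batch).
    synchronized void addAll(List<T> batch) { records.addAll(batch); }

    // Polling thread: hand out up to maxRecords buffered records, oldest first.
    synchronized List<T> drain(int maxRecords) {
        List<T> out = new ArrayList<>();
        while (out.size() < maxRecords && !records.isEmpty()) {
            out.add(records.poll());
        }
        return out;
    }
}
```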
SubscriptionState
Shared: for implementation simplicity, the subscriptionState will be shared between the background and polling threads
Non-shared: we can make a copy of the subscriptionState in the background thread, 1. update the copy there, and 2. check for consistency during the poll
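The non-shared option could look like the sketch below. It assumes the background thread owns a mutable copy of the assignment and publishes immutable, versioned snapshots, which the polling thread compares during poll(). AssignmentSnapshot, BackgroundSubscription, and PollingView are hypothetical names, and partitions are plain strings for brevity.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical immutable snapshot of the assignment, tagged with a version.
final class AssignmentSnapshot {
    final long version;
    final Set<String> partitions;

    AssignmentSnapshot(long version, Set<String> partitions) {
        this.version = version;
        this.partitions = Collections.unmodifiableSet(new TreeSet<>(partitions));
    }
}

// Background thread: owns the mutable copy and publishes snapshots.
final class BackgroundSubscription {
    private long version = 0;
    private final Set<String> assigned = new TreeSet<>();
    private volatile AssignmentSnapshot published = new AssignmentSnapshot(0, Set.of());

    // 1. Update the background copy, then publish a new immutable snapshot.
    void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
        published = new AssignmentSnapshot(++version, assigned);
    }

    AssignmentSnapshot latest() { return published; }
}

// Polling thread: 2. during poll(), detect whether the background copy has moved on.
final class PollingView {
    private AssignmentSnapshot current = new AssignmentSnapshot(0, Set.of());

    boolean refresh(BackgroundSubscription bg) {
        AssignmentSnapshot latest = bg.latest();
        if (latest.version == current.version) return false;
        current = latest;
        return true;
    }

    Set<String> partitions() { return current.partitions; }
}
```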
Two-Channel Implementation
We only use the second queue (ConsumerChannel) for two purposes (see below), so, for simplicity, we could instead hold a reference to those and check them during the poll.
Partition Revocation
Error
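If the two purposes are the partition revocation and error cases listed above, holding references instead of a second queue might look like this. PollingSignals is a hypothetical class. AtomicReference.getAndSet(null) lets poll() consume each signal exactly once.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical replacement for the second queue: the background thread stores the two
// things the polling thread must act on, and poll() checks them on every call.
final class PollingSignals {
    private final AtomicReference<Set<String>> pendingRevocation = new AtomicReference<>();
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Background thread side.
    void requestRevocation(Set<String> partitions) { pendingRevocation.set(partitions); }

    void reportError(RuntimeException e) { pendingError.compareAndSet(null, e); } // keep the first error

    // Polling thread side, at the start of poll(): rethrow a pending error, or return
    // the partitions whose revocation callback must run (null if there is nothing to do).
    Set<String> checkOnPoll() {
        RuntimeException e = pendingError.getAndSet(null);
        if (e != null) throw e;
        return pendingRevocation.getAndSet(null);
    }
}
```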
Others
Should poll() internally return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()? Each take() would perform an action similar to poll().
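A lazily evaluated record stream could fetch a new batch only when its local buffer is empty. ConsumerRecordStream is the name proposed above. The class body, its take(int) signature, and the Supplier that stands in for one internal poll are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical lazy stream: batches are fetched only when take() runs out of
// buffered records, so each take() does roughly what poll() does today.
final class ConsumerRecordStream<T> {
    private final Supplier<List<T>> fetchBatch;   // stands in for one internal poll
    private final Deque<T> buffered = new ArrayDeque<>();

    ConsumerRecordStream(Supplier<List<T>> fetchBatch) { this.fetchBatch = fetchBatch; }

    // Returns up to n records, fetching more batches only as needed.
    List<T> take(int n) {
        List<T> out = new ArrayList<>();
        while (out.size() < n) {
            if (buffered.isEmpty()) {
                List<T> batch = fetchBatch.get();
                if (batch.isEmpty()) break;        // nothing available right now
                buffered.addAll(batch);
            }
            out.add(buffered.poll());
        }
        return out;
    }
}
```

Records that are fetched but not yet taken stay buffered for the next take(), so no fetched data is dropped between calls.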
...