...
Code complexity: Years of patches and hotfixes have heavily impacted the readability of the code, which makes it hard to comprehend and makes bug fixes increasingly difficult. For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue such as a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.
Complex Coordinator Communication: In the current design, coordinator communication happens in both threads, which has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.
Asynchronous Background Thread: One of our goals is to make the rebalance process happen asynchronously to the poll. Firstly, this simplifies the polling thread design, as it essentially only needs to handle fetch requests. Secondly, because the rebalance occurs in the background thread, the polling thread neither blocks nor is blocked by the rebalance process.
Scope
Deprecate HeartbeatThread
- Remove the heartbeat thread.
- Move the heartbeat logic to the background thread.
Refactor the Coordinator classes
- Revisit timers and loops
- Remove some blocking methods, such as commitOffsetSync. The blocking commit should be handled by the polling thread, waiting for the background thread to complete the future.
- The coordinator will be polled by the background thread loop.
- Rebalance state modification: We will add a few states to ensure the rebalance callbacks are executed in the correct sequence and time.
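As a rough illustration of the blocking-commit item above, the following sketch shows a polling thread that enqueues a commit event and waits for the background thread to complete its future. The names CommitEvent, BlockingCommitSketch, toBackground, and startBackground are hypothetical and are not taken from the actual consumer code; the background side is a stand-in that completes the future without sending any request.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event type: carries a future that the background thread completes.
final class CommitEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

final class BlockingCommitSketch {
    // Hypothetical queue that the background thread drains.
    final BlockingQueue<CommitEvent> toBackground = new LinkedBlockingQueue<>();

    // Runs on the polling thread: enqueue the event, then block on its future.
    // Returns true if the commit completed, false on timeout or interrupt.
    boolean commitSync(long timeoutMs) {
        CommitEvent event = new CommitEvent();
        toBackground.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("commit failed", e.getCause());
        }
    }

    // Stand-in for the background thread: takes one event and completes it.
    Thread startBackground() {
        Thread t = new Thread(() -> {
            try {
                CommitEvent event = toBackground.take();
                // A real implementation would send the commit request here (assumption).
                event.future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

With this shape, the coordinator itself never blocks: only the caller of commitSync waits, and it can give up after its own timeout.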
Refactor the KafkaConsumer API
- It will send Events to the background thread if network communication is required.
- Remove dependency on the fetcher, coordinator, and networkClient.
Events and communication channels
- We will use two channels to facilitate the two-way communication.
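One possible way to picture the two channels is a pair of thread-safe queues, one per direction. This is a minimal sketch under assumptions: the class EventChannels, its field and method names, and the use of plain strings as events are all hypothetical and chosen only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical holder for the two one-way channels between the threads.
final class EventChannels {
    // Polling thread -> background thread (for example, a commit event).
    private final BlockingQueue<String> toBackground = new LinkedBlockingQueue<>();
    // Background thread -> polling thread (for example, a callback to execute).
    private final BlockingQueue<String> toPolling = new LinkedBlockingQueue<>();

    // Called by the polling thread.
    void sendToBackground(String event) {
        toBackground.add(event);
    }

    // Called by the background thread.
    void sendToPolling(String event) {
        toPolling.add(event);
    }

    // Called by the background thread; returns null when nothing is queued.
    String nextForBackground() {
        return toBackground.poll();
    }

    // Called by the polling thread; returns null when nothing is queued.
    String nextForPolling() {
        return toPolling.poll();
    }
}
```

Keeping each queue one-directional means each thread only ever reads from one queue and writes to the other, which fits the goal of a more linear design.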
Address issues in these Jira tickets
...
Terms
Polling Thread
The client thread, also known as the polling thread, is the thread on which the user invokes the poll() operation.
...
In the schematic below, the main components are:
- Polling thread: handles all of the API requests and callback executions.
- Background thread: handles network communication such as heartbeat, coordinator requests, and rebalance flow execution.
- Communication channels: responsible for:
  - sending events to the background thread
  - sending events to the polling thread
A note on the subscriptionState: Its reference will be shared by polling and background threads.
...
Background thread and its lifecycle
In our implementation, the coordinator is discovered automatically when it is requested. The discovery process piggybacks on the background thread loop, so any disconnect or connection request is handled by the state machine. The state machine represents the different states of the coordinator connection and performs rediscovery after a disconnection, as long as the background thread isn't terminated. The background thread can be in one of four states: down, initialized, coordinator_discovery (coordinator discovery), and stable.
...
The background thread needs to fulfill certain requirements in each state to maintain its status. In particular:
- down: The coordinator is uninitialized.
- initialized: The background thread has completed initialization, and the loop has started.
- coordinator_discovery: The coordinator is requested but hasn't been connected yet. If there is no in-flight FindCoordinatorRequest, the background thread sends one. If one exists, it checks for the request result.
- stable: The coordinator is connected.
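The four states can be sketched as a small state machine. This is an illustrative sketch, not the actual implementation: the type and method names are hypothetical, and the stable-to-discovery transition on disconnect and the return to down on close are assumptions inferred from the description of rediscovery and termination.

```java
// The four states named in the text.
enum BackgroundState { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

// Hypothetical state machine; illegal transitions fail fast.
final class BackgroundStateMachine {
    private BackgroundState state = BackgroundState.DOWN;

    BackgroundState state() {
        return state;
    }

    // The polling thread starts the background thread.
    void start() {
        transition(BackgroundState.DOWN, BackgroundState.INITIALIZED);
    }

    // An event needs a coordinator, so discovery begins.
    void coordinatorRequested() {
        transition(BackgroundState.INITIALIZED, BackgroundState.COORDINATOR_DISCOVERY);
    }

    // The FindCoordinator request failed.
    void findCoordinatorFailed() {
        transition(BackgroundState.COORDINATOR_DISCOVERY, BackgroundState.INITIALIZED);
    }

    // The FindCoordinator request succeeded.
    void coordinatorFound() {
        transition(BackgroundState.COORDINATOR_DISCOVERY, BackgroundState.STABLE);
    }

    // Assumption: a disconnect sends the thread back to discovery.
    void disconnected() {
        transition(BackgroundState.STABLE, BackgroundState.COORDINATOR_DISCOVERY);
    }

    // Assumption: closing is allowed from any state and ends in DOWN.
    void close() {
        state = BackgroundState.DOWN;
    }

    private void transition(BackgroundState from, BackgroundState to) {
        if (state != from) {
            throw new IllegalStateException("cannot move from " + state + " to " + to);
        }
        state = to;
    }
}
```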
1. The background thread has not been constructed, so it is in the down state. Upon initialization of the consumer, the coordinator is down.
2. The polling thread starts the background thread, which moves to the initialized state. This usually happens after new KafkaConsumer().
3. The background thread loop is running. On each iteration it does the following:
   - Poll for new events (for example, a commit event) from the queue, if any.
   - Check the background thread state (connected to a coordinator or not) by running the state machine, and ensure the state requirements are fulfilled.
     - If the event requires a coordinator, move the background thread to the coordinator_discovery (coordinator discovery) state.
     - If not, stay in the initialized state and execute the event.
   - Check the coordinator connection.
     - If the FindCoordinator request hasn't completed, stay in the coordinator_discovery state.
     - If the request fails, transition to the initialized state.
     - Otherwise, the coordinator is found; transition to the stable state.
4. Poll the ConsumerCoordinator.
5. Poll the networkClient.
6. Go to 3.
7. If close() is received:
   - Set close to true so that the loop exits.
   - Poll the coordinator and the network client.
   - Commit.
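The loop described above can be sketched roughly as follows. Everything here is hypothetical and simplified: the class BackgroundLoopSketch, the Event type, the coordinatorFound field that stands in for the FindCoordinator response, and the trace list that merely records what the loop would do. No network calls are made, and a failed lookup is simply retried on the next iteration, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class BackgroundLoopSketch implements Runnable {
    enum State { INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

    // Hypothetical event: a name plus whether it needs a coordinator.
    static final class Event {
        final String name;
        final boolean needsCoordinator;
        Event(String name, boolean needsCoordinator) {
            this.name = name;
            this.needsCoordinator = needsCoordinator;
        }
    }

    final Queue<Event> events = new ConcurrentLinkedQueue<>();
    // Records the loop's actions for illustration; touched only by the loop thread.
    final List<String> trace = new ArrayList<>();
    // Stand-in for the FindCoordinator response: null while the request is in flight.
    volatile Boolean coordinatorFound;
    private volatile boolean closed;
    private State state = State.INITIALIZED;
    private Event pending;

    State state() { return state; }

    void close() { closed = true; }

    @Override
    public void run() {
        while (!closed) {
            runOnce();
        }
        // Shutdown path: one last poll, then commit.
        pollCoordinatorAndNetwork();
        trace.add("final commit");
    }

    // One iteration of the loop (steps 3 to 5 in the list above).
    void runOnce() {
        if (pending == null) {
            pending = events.poll();
        }
        if (pending != null) {
            if (!pending.needsCoordinator || state == State.STABLE) {
                trace.add("execute " + pending.name);
                pending = null;
            } else if (state == State.INITIALIZED) {
                trace.add("send FindCoordinator");
                state = State.COORDINATOR_DISCOVERY;
            }
        }
        Boolean result = coordinatorFound;
        if (state == State.COORDINATOR_DISCOVERY && result != null) {
            state = result ? State.STABLE : State.INITIALIZED;
            coordinatorFound = null;
        }
        pollCoordinatorAndNetwork();
    }

    private void pollCoordinatorAndNetwork() {
        trace.add("poll coordinator");
        trace.add("poll network client");
    }
}
```

In this sketch an event that needs a coordinator stays pending until the state reaches STABLE, so the event is executed on the iteration after discovery succeeds.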
...