...
Update Java Docs
API signatures will remain exactly the same
API-returned exceptions can be slightly different. See the WakeupException section.
...
This document highlights our effort to refactor the threading model of the KafkaConsumer. The purpose of this refactor is to address issues and shortcomings we’ve encountered in the past years, such as increasing code complexity from hotfixes and patches, locking and race conditions caused by both the heartbeat thread and the polling thread making coordinator requests, and the coupling between the client poll and the rebalance progress.
Code complexity: the current rebalance protocol is quite heavy on the client side, and over the years the patches and hotfixes have grown increasingly complicated and have impacted the readability of the code. For example, coordinator communication can happen in different places, which makes it challenging to understand the code path. There are also many small patches handling bugs and edge cases that might require further fixes down the road.
Complex Coordinator Communication: As the polling thread and the heartbeat thread both talk to the coordinator, code complexity increases and concurrency issues arise.
Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with the poll. Firstly, it simplifies the polling thread design, as all of the coordinator communication will be moved to the background. Secondly, it prevents the consumer poll from blocking other consumer operations such as network requests.
...
We will be moving network communication, including the heartbeat, to the background thread, which allows the consumer to operate asynchronously. In particular, the scope of this refactor is as follows:
Deprecate HeartbeatThread
Refactor the Coordinator classes, i.e., AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.
Refactor the KafkaConsumer API internals to adopt this asynchronous design.
Introduce events and communication channels to facilitate communication between the polling thread and the background thread.
Address issues in these Jira tickets
Definitions
Polling Thread
The main user thread. It is called the polling thread because it is where the user invokes the poll() operation.
Background Thread
The thread that runs in the background upon initialization of the KafkaConsumer. It handles network communication, including coordinator communication and heartbeat. This article will provide more implementation details.
Heartbeat Thread
The thread running in the background in the current implementation (see KIP-62).
Event
The primary communication medium for the polling thread to interact with the background thread.
...
Polling thread, which handles all of the API requests and callback execution.
Background thread, which handles network communication such as heartbeat and coordinator requests, as well as the rebalance flow
Communication channels, which are responsible for:
sending events to the background thread
sending events to the polling thread
A note on the SubscriptionState: it will be the only exception to this design. Its reference will be shared by both the polling and background threads.
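The two channels above can be sketched as a pair of thread-safe queues. This is an illustrative sketch only: the class names (ApplicationEvent, BackgroundEvent, EventChannels) and the string-typed events are assumptions, not the final API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event flowing from the polling thread to the background thread.
class ApplicationEvent {
    final String type; // e.g. "COMMIT", "CLOSE"
    ApplicationEvent(String type) { this.type = type; }
}

// Hypothetical event flowing from the background thread to the polling thread.
class BackgroundEvent {
    final String type; // e.g. "PARTITIONS_REVOKED", "ERROR"
    BackgroundEvent(String type) { this.type = type; }
}

class EventChannels {
    // channel for sending events to the background thread
    final BlockingQueue<ApplicationEvent> toBackground = new LinkedBlockingQueue<>();
    // channel for sending events to the polling thread
    final BlockingQueue<BackgroundEvent> toPolling = new LinkedBlockingQueue<>();
}
```

Using blocking queues keeps the two threads decoupled: each side only enqueues and dequeues events, and never touches the other thread's state directly (with the SubscriptionState as the noted exception).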
Important Components
Background thread and its lifecycle
In our implementation, the background thread discovers the coordinator automatically (when needed). Therefore, we use a state machine to represent the different states of the coordinator connection and perform rediscovery in case of disconnection, as long as the background thread isn't terminated. The background thread will be in one of 4 states: down, initialized, discovering (coordinator discovery), and stable.
Upon initialization of the consumer, the coordinator state is down
The polling thread starts the background thread; the background thread moves to the initialized state
The background thread loop:
Poll for new events (for example, a commit event) from the queue, if any
Check the background thread state (connected to a coordinator or not)
If the event requires a coordinator, move to the discovery (coordinator discovery) state.
If not, execute the event.
If the state is currently discovery, check the coordinator connection: if the FindCoordinator request hasn't been completed, stay in the discovery state; if the request failed, transition back to the initialized state; otherwise the coordinator is found, so transition to the stable state.
Poll ConsumerCoordinator
Poll networkClient
Go to step 3
If close() is received:
set the close flag to true so that the loop exits
poll the coordinator and network client
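The lifecycle above can be sketched as a small state machine driving the loop. This is a minimal illustration under assumed names (BackgroundState, runOnce); the real loop would also drain the event queue and poll the ConsumerCoordinator and network client, which are elided here.

```java
// Hypothetical sketch of the background thread's four states and loop skeleton.
enum BackgroundState { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

class BackgroundThreadSketch {
    BackgroundState state = BackgroundState.DOWN; // coordinator is down initially
    boolean closed = false;

    // The polling thread starts the background thread.
    void start() { state = BackgroundState.INITIALIZED; }

    // One iteration of the loop described above.
    void runOnce(boolean eventNeedsCoordinator, boolean coordinatorFound) {
        if (closed) return;
        // 1. poll for new events from the queue (elided)
        // 2. an event that requires a coordinator triggers discovery
        if (eventNeedsCoordinator && state == BackgroundState.INITIALIZED)
            state = BackgroundState.COORDINATOR_DISCOVERY;
        // 3. while discovering, check the FindCoordinator result:
        //    success -> stable; failure -> back to initialized, retry next loop
        if (state == BackgroundState.COORDINATOR_DISCOVERY)
            state = coordinatorFound ? BackgroundState.STABLE
                                     : BackgroundState.INITIALIZED;
        // 4. poll ConsumerCoordinator and networkClient (elided)
    }

    // close() sets the flag so the loop exits, then the state goes down.
    void close() { closed = true; state = BackgroundState.DOWN; }
}
```

Retrying discovery on the next loop iteration (rather than blocking) is what lets the thread rediscover the coordinator automatically after a disconnect.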
Commit
Consumer Poll Changes
Currently, consumer.poll() does two things: poll the coordinator (for assignment updates, auto-commit, and possibly running the rebalance process) and fetch data.
Moving forward, the consumer will only be responsible for sending and returning the fetch data, and for polling the channel for events such as rebalances and errors:
Poll the ConsumerQueue for events
Handle exceptions
Handle callback execution
Send out fetch requests
Return fetch results
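The five steps above can be sketched as the skeleton of the new poll(). The names here (PollSketch, the string events, the log list used to make the ordering visible) are illustrative assumptions, not the proposed API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: poll() only drains background events and fetches;
// all coordinator work happens in the background thread.
class PollSketch {
    final Queue<String> backgroundEvents = new ArrayDeque<>(); // stands in for the channel
    final List<String> log = new ArrayList<>();                // records what poll() did

    List<String> poll() {
        // 1. poll the ConsumerQueue for events
        String event;
        while ((event = backgroundEvents.poll()) != null) {
            if (event.startsWith("ERROR"))
                log.add("handle exception: " + event);   // 2. handle exceptions
            else
                log.add("run callback: " + event);       // 3. handle callback execution
        }
        log.add("send fetches");                         // 4. send out fetch requests
        return List.of();                                // 5. return fetch results (elided)
    }
}
```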
...
Refactor the HeartbeatThread out of the existing implementation. Therefore, we won't be starting the heartbeat thread; instead, the heartbeat mechanism will piggyback onto the background thread.
Right now, the coordinator states consist of Unjoin, Prepare_Rebalancing, Complete_Rebalancing, and Stable. Because of the complexity that the rebalance callback execution and the background thread add in the new implementation, we will:
Add new states for partition assignment and revocation:
Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked
Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned
These “loop until done or timeout” mechanisms will piggyback on the background thread loop and take advantage of the state machine to advance the progress:
The background thread will automatically re-discover the coordinator if disconnected, using the state machine
We are already doing networkClient.poll() in the background thread loop
Rebalance States
The existing coordinator implementation already has a state machine in place. We will add 4 states to facilitate the callback execution and modify the existing states.
After onJoinPrepare, transition to the REVOKING_PARTITIONS state. It will do a few things:
Send an event to the polling thread to execute the callback
Wait for the partition-revoked event, then advance the state to PARTITIONS_REVOKED
In onJoinComplete:
we will first exit the COMPLETING_JOIN state
Enter ASSIGNING_PARTITIONS and send a partition assignment event. Return.
Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED.
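The handoff above can be sketched as the four added states plus a transition that fires when the polling thread reports a completed callback. The enum and method names here are assumptions for illustration; the real coordinator state machine has more states and transitions.

```java
// Hypothetical sketch of the four added rebalance states and how the
// background thread advances them once the polling thread runs a callback.
enum RebalanceState {
    REVOKING_PARTITIONS,   // revocation event sent to the polling thread
    PARTITIONS_REVOKED,    // polling thread finished onPartitionsRevoked
    ASSIGNING_PARTITIONS,  // assignment event sent to the polling thread
    PARTITIONS_ASSIGNED    // polling thread finished onPartitionsAssigned
}

class RebalanceSketch {
    RebalanceState state = RebalanceState.REVOKING_PARTITIONS;

    // Called when the polling thread reports a completed callback event.
    void onCallbackCompleted() {
        switch (state) {
            case REVOKING_PARTITIONS:  state = RebalanceState.PARTITIONS_REVOKED;  break;
            case ASSIGNING_PARTITIONS: state = RebalanceState.PARTITIONS_ASSIGNED; break;
            default: break; // other states ignore callback completions
        }
    }
}
```

The key design point is that the background thread never runs user callbacks itself; it only advances the state machine when the polling thread signals completion.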
...
TBD. I’m not sure how WakeupException plays out in the new design.
Timeout Policy
Consumer.poll() - user-provided timeout
...
Coordinator discovery timeout: currently uses the user-provided timeout in consumer.poll(). Maybe we should just use request.timeout.ms and re-attempt in the next loop iteration if it fails.
CommitOffsetSync: user-provided timeout
Is there a better way to configure the session interval and heartbeat interval?
...
We will also ensure the heartbeat interval and poll interval are respected.
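One way the background loop might respect the heartbeat interval is a simple elapsed-time check per iteration, rather than a dedicated sleeping thread. This is a sketch under assumed names (HeartbeatTimerSketch, shouldHeartbeat); the actual interval handling is not specified here.

```java
// Hypothetical sketch: only send a heartbeat when heartbeat.interval.ms has elapsed.
class HeartbeatTimerSketch {
    final long heartbeatIntervalMs;
    long lastHeartbeatMs;

    HeartbeatTimerSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatMs = nowMs;
    }

    // Called once per background loop iteration; returns true when it is
    // time to send a heartbeat, and resets the timer if so.
    boolean shouldHeartbeat(long nowMs) {
        if (nowMs - lastHeartbeatMs >= heartbeatIntervalMs) {
            lastHeartbeatMs = nowMs;
            return true;
        }
        return false;
    }
}
```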
We also need to make sure no events are lost, and that events are handled in the correct sequence.
...
Should poll() internally return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()? Each take() would perform a similar action to poll().
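The lazy-stream idea could be sketched as follows. The class is hypothetical (ConsumerRecordStream does not exist in the codebase), and a plain Iterator stands in for the underlying fetcher; it only illustrates the shape of the proposed take() semantics.

```java
import java.util.Iterator;

// Hypothetical sketch of the lazy ConsumerRecordStream idea: each take()
// performs one small poll-like step instead of poll() returning a whole batch.
class ConsumerRecordStreamSketch<T> {
    private final Iterator<T> source; // stands in for the underlying fetcher

    ConsumerRecordStreamSketch(Iterator<T> source) { this.source = source; }

    // In the real consumer, each take() would drain events and fetch as
    // needed, similar to one poll() invocation; returns null when exhausted.
    T take() {
        return source.hasNext() ? source.next() : null;
    }
}
```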
User Adoption
The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.