Public-Facing Changes
- Update Java Docs
- API signatures will remain the same
Summary
This document describes our effort to refactor the threading model of the current KafkaConsumer. The project aims to address issues and shortcomings we've encountered over the past years, such as increasing code complexity and declining readability, concurrency bugs such as lock and race conditions, the coupling between the client poll and the rebalance progress, and, lastly, enabling KIP-848.
Code complexity: Patches and hotfixes over the past years have heavily impacted the readability of the code. The complex code paths and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue when there is a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.
Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, which has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.
Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with the poll. Firstly, it simplifies the design and logic because the timing of the network request is more predictable. Secondly, because the rebalance occurs in the background thread, the polling thread won't be blocked by, or block, the rebalance process.
Scope
- Define the responsibilities of the polling thread and the background thread
- Define the communication interface and protocol between the two threads
- Define a Network layer over the NetworkClient, including how different requests are sent and handled
- Define RequestManagers
- Define RequestStates to handle retry backoff
- Redesign SubscriptionState
- Deprecate HeartbeatThread and move it to the background thread
- Define rebalance flow, including how JoinGroup and SyncGroup requests are sent and how callbacks are triggered
Refactor the KafkaConsumer API innards
Deprecate HeartbeatThread
- Remove the heartbeat thread.
- Move the heartbeat logic to the background thread.
- Implement a background thread
Refactor the Coordinator classes
- Revisit timers and loops
- Blocking methods such as commitOffsetSync will be handled by the polling thread by waiting on the future's completion. The coordinator should not be blocking.
- The background thread loop will poll the coordinator.
- We will modify the existing rebalance states, specifically for the callback execution.
Refactor the KafkaConsumer API
- It will send Events to the background thread if network communication is required.
- Remove dependency on the fetcher, coordinator, and networkClient.
- Minor Refactor:
- Fetcher:
- sendFetches will be performed in the background thread.
- The network client and coordinator will be removed from the class.
Events and communication channels.
We will use two BlockingQueues for the two-way communication between the background thread and the polling thread.
Terms
Polling Thread
The client thread, also known as the polling thread, is the thread on which the user invokes the poll() operation.
Background Thread
Specifically, in the new design context, we use this generic term to indicate the active process running in the background. It handles network communication, rebalancing, and heartbeat.
Heartbeat Thread
The background running thread in the current implementation (see KIP-62).
Channel
We have two channels in this design. One channel delivers messages from the polling thread to the background thread. The other channel delivers messages submitted by the background thread to the polling thread.
Event
The primary communication medium between the polling thread and the background thread.
Proposed Design
The new consumer is asynchronous and event-driven. The former means we are separating the responsibilities between the polling thread and the background thread; the latter means the two threads communicate via events. In the schematic below, the core components are:
- Application thread (polling thread): the user thread on which all Consumer API requests and callback handlers are executed.
- Network thread (background thread): the client-internal thread on which network communication such as heartbeat, coordinator requests, and the rebalance flow is executed.
- Event handler: the communication interface between the application thread and the network thread.
- Application event queue (ApplicationEventQueue): sends events from the application thread to the network thread.
- Background event queue (BackgroundEventQueue): sends events from the network thread to the application thread.
- Application event processor: processes the events from the application event queue on the network thread.
Both queues will be accessed through the EventHandler interface.
We will discuss the strategy for handling event results in the following section. In short, we will use the CompletableFuture API to handle the asynchronous operation between the two threads.
A note on the subscriptionState: its reference will be shared by the polling and background threads, i.e., we will refactor the current implementation to be thread-safe. The reasons we are making this data structure an exception are:
- The object is frequently accessed by both the polling thread and the background thread.
- The object often requires an immediate response; otherwise, performance will be heavily impacted.
- Also, please review the rejected proposals section below. I talked about a few ideas that we've had.
...
Important Components
Background thread and its lifecycle
We use a state machine to represent the different states of the coordinator connection; these are: down, initialized, coordinator_discovery, and stable.
- down: The coordinator is uninitialized
- initialized: The background thread completes initialization, and the loop has started.
- coordinator_discovery: The coordinator is requested but hasn't been connected yet. If there's no in-flight FindCoordinatorRequest, then it will send one. If one exists, check for the request results.
- stable: The coordinator is connected.
The coordinator discovery process piggybacks on an event loop running on the background thread. The event loop polls events from the queue, and handles coordinator connection on demand, i.e., if we don't need a coordinator, the background thread will stay in the initialized state. If an event requires a coordinator, we will then move to the coordinator_discovery state and wait for the response to come back.
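As an illustrative sketch (the enum name and the helper method are assumptions, not the final implementation), the connection states could be represented as:

// Illustrative sketch of the coordinator connection states described above; the
// enum name and the helper method are assumptions, not the final implementation.
enum CoordinatorConnectionState {
    DOWN,                   // background thread not yet constructed
    INITIALIZED,            // background loop started; no coordinator needed yet
    COORDINATOR_DISCOVERY,  // a FindCoordinatorRequest is needed or in flight
    STABLE;                 // coordinator connected

    boolean requiresDiscovery() {
        return this == COORDINATOR_DISCOVERY;
    }
}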
...
For async APIs like commitAsync(), a CompletableFuture will be used to notify the completion of the events.
Application and Network Thread
The following diagram depicts the interaction between the application thread and the network thread:
The Consumer client object is here depicted in purple. In this design, instead of directly operating on the parameters given to the various APIs (subscribe(), poll(), commit(), etc.), the Consumer implementation packages the parameters as events that are enqueued on the application event queue.
The event handler, shown in green, executes on the network thread. In each loop of the network thread, events are read from the queue and processed.
If the Consumer API is a blocking call, the event passed from the application thread to the network thread will include an embedded CompletableFuture. After enqueuing the event, the application thread will invoke Future.get(), effectively blocking itself until a result is provided. When the result for the Consumer API call is ready, the network thread will then invoke CompletableFuture.complete() with the result, allowing the application thread to continue execution.
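A minimal sketch of that pattern, assuming hypothetical event and queue names:

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: event and queue names are placeholders, not the final classes.
class BlockingApiSketch {
    // An application event carrying the future that the network thread completes.
    static class CommitApplicationEvent {
        final CompletableFuture<Void> future = new CompletableFuture<>();
    }

    private final BlockingQueue<CommitApplicationEvent> applicationEventQueue =
            new LinkedBlockingQueue<>();

    // Application thread side: enqueue the event, then block on the embedded future.
    void commitSync(Duration timeout) throws Exception {
        CommitApplicationEvent event = new CommitApplicationEvent();
        applicationEventQueue.add(event);
        // Blocks until the network thread calls event.future.complete(...) or the timeout expires.
        event.future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}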
Submitting Client Requests
The following diagram displays the basic flow between the request managers, unsent request queue, and the NetworkClient
:
A request manager encapsulates the logic and state needed to issue a Kafka RPC request and handle its response. A request manager may contain logic to handle more than one type of RPC.
In the network thread, we loop over each request manager, effectively asking it for any requests that it needs to send to the cluster. Note that during this step the network request is not sent. Instead, an unsent request object is created which contains the underlying request information. These "unsent requests" are added to a queue of pending unsent client requests. After all of these unsent requests are queued up, they are forwarded for network I/O via the NetworkClient.send() method.
There are two benefits to this multi-step process:
- It keeps the network I/O request and response management and lifecycle in one place, making the code easier to reason about
- The request managers can implement deduplication and/or coalescing of requests
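Based on the description above, a request manager's polling contract might look roughly like this; the interface and nested types are assumptions, not the final API:

import java.util.Collections;
import java.util.List;

// Hypothetical shape of a request manager: the background thread calls poll(),
// which returns "unsent requests" that are queued before any network I/O happens.
interface RequestManager {
    PollResult poll(long currentTimeMs);

    // Result of a poll: how soon to poll again, plus zero or more requests to enqueue.
    final class PollResult {
        final long timeUntilNextPollMs;
        final List<UnsentRequest> unsentRequests;

        PollResult(long timeUntilNextPollMs, List<UnsentRequest> unsentRequests) {
            this.timeUntilNextPollMs = timeUntilNextPollMs;
            this.unsentRequests = unsentRequests;
        }

        static PollResult empty() {
            return new PollResult(Long.MAX_VALUE, Collections.emptyList());
        }
    }

    // Placeholder for the request information that will later be sent over the network.
    final class UnsentRequest { }
}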
Terminologies:
- CB: Callbacks registered and invoked on the polling thread: commit callback, rebalance callback.
- rm: RequestManagers. e.g. Heartbeat, FindCoordinatorRequest.
- subscriptionState design is still under discussion
Application thread and its lifecycle
The polling thread handles API invocation and any responses from the background thread. Let's demonstrate its life cycle using the simple consumer use case (assign(), poll(), commitSync()) :
- The user invokes assign(), the subscriptionState is altered.
- The subscription state changes are sent to the background thread via the ApplicationEventQueue.
- The user then invokes poll() in a loop.
- During the poll, the polling thread sends a fetch request to the background thread.
- During the poll, the polling thread polls fetch results from the BackgroundEventQueue. It deserializes the poll results and returns the result to the user.
- The user processes the results and invokes commitSync().
- The client thread sends an OffsetCommitApplicationEvent to the background thread. As this is a blocking operation, the method returns when the background thread completes the commit.
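For reference, the flow above corresponds to the standard simple-consumer pattern; nothing about this user-facing code changes:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SimpleConsumerFlow {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() alters the subscriptionState; no group membership is involved.
            consumer.assign(Collections.singletonList(new TopicPartition("example-topic", 0)));
            while (true) {
                // poll() collects the fetch results produced by the background thread.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
                // commitSync() blocks until the background thread completes the commit.
                consumer.commitSync();
            }
        }
    }
}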
Background thread and its lifecycle
The background thread runs a loop that periodically checks the ApplicationEventQueue, draining and processing the events. At a high level, the lifecycle of the background thread can be summarized as follows:
- The application starts up the Consumer, the Consumer creates an EventHandler, and starts up the background thread.
- The background thread enters the loop and starts polling the ApplicationEventQueue.
- Events will be sent to the corresponding RequestManager. For example, a commit event is sent to the OffsetCommitRequestManager.
- The background thread polls each RequestManager. If the RequestManager returns a result, we enqueue it to the NetworkClientDelegate.
- Poll the NetworkClientDelegate to ensure the requests are sent.
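A condensed sketch of one iteration of that loop is below; all non-JDK types are placeholders for the components described in this document, and the method names are assumptions:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Sketch of the background loop; every non-JDK type here is a placeholder.
class BackgroundThreadLoopSketch {
    interface ApplicationEvent { }
    interface ApplicationEventProcessor { void process(ApplicationEvent event); }
    interface RequestManager { List<Object> poll(long currentTimeMs); }
    interface NetworkClientDelegate { void addAll(List<Object> unsentRequests); void poll(long currentTimeMs); }

    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final ApplicationEventProcessor applicationEventProcessor;
    private final List<RequestManager> requestManagers;
    private final NetworkClientDelegate networkClientDelegate;

    BackgroundThreadLoopSketch(BlockingQueue<ApplicationEvent> applicationEventQueue,
                               ApplicationEventProcessor applicationEventProcessor,
                               List<RequestManager> requestManagers,
                               NetworkClientDelegate networkClientDelegate) {
        this.applicationEventQueue = applicationEventQueue;
        this.applicationEventProcessor = applicationEventProcessor;
        this.requestManagers = requestManagers;
        this.networkClientDelegate = networkClientDelegate;
    }

    // One iteration of the background loop, condensed from the steps above.
    void runOnce(long currentTimeMs) {
        // 1. Drain and process events sent by the polling thread.
        List<ApplicationEvent> events = new ArrayList<>();
        applicationEventQueue.drainTo(events);
        for (ApplicationEvent event : events) {
            applicationEventProcessor.process(event); // e.g. a commit event goes to the OffsetCommitRequestManager
        }
        // 2. Poll each request manager and stage any requests it produced.
        for (RequestManager manager : requestManagers) {
            networkClientDelegate.addAll(manager.poll(currentTimeMs));
        }
        // 3. Poll the delegate so the staged requests are actually sent.
        networkClientDelegate.poll(currentTimeMs);
    }
}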
Network Layers
We are deprecating the current ConsumerNetworkClient because:
- The locking is unnecessary in the new design because everything runs on a single thread.
- Some features, such as unsent, are irrelevant to this design.
We are introducing a wrapper over the NetworkClient, the NetworkClientDelegate, to help coordinate the requests.
- All requests are first enqueued into the unsentRequests queue.
- Polling the NetworkClientDelegate results in the queued requests being sent via the NetworkClient.
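A minimal sketch of the delegate's add/poll behavior, assuming placeholder types for the unsent request and the wrapped client:

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Sketch of the proposed delegate; the request type and the NetworkClientFacade
// interface are placeholders standing in for the real NetworkClient API.
class NetworkClientDelegateSketch {
    interface NetworkClientFacade {
        void send(Object unsentRequest, long currentTimeMs);
        void poll(long timeoutMs, long currentTimeMs);
    }

    private final NetworkClientFacade client;
    private final Queue<Object> unsentRequests = new ArrayDeque<>();

    NetworkClientDelegateSketch(NetworkClientFacade client) {
        this.client = client;
    }

    // Step 1: requests produced by the request managers are staged here; no I/O yet.
    void addAll(List<Object> requests) {
        unsentRequests.addAll(requests);
    }

    // Step 2: polling the delegate hands the staged requests to the underlying
    // client and then drives the actual network I/O.
    void poll(long timeoutMs, long currentTimeMs) {
        while (!unsentRequests.isEmpty()) {
            client.send(unsentRequests.poll(), currentTimeMs);
        }
        client.poll(timeoutMs, currentTimeMs);
    }
}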
Request Manager
Kafka consumer tasks are tied to broker requests and responses. In the new implementation, we take a more modular approach and create request managers for different tasks; the background thread polls these request managers to see if any requests need to be sent. Once a request is returned by the poll, the background thread enqueues it to the network client to be sent out.
The request managers handle the following requests:
- FindCoordinatorRequest
- OffsetCommitRequest
- FetchRequest
- MetadataRequest
- HeartbeatRequest
- ListOffsetRequest
After KIP-848 is implemented, the request managers also handle the following:
- ConsumerGroupHeartbeatRequest
- ConsumerGroupPrepareAssignmentRequest
- ConsumerGroupInstallAssignmentRequest
RequestFuture and Callback
The current implementation chains callbacks onto RequestFuture (a Kafka internal type). We have decided to move away from the internal type and migrate to Java's CompletableFuture due to its better interface and features.
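For illustration, a response handler chained onto a CompletableFuture might look like this (the result type and handler are hypothetical):

import java.util.concurrent.CompletableFuture;

class CompletableFutureChaining {
    // Hypothetical result type for an offset commit issued by a request manager.
    static class CommitResult { }

    static void example() {
        CompletableFuture<CommitResult> future = new CompletableFuture<>();

        // Consumers of the result chain handlers instead of registering
        // RequestFuture listeners; exceptions propagate through the same chain.
        future.whenComplete((result, error) -> {
            if (error != null) {
                System.err.println("commit failed: " + error);
            } else {
                System.out.println("commit completed");
            }
        });

        // The background thread completes the future when the response arrives.
        future.complete(new CommitResult());
    }
}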
Events and EventHandler
EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:
- Allows the polling thread to send events to the background thread
- Allows the polling thread to poll events from the background thread
Here we define two types of events:
- ApplicationEvent: application side events that will be sent to the background thread
- BackgroundEvent: background thread events that will be sent to the application
The background thread has not been constructed, so it is in the down state.
The polling thread starts the background thread; the background thread finishes initialization and then moves to the initialized state. Usually, this happens after new KafkaConsumer().
The background thread loop performs the following tasks:
- Check if there is an in-flight event. If not, poll for new events from the channel.
- Run the state machine; the possible scenarios are:
- The event does not require a coordinator. Execute the event and move on.
- The event requires a coordinator, but it is currently disconnected. Move the state to coordinator_discovery.
- The background thread is currently in the coordinator_discovery state; continue to loop and wait for the FindCoordinator response.
- The background thread is in the stable state:
- Poll the Coordinator.
- Execute the event.
- Poll the networkClient.
- Back off for retryBackoffMs milliseconds.
Rebalance States
We modify the existing rebalance states to aid the callback execution.
We split PREPARING_REBALANCE into PREPARE_REVOCATION, REVOKING_PARTITION, and PARTITION_REVOKED, followed by PREPARING_REBALANCE:
- PREPARE_REVOCATION: Pause the partitions that will be revoked.
- REVOKING_PARTITION: Await completion of the onPartitionsRevoked callback.
- PARTITION_REVOKED: Update the subscription. Autocommit.
An ASSIGNING_PARTITIONS state will be added before STABLE:
- ASSIGNING_PARTITIONS: Await completion of the onPartitionsAssigned callback.
...
The proposed state transitions are as follows:
Starting from UNJOINED: UNJOINED → PREPARING_REBALANCE → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE
(EAGER) Starting from STABLE: STABLE → PREPARE_REVOCATION → REVOKING_PARTITION → PARTITION_REVOKED → PREPARING_REBALANCE → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE
(COOPERATIVE) STABLE → PREPARE_REVOCATION → REVOKING_PARTITION → PARTITION_REVOKED → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE → //rejoin
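The proposed states, collected into an illustrative enum (the actual representation in the code may differ):

// The proposed rebalance states collected from the transitions above; the enum
// form is illustrative only.
enum RebalanceState {
    UNJOINED,              // consumer has not joined the group
    PREPARE_REVOCATION,    // pause the partitions that will be revoked
    REVOKING_PARTITION,    // wait for onPartitionsRevoked to complete on the polling thread
    PARTITION_REVOKED,     // update the subscription; autocommit
    PREPARING_REBALANCE,   // send the JoinGroup request
    COMPLETING_REBALANCE,  // await the SyncGroup response / assignment
    ASSIGNING_PARTITIONS,  // wait for onPartitionsAssigned to complete on the polling thread
    STABLE                 // assignment active; heartbeating
}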
...
We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication behind an EventHandler, so we don't need to operate directly on the queue; the EventHandler allows the caller, i.e., the polling thread, to add and poll events.
EventHandler
interface EventHandler {
    public BackgroundEvent poll();
    public void add(ApplicationEvent event);
}
...
ApplicationEventQueue and ApplicationEvent
// Channel used to send events to the background thread
private BlockingQueue<ApplicationEvent> queue;
abstract public class ApplicationEvent {
    private final ApplicationEventType eventType;
}
enum ApplicationEventType {
    COMMIT,
    ACK_PARTITION_REVOKED,
    ACK_PARTITION_ASSIGNED,
    UPDATE_METADATA,
    LEAVE_GROUP,
}
...
BackgroundEventQueue and BackgroundEvent
// Channel used to send events to the polling thread for client side execution/notification
private BlockingQueue<BackgroundEvent> queue;
abstract public class BackgroundEvent {
    private final BackgroundEventType eventType;
}
enum BackgroundEventType {
    ERROR,
    REVOKE_PARTITIONS,
    ASSIGN_PARTITIONS,
    FETCH_RESPONSE,
}
Major Changes
Consumer Poll Changes
consumer.poll() will mainly be a loop polling for events from the event handler. Here are the important events:
- Fetch Data: Return immediately
- Callback Invocation: Invoke the callback and subsequently send an acknowledgment event to the background thread to advance the state machine (see background thread section).
- Error: Handle the error upon receiving it.
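A sketch of that loop; the event types mirror the BackgroundEventType enum defined earlier, and everything else is a placeholder:

import java.time.Duration;
import java.util.Optional;

// Sketch of the event-driven poll loop; event and handler types are placeholders.
class ConsumerPollSketch {
    enum BackgroundEventType { FETCH_RESPONSE, REVOKE_PARTITIONS, ASSIGN_PARTITIONS, ERROR }

    interface BackgroundEvent { BackgroundEventType type(); }

    interface EventHandler {
        Optional<BackgroundEvent> poll();
        void add(Object applicationEvent); // e.g. an acknowledgement event
    }

    private final EventHandler eventHandler;

    ConsumerPollSketch(EventHandler eventHandler) {
        this.eventHandler = eventHandler;
    }

    Optional<BackgroundEvent> poll(Duration timeout) {
        long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (System.currentTimeMillis() < deadlineMs) {
            Optional<BackgroundEvent> maybeEvent = eventHandler.poll();
            if (!maybeEvent.isPresent()) {
                continue;
            }
            switch (maybeEvent.get().type()) {
                case FETCH_RESPONSE:
                    return maybeEvent;              // fetch data: return immediately
                case REVOKE_PARTITIONS:
                case ASSIGN_PARTITIONS:
                    // Invoke the rebalance listener here, then acknowledge so the
                    // background state machine can advance.
                    eventHandler.add(new Object());
                    break;
                case ERROR:
                    throw new IllegalStateException("error event from the background thread");
            }
        }
        return Optional.empty();
    }
}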
Consumer API Internal Changes
Poll
The users are required to invoke poll to:
- Trigger auto-commit
- Poll exceptions: process them or raise them to the user
- Poll fetches
- Poll callback invocation triggers, to trigger the rebalance listeners
CommitSync
- The polling thread sends a commit event. The commit event has a CompletableFuture.
- Wait for the CompletableFuture to complete, so that we can make this a blocking API.
Assign
- If we are assigning nothing, trigger unsubscribe()
- clear the fetcher buffer
- send a commit event if autocommit is enabled
- send metadata update
Subscribe
- If subscribing to nothing, trigger unsubscribe()
- clear the fetcher buffer
- subscribes
- send metadata update
Unsubscribe
- Send a leave group event
- unsubscribe from the topics
Major Changes
Fetcher
We will break the current fetcher into three parts to accommodate the asynchronous design, i.e., we need the background thread to send fetches autonomously and the polling thread to collect fetches when these fetches become available. We will have 3 separate classes here:
- FetchSender: Responsible for sending fetches in the background thread
- FetchHandler: Sitting in the polling thread's poll loop, processing the fetch response from the fetch event.
- FetchBuffer: This is the CompletedFetches in the old implementation. Its purpose is to prevent the FetchSender from sending too many fetches and causing memory issues. It will be removed once we implement the memory-based buffer (KIP-81).
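A rough sketch of how the three pieces could relate; all signatures are assumptions:

// Rough sketch of the proposed split; the interfaces and signatures are assumptions.
class FetcherSplitSketch {
    interface CompletedFetch { }

    // Runs on the background thread: builds and sends FetchRequests autonomously.
    interface FetchSender {
        void sendFetches(long currentTimeMs);
    }

    // Bounded staging area between the two threads (the old CompletedFetches).
    interface FetchBuffer {
        void add(CompletedFetch fetch);
        CompletedFetch poll();
        boolean isFull(); // back-pressure so the FetchSender does not over-fetch
    }

    // Runs in the polling thread's poll loop: turns completed fetches into records.
    interface FetchHandler {
        Iterable<Object> collect(FetchBuffer buffer);
    }
}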
Consumer Poll Changes
We will remove the metadata logic from the consumer.poll(), so that the execution of the poll loop is much simplified. It mainly handles:
- fetches
- callback invocation
- errors
ConsumerCoordinator and AbstractCoordinator
- New states will be introduced (see the Rebalance States section above). The main purpose is to make the background thread drive the poll and to let the polling thread invoke the callbacks.
- Remove the HeartbeatThread; we won't be starting a separate heartbeat thread.
- A fetch event will still be required to poll the heartbeat, as only the polling thread issues fetch events and we want to respect the existing implementation.
- Timeouts and timers will be reevaluated and possibly removed.
- while loops will be reevaluated and possibly thrown out. In the new implementation the coordinator will be non-blocking, and its states are managed by the background thread loop.
Wakeup() and WakeupException
...
...
In the new design, wakeup will interrupt the blocking polling loop in the polling thread.
WakeupException
The exception will be thrown by the polling thread when the loop is interrupted.
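For reference, the user-facing contract is unchanged from today's consumer: wakeup() is called from another thread, and the blocked poll() raises WakeupException:

import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

class WakeupExample {
    static void run(KafkaConsumer<String, String> consumer) {
        // Another thread (here a shutdown hook) interrupts the blocking poll loop.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                consumer.poll(Duration.ofMillis(100));
            }
        } catch (WakeupException e) {
            // Expected on shutdown: the polling loop was interrupted by wakeup().
        } finally {
            consumer.close();
        }
    }
}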
Timeout Policy
- Consumer.poll(): user-provided timeout
- Coordinator rediscovery backoff: retry.backoff.ms
- Coordinator discovery timeout: currently uses the user-provided timeout passed to consumer.poll(). Maybe we should use request.timeout.ms instead, and re-attempt in the next loop if it fails.
- CommitOffsetSync: user-provided timeout
Is there a better way to configure session interval and heartbeat interval?
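For reference, the configurations referenced above, shown with example values (not recommendations):

import java.util.Properties;

class TimeoutConfigExample {
    // Example values only; they show where each timeout mentioned above is configured.
    static Properties timeoutConfigs() {
        Properties props = new Properties();
        props.put("retry.backoff.ms", "100");        // coordinator rediscovery backoff
        props.put("request.timeout.ms", "30000");    // per-request timeout
        props.put("session.timeout.ms", "45000");    // broker-side liveness window
        props.put("heartbeat.interval.ms", "3000");  // how often the background thread heartbeats
        props.put("max.poll.interval.ms", "300000"); // maximum delay between poll() calls
        return props;
    }
}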
Consumer API Internal Changes
Assign
- The polling thread updates the subscriptionState.
- Create an ASSIGN event, and queue up the event to the background thread.
- The background thread receives and executes the event:
- Send an offset commit
- Update the metadata
Unsubscribe
- The polling thread removes the subscriptions from the subscriptionState
- executes the onPartitionRevoke callback
- Send the background thread a LEAVE_GROUP event
- Returns
- The background thread:
- Initiate LeaveGroup
Subscribe
- The polling thread updates the subscriptionState.
- Create a SUBSCRIBE event, and send it to the background thread.
- Background thread executes the event:
- Update metadata
Poll
- Poll the EventHandler, and execute the events accordingly
- Execute interceptor.onConsume() and return the data.
- The background thread executes the FETCH event:
- Autocommits // I think we can move the autocommit to the background thread
- issue fetch requests
Event Data Models
...
Please see Java client Consumer timeouts for more detail on timeouts.
Test Plan
We will first use and extend the existing unit tests and integration tests.
We need to ensure that critical events happen in the correct sequence. For example: we need to first discover the coordinator; second, commit the offsets while pausing the partitions from being fetched; then revoke the partitions; and then continue on to the rest of the rebalance process.
We will also ensure the heartbeat interval and poll interval are respected.
We also need to ensure that no events are lost and that events happen in the correct sequence.
Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.
Compatibility
The new consumer should be backward compatible.
Alternative Proposals
Fetcher
Should some of the fetcher configuration be dynamic?
- Configurable prefetch buffer
SubscriptionState
Non-shared: We could make a copy of the subscriptionState in the background thread and use events to drive synchronization.
- There could be out-of-sync issues, which could subsequently cause incorrect fetching, etc.
API Changes
Poll returns CompletableFuture<ConsumerRecord<K,V>>
User Adoption
The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.
Release Plan
...
- Background thread
- A new coordinator implementation, AsyncConsumerCoordinator, for example.
- Events and event executors
...