This document describes our effort to refactor the current KafkaConsumer. The project aims to address issues and shortcomings we've encountered over the past years, such as increasing code complexity, degraded readability, concurrency bugs, and rebalancing issues, and, lastly, to enable KIP-848.
Code complexity: Patches and hotfixes over the past years have heavily impacted the readability of the code. The complex code paths and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication can happen on both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of the problem when there is a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously, so we can take the opportunity to refactor the existing loops and timers.
Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, which has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.
Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with poll(). Firstly, it simplifies the design and logic because the timing of the network requests is more predictable. Secondly, because the rebalance occurs in the background thread, the polling thread won't block, or be blocked by, the rebalance process.
Refactor the KafkaConsumer API innards
Address issues in these Jira tickets
The new consumer is asynchronous and event driven. The former means we separate the responsibilities of the polling thread and the background thread. The latter means the two threads communicate via events. The core components are:
Application thread: the user thread on which all Consumer API requests and callback handlers are executed.
Network thread: client-internal thread on which network I/O related to heartbeat, coordinator, and rebalance is executed.
Event handler: A communication interface between the application thread and network thread
Application event queue: sends events to the network I/O thread from the application thread
Background event queue: sends events from the network thread to the application thread
We will discuss the strategy for handling event results in the following section. In short, for async APIs like commitAsync(), CompletableFuture will be used to notify the completion of the events.
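As a rough illustration, a commit event might carry a CompletableFuture that the network thread completes once the OffsetCommit response arrives. The concrete event class below is hypothetical (it is not the final API); it extends the ApplicationEvent base class defined later in this document:

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitApplicationEvent extends ApplicationEvent {

    // Offsets to commit, keyed by partition.
    private final Map<TopicPartition, OffsetAndMetadata> offsets;

    // Completed by the network thread once the OffsetCommit response arrives.
    // commitAsync() attaches callbacks to it; commitSync() blocks on it.
    private final CompletableFuture<Void> future = new CompletableFuture<>();

    public CommitApplicationEvent(Map<TopicPartition, OffsetAndMetadata> offsets) {
        super(ApplicationEventType.COMMIT);
        this.offsets = offsets;
    }

    public Map<TopicPartition, OffsetAndMetadata> offsets() {
        return offsets;
    }

    public CompletableFuture<Void> future() {
        return future;
    }
}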
The following diagram depicts the interaction between the application thread and the network thread:
The Consumer client object is depicted here in purple. In this design, instead of directly operating on the parameters given to the various APIs (subscribe(), poll(), commit(), etc.), the Consumer implementation packages the parameters as events that are enqueued on the application event queue.
The event handler, shown in green, executes on the network thread. In each loop of the network thread, events are read from the queue and processed.
If the Consumer API is a blocking call, the event passed from the application thread to the network thread will include an embedded CompletableFuture. After enqueuing the event, the application thread will invoke Future.get(), effectively blocking itself until a result is provided. When the result for the Consumer API call is ready, the network thread will then invoke CompletableFuture.complete() with the result, allowing the application thread to continue execution.
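A minimal sketch of that blocking round trip, reusing the hypothetical CommitApplicationEvent from above (the network thread is assumed to call event.future().complete(result), or completeExceptionally(error) on failure):

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class BlockingCommitSketch {

    // Application event queue shared with the network thread.
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;

    public BlockingCommitSketch(BlockingQueue<ApplicationEvent> applicationEventQueue) {
        this.applicationEventQueue = applicationEventQueue;
    }

    // Application thread: package the parameters as an event, enqueue it, and
    // block on the embedded future until the network thread completes it.
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
            throws InterruptedException, ExecutionException {
        CommitApplicationEvent event = new CommitApplicationEvent(offsets);
        applicationEventQueue.add(event);
        event.future().get();
    }
}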
The following diagram displays the basic flow between the request managers, the unsent request queue, and the NetworkClient:
A request manager represents the logic and state needed to issue a Kafka RPC request and handle its response. A request manager may contain logic to handle more than one type of RPC.
In the network thread, we loop over each request manager, effectively asking it for any requests that it needs to send to the cluster. Note that during this step the network request is not sent. Instead, an unsent request object is created, which contains the underlying request information. These "unsent requests" are added to a queue of pending unsent client requests. After all of these unsent requests are queued up, they are forwarded for network I/O via the NetworkClient.send() method.
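A sketch of one iteration of that loop, with hypothetical RequestManager and UnsentRequest types (the real types may differ):

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Hypothetical interfaces for this sketch; the names are illustrative only.
interface UnsentRequest { /* holds the request builder, target node, and a future */ }

interface RequestManager {
    // Returns any requests this manager wants to send this iteration;
    // nothing is sent on the wire at this point.
    List<UnsentRequest> poll(long currentTimeMs);
}

class NetworkThreadLoopSketch {

    private final List<RequestManager> requestManagers;
    private final Queue<UnsentRequest> unsentRequests = new ArrayDeque<>();

    NetworkThreadLoopSketch(List<RequestManager> requestManagers) {
        this.requestManagers = requestManagers;
    }

    // One iteration of the network thread: gather unsent requests from each
    // request manager, then hand them all off for network I/O.
    void runOnce(long currentTimeMs) {
        for (RequestManager manager : requestManagers)
            unsentRequests.addAll(manager.poll(currentTimeMs));

        UnsentRequest unsent;
        while ((unsent = unsentRequests.poll()) != null)
            send(unsent); // forwards to NetworkClient.send() in the real client
    }

    private void send(UnsentRequest unsent) { /* NetworkClient.send(...) */ }
}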
There are two benefits to this multi-step process:
Terminology:
The polling thread handles API invocations and any responses from the background thread. Let's demonstrate its life cycle using the simple consumer use case (assign(), poll(), commitSync()):
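For reference, here is what that use case looks like from the application's point of view; this is the standard Consumer API and is unchanged by the refactor (the topic name and configuration below are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SimpleConsumerExample {

    // Manual assignment, poll, and synchronous commit. Every call below runs
    // on the polling (application) thread; in the new design, the network I/O
    // it triggers happens on the background thread.
    public static void run(Properties props) {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singleton(new TopicPartition("my-topic", 0)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                consumer.commitSync();
            }
        }
    }
}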
The background thread runs a loop that periodically checks the ApplicationEventQueue, draining and processing the events. At a high level, the lifecycle of the background thread can be summarized as follows:
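A condensed sketch of that loop, assuming the ApplicationEvent type defined later in this document (the real implementation also polls the request managers and blocks with a timeout rather than spinning):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

public class BackgroundThreadSketch implements Runnable {

    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private volatile boolean running = true;

    public BackgroundThreadSketch(BlockingQueue<ApplicationEvent> applicationEventQueue) {
        this.applicationEventQueue = applicationEventQueue;
    }

    @Override
    public void run() {
        while (running) {
            // 1. Drain any events sent from the polling thread and process them.
            List<ApplicationEvent> events = new ArrayList<>();
            applicationEventQueue.drainTo(events);
            for (ApplicationEvent event : events)
                process(event);

            // 2. Poll the request managers and perform network I/O
            //    (see the request manager section above).
        }
    }

    private void process(ApplicationEvent event) {
        // Dispatch on the event type (COMMIT, LEAVE_GROUP, etc.).
    }

    public void shutdown() {
        running = false;
    }
}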
We are deprecating the current ConsumerNetworkClient because:
We are introducing a wrapper over the NetworkClient, the NetworkClientDelegate, to help coordinate the requests.
Kafka consumer tasks are tied to broker requests and responses. In the new implementation, we take a more modular approach: we create request managers for the different tasks and have the background thread poll these request managers to see if any requests need to be sent. Once a request is returned by the poll, the background thread enqueues it to the network client to be sent out.
The request managers handle the following requests:
After KIP-848 is implemented, the request managers will also handle the following:
The current implementation chains callbacks onto RequestFuture (a Kafka-internal type). We have decided to move away from the Kafka-internal type and migrate to the Java CompletableFuture due to its better interface and features.
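For example, callers can chain follow-up actions with the standard JDK API instead of Kafka-internal listener types. This is purely illustrative; in the real client the network thread completes the future when the response arrives:

import java.util.concurrent.CompletableFuture;

public class CompletableFutureExample {
    public static void main(String[] args) {
        CompletableFuture<Void> commitFuture = new CompletableFuture<>();

        // Chain a follow-up action that runs once the future completes,
        // whether successfully or exceptionally.
        commitFuture.whenComplete((result, error) -> {
            if (error != null)
                System.err.println("commit failed: " + error);
            else
                System.out.println("commit succeeded");
        });

        // Here we complete the future directly for illustration.
        commitFuture.complete(null);
    }
}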
EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:
Here we define two types of events:
We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e. the polling thread, to add and poll the events.
public interface EventHandler {
    // Invoked by the polling thread to retrieve events sent from the background thread.
    public BackgroundEvent poll();
    // Invoked by the polling thread to send an event to the background thread.
    public void add(ApplicationEvent event);
}
// Channel used to send events to the background thread
private BlockingQueue<ApplicationEvent> queue;

public abstract class ApplicationEvent {
    private final ApplicationEventType eventType;

    protected ApplicationEvent(ApplicationEventType eventType) {
        this.eventType = eventType;
    }
}

enum ApplicationEventType {
    COMMIT,
    ACK_PARTITION_REVOKED,
    ACK_PARTITION_ASSIGNED,
    UPDATE_METADATA,
    LEAVE_GROUP,
}
// Channel used to send events to the polling thread for client-side execution/notification
private BlockingQueue<BackgroundEvent> queue;

public abstract class BackgroundEvent {
    private final BackgroundEventType eventType;

    protected BackgroundEvent(BackgroundEventType eventType) {
        this.eventType = eventType;
    }
}

enum BackgroundEventType {
    ERROR,
    REVOKE_PARTITIONS,
    ASSIGN_PARTITIONS,
    FETCH_RESPONSE,
}
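For illustration, a concrete subclass (the name is hypothetical) that the background thread could use to propagate an exception to the polling thread, which would rethrow it on the next poll():

// Hypothetical concrete event: surfaces a background-thread error to the
// polling thread via the background event queue.
public class ErrorBackgroundEvent extends BackgroundEvent {

    private final RuntimeException error;

    public ErrorBackgroundEvent(RuntimeException error) {
        super(BackgroundEventType.ERROR);
        this.error = error;
    }

    public RuntimeException error() {
        return error;
    }
}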
Users are required to invoke poll() to:
We will break the current Fetcher into three parts to accommodate the asynchronous design; that is, we need the background thread to send fetches autonomously and the polling thread to collect fetches when they become available. We will have three separate classes here, sketched below:
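As a rough sketch of the split (the names below are illustrative, not the final design), the background thread deposits completed fetches into a shared, thread-safe buffer, and the polling thread collects from it:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative, thread-safe buffer of completed fetches shared between the
// background thread (producer) and the polling thread (consumer).
public class FetchBufferSketch<T> {

    private final Queue<T> completedFetches = new ConcurrentLinkedQueue<>();

    // Called by the background thread when a fetch response arrives.
    public void add(T completedFetch) {
        completedFetches.add(completedFetch);
    }

    // Called by the polling thread inside poll() to collect available data;
    // returns null when no fetches are ready.
    public T poll() {
        return completedFetches.poll();
    }
}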
We will remove the metadata logic from consumer.poll(), so that the execution of the poll loop is much simplified. It mainly handles:
Remove HeartbeatThread: we won't be starting a separate heartbeat thread; heartbeats will be sent from the background thread instead.
Please see Java client Consumer timeouts for more detail on timeouts.
Compatibility
The new consumer should be backward compatible.
Should some of the fetcher configuration be dynamic?
Non-shared: We can make a copy of the subscriptionState in the background thread and use events to drive the synchronization.
Poll returns CompletableFuture<ConsumerRecord<K,V>>
The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to continue using the new client as before.