Public-Facing Changes
I'm expecting the public interface to stay the same, as well as the API contracts.
Javadoc
- New exceptions thrown
- Behavioral changes
Summary
This document describes our effort to refactor the current KafkaConsumer. The project aims to address issues and shortcomings we have encountered over the past years, such as increasing code complexity and declining readability, concurrency bugs, and rebalancing issues, and, lastly, to enable KIP-848.
...
- Define the responsibilities of the polling thread and the background thread
- Define the communication interface and protocol between the two threads
- Define a Network layer over the NetworkClient, including how different requests are sent and handled
- Define RequestManagers
- Define RequestStates to handle retry backoff
- Redesign SubscriptionState
- Deprecate HeartbeatThread and move it to the background thread
- Define the rebalance flow, including how JoinGroup and SyncGroup requests are sent and how callbacks are triggered
Refactor the KafkaConsumer API innards
Address issues in these Jira tickets
Terms
Polling Thread
The client thread, also called the polling thread, is the thread on which the user invokes the poll() operation.
Background Thread
...
Heartbeat Thread
The background thread that sends heartbeats in the current implementation (see KIP-62).
Channel
The communication interface between two threads. One channel delivers messages from the polling thread to the background thread. The other channel delivers messages submitted by the background thread to the polling thread.
Event
The primary medium through which the polling thread interacts with the background thread.
...
Design
The new consumer is asynchronous and event-driven. The former means we are separating responsibilities between the polling thread and the background thread. The latter means the two threads communicate via events. The core components are:
- Application (polling) thread: the user thread on which all Consumer API requests and callback handlers are executed.
- Network (background) thread: the client-internal thread on which network I/O related to heartbeats, coordinator requests, and rebalance flow execution is performed.
- Event handler: a communication interface between the application thread and the network thread. Both queues are accessed through the EventHandler interface.
  - Application event queue: sends events from the application thread to the network I/O thread
  - Background event queue: sends events from the network thread to the application thread
- Application event processor:
We will discuss the strategy for handling event results in the following section. In short, for asynchronous APIs such as commitAsync(), a CompletableFuture will be used to signal the completion of the event.
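A minimal sketch of the event handler and its two queues, assuming a simple add/poll style interface backed by `LinkedBlockingQueue`. The class names `EventHandler`, `ApplicationEvent`, and `BackgroundEvent` come from this page, but their fields and the method names `add`, `poll`, `nextApplicationEvent`, and `publishBackgroundEvent` are hypothetical, chosen only for illustration.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event types; real events would carry typed payloads (partitions, offsets, errors, ...).
class ApplicationEvent {
    final String type;
    ApplicationEvent(String type) { this.type = type; }
}

class BackgroundEvent {
    final String type;
    BackgroundEvent(String type) { this.type = type; }
}

// Hypothetical event handler: one queue per direction between the two threads.
class EventHandler {
    // Application thread -> network thread
    private final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
    // Network thread -> application thread
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    // Application thread: enqueue a request for the network thread.
    boolean add(ApplicationEvent event) {
        return applicationEventQueue.offer(event);
    }

    // Application thread: non-blocking read of the next background event, if any.
    Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(backgroundEventQueue.poll());
    }

    // Network thread: take the next pending application event, or null if there is none.
    ApplicationEvent nextApplicationEvent() {
        return applicationEventQueue.poll();
    }

    // Network thread: hand a result or notification back to the application thread.
    void publishBackgroundEvent(BackgroundEvent event) {
        backgroundEventQueue.offer(event);
    }
}
```

Keeping both queues behind one object means neither thread touches the other's state directly; all cross-thread traffic goes through thread-safe queues.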
Application and Network Thread
The following diagram depicts the interaction between the application thread and the network thread:
[Diagram: interaction between the application thread and the network thread]
The Consumer client object is depicted here in purple. In this design, instead of directly operating on the parameters given to the various APIs (subscribe(), poll(), commit(), etc.), the Consumer implementation packages the parameters as events that are enqueued on the application event queue.
The event handler, shown in green, executes on the network thread. In each loop of the network thread, events are read from the queue and processed.
If the Consumer API is a blocking call, the event passed from the application thread to the network thread will include an embedded CompletableFuture. After enqueuing the event, the application thread will invoke Future.get(), effectively blocking itself until a result is provided. When the result for the Consumer API call is ready, the network thread will invoke CompletableFuture.complete() with the result, allowing the application thread to continue execution.
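The blocking-call pattern above can be sketched as follows. `CommitSyncEvent`, `BlockingCallSketch`, and the single-iteration `runOnce()` are hypothetical names for illustration; the sketch also assumes the API timeout should bound the wait, so it uses `CompletableFuture.get(long, TimeUnit)` rather than a bare `get()`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical event carrying a future that the network thread completes.
class CommitSyncEvent {
    final long offset;
    final CompletableFuture<Void> future = new CompletableFuture<>();
    CommitSyncEvent(long offset) { this.offset = offset; }
}

class BlockingCallSketch {
    final BlockingQueue<CommitSyncEvent> applicationEventQueue = new LinkedBlockingQueue<>();

    // Application thread: enqueue the event, then block on its future until the
    // network thread completes it or the timeout expires.
    void commitSync(long offset, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CommitSyncEvent event = new CommitSyncEvent(offset);
        applicationEventQueue.add(event);
        event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Network thread: one loop iteration that drains the queue and completes each future.
    void runOnce() {
        CommitSyncEvent event;
        while ((event = applicationEventQueue.poll()) != null) {
            // A real implementation would send an OffsetCommit request and complete the
            // future only when the response arrives (or completeExceptionally on error).
            event.future.complete(null);
        }
    }
}
```

Because the future is created by the application thread and completed by the network thread, no other synchronization is needed between the two for this call.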
Submitting Client Requests
The following diagram displays the basic flow between the request managers, the unsent request queue, and the NetworkClient:
[Diagram: flow between request managers, unsent request queue, and NetworkClient]
A request manager encapsulates the logic and state needed to issue a Kafka RPC request and handle its response. A request manager may contain logic to handle more than one type of RPC.
In the network thread, we loop over each request manager, effectively asking it for any requests that it needs to send to the cluster. Note that during this step the network request is not sent. Instead, an unsent request object is created that contains the underlying request information. These "unsent requests" are added to a queue of pending unsent client requests. Once all of these unsent requests are queued, they are forwarded for network I/O via the NetworkClient.send() method.
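One iteration of that loop might look like the sketch below. `UnsentRequest`, the `RequestManager.poll(long)` signature, and `FakeNetworkClient` are hypothetical stand-ins; the fake client only records requests instead of calling Kafka's real NetworkClient.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical unsent request: destination node plus an opaque request description.
class UnsentRequest {
    final String node;
    final String request;
    UnsentRequest(String node, String request) { this.node = node; this.request = request; }
}

// Hypothetical request manager contract: report what needs to be sent, without sending it.
interface RequestManager {
    List<UnsentRequest> poll(long currentTimeMs);
}

// Stand-in for the NetworkClient that records what it was asked to send.
class FakeNetworkClient {
    final List<UnsentRequest> sent = new ArrayList<>();
    void send(UnsentRequest r) { sent.add(r); }
}

class NetworkThreadLoop {
    private final List<RequestManager> requestManagers;
    private final Deque<UnsentRequest> unsentRequests = new ArrayDeque<>();
    private final FakeNetworkClient client;

    NetworkThreadLoop(List<RequestManager> requestManagers, FakeNetworkClient client) {
        this.requestManagers = requestManagers;
        this.client = client;
    }

    // One iteration of the network thread.
    void runOnce(long nowMs) {
        // Step 1: collect requests from every manager; nothing goes on the wire yet.
        for (RequestManager rm : requestManagers) {
            unsentRequests.addAll(rm.poll(nowMs));
        }
        // Step 2: forward all queued requests for network I/O in one place.
        while (!unsentRequests.isEmpty()) {
            client.send(unsentRequests.poll());
        }
        // A real loop would then poll the client to perform I/O and dispatch responses.
    }
}
```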
There are two benefits of this multi-step process:
- It keeps the network I/O request and response management and lifecycle in one place, making the code easier to reason about
- The request managers can implement deduplication and/or coalescing of requests
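As an illustration of the second benefit, here is a hypothetical coordinator-lookup manager that coalesces repeated lookup requests so that at most one FindCoordinator request is in flight. The class and method names are assumptions, and returning plain strings stands in for real unsent request objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical request manager that coalesces coordinator lookups.
class CoordinatorRequestManagerSketch {
    private boolean lookupRequested = false;
    private boolean inFlight = false;

    // Any component may call this, possibly many times per loop iteration.
    // Calls made while a lookup is in flight piggyback on that lookup.
    void requestCoordinator() {
        if (!inFlight) {
            lookupRequested = true;
        }
    }

    // Polled by the network thread; returns the requests to enqueue as unsent.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        if (lookupRequested) {
            out.add("FindCoordinator");
            inFlight = true;
            lookupRequested = false;
        }
        return out;
    }

    // Invoked when the response (or a failure) arrives.
    void onResponse() {
        inFlight = false;
    }
}
```

Since the manager decides what to emit before anything reaches the network layer, duplicate lookups never turn into duplicate RPCs.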
Terminology:
- CB: callbacks registered and invoked on the polling thread, e.g., the commit callback and the rebalance callback.
- rm: RequestManagers, e.g., Heartbeat, FindCoordinatorRequest.
- The SubscriptionState design is still under discussion.
Design
...
Application thread and its lifecycle
The polling thread handles API invocations and any responses from the background thread. Let's demonstrate its life cycle using a simple consumer use case (assign(), poll(), commitSync()):
...
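// Channel used to send events to the polling thread for client-side execution/notification
private BlockingQueue<BackgroundEvent> queue;

public abstract class BackgroundEvent {
    private final BackgroundEventType eventType;

    // The final field must be assigned, so subclasses pass their type up
    protected BackgroundEvent(BackgroundEventType eventType) {
        this.eventType = eventType;
    }

    public BackgroundEventType eventType() {
        return eventType;
    }
}

enum BackgroundEventType {
    ERROR,
    REVOKE_PARTITIONS,
    ASSIGN_PARTITIONS,
    FETCH_RESPONSE
}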
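A sketch of how the polling thread might drain this channel inside poll(), so that rebalance callbacks run on the user's thread. It reuses the `BackgroundEventType` values above, but the simplified `BackgroundEvent` with a string payload, the `PollingThreadSketch` class, and the choice to surface `ERROR` as an `IllegalStateException` are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

enum BackgroundEventType { ERROR, REVOKE_PARTITIONS, ASSIGN_PARTITIONS, FETCH_RESPONSE }

// Simplified event: type plus a free-form payload (hypothetical; real events would be typed subclasses).
class BackgroundEvent {
    final BackgroundEventType eventType;
    final String payload;
    BackgroundEvent(BackgroundEventType eventType, String payload) {
        this.eventType = eventType;
        this.payload = payload;
    }
}

class PollingThreadSketch {
    final BlockingQueue<BackgroundEvent> queue = new LinkedBlockingQueue<>();
    // Records which (stand-in) rebalance callbacks ran, and in what order.
    final List<String> log = new ArrayList<>();

    // Drain every pending event on the polling thread and return fetched records.
    List<String> poll() {
        List<String> records = new ArrayList<>();
        BackgroundEvent event;
        while ((event = queue.poll()) != null) {
            switch (event.eventType) {
                case ERROR:
                    // Surface background errors to the caller of poll().
                    throw new IllegalStateException(event.payload);
                case REVOKE_PARTITIONS:
                    log.add("onPartitionsRevoked " + event.payload);
                    break;
                case ASSIGN_PARTITIONS:
                    log.add("onPartitionsAssigned " + event.payload);
                    break;
                case FETCH_RESPONSE:
                    records.add(event.payload);
                    break;
            }
        }
        return records;
    }
}
```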
Rebalance [WIP]
One of the main reasons we are refactoring the KafkaConsumer is to satisfy the requirements of the new rebalance protocol introduced in KIP-848.
KIP-848 defines two assignment modes: server-side mode and client-side mode. Both use the new heartbeat API, ConsumerGroupHeartbeat.
The server-side mode is simpler: the assignments are computed by the Group Coordinator, and the clients are only responsible for revoking and assigning the partitions.
If the user chooses to use a client-side assignor, the assignment is computed by one of the members, and assignment and revocation are done via the heartbeat, as in server-side mode.
In the new design we will build the following components:
- GroupState: keeps track of the current state of the group, such as the generation and the rebalance state.
- HeartbeatRequestManager: a type of request manager responsible for calling the ConsumerGroupHeartbeat API
- Assignment Manager: Manages partition assignments.
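A rough sketch of how a HeartbeatRequestManager could decide, on each poll, whether to send a ConsumerGroupHeartbeat. It assumes a fixed heartbeat interval passed to the constructor, sends immediately while the group is in PREPARE, and never has more than one heartbeat in flight. Aside from the GroupState values listed on this page, the class and method names are hypothetical.

```java
import java.util.Optional;

enum GroupState { UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE }

// Hypothetical heartbeat request manager, polled by the network thread on each loop.
class HeartbeatRequestManagerSketch {
    private final long heartbeatIntervalMs;
    private long lastSentMs = -1;
    private boolean inFlight = false;

    HeartbeatRequestManagerSketch(long heartbeatIntervalMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
    }

    // Returns a heartbeat to enqueue as an unsent request, or empty if none is due.
    Optional<String> poll(GroupState state, long nowMs) {
        // Not subscribed, or still waiting for the previous response.
        if (state == GroupState.UNJOINED || inFlight) {
            return Optional.empty();
        }
        boolean joining = state == GroupState.PREPARE;
        boolean intervalElapsed = lastSentMs < 0 || nowMs - lastSentMs >= heartbeatIntervalMs;
        if (!joining && !intervalElapsed) {
            return Optional.empty();
        }
        inFlight = true;
        lastSentMs = nowMs;
        return Optional.of("ConsumerGroupHeartbeat");
    }

    // Invoked when the ConsumerGroupHeartbeatResponse (or a failure) arrives.
    void onResponse() {
        inFlight = false;
    }
}
```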
Rebalance Flow
New Consumer Group
- The user invokes subscribe(). SubscriptionState is altered, and a subscription-change event is sent to the background thread.
- The background thread processes the event and updates the GroupState to PREPARE.
- HeartbeatRequestManager is polled. It checks the GroupState and determines it is time to send the heartbeat.
- ConsumerGroupHeartbeatResponse is received. Update the GroupState to ASSIGN.
- PartitionAssignmentManager is polled and realizes the GroupState is ASSIGN. Trigger assignment computation:
- [We might need another state here]
- Once the assignment is computed, send an event to the client thread to invoke the rebalance callback.
- Callback triggered; notify the background thread.
- PartitionAssignmentManager is polled and transitions the GroupState to COMPLETE.
- [something needs to happen here]
- Transition the GroupState to STABLE.
GroupState
[UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE]
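A minimal state-machine sketch for these states, allowing only the transitions in the "New Consumer Group" flow above. Because that flow is still WIP (see the open placeholders), the transition table, the `GroupStateMachine` class, and the decision to reject any other transition are assumptions for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum GroupState { UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE }

// Hypothetical tracker enforcing only the transitions described in the flow.
class GroupStateMachine {
    private static final Map<GroupState, Set<GroupState>> ALLOWED = new EnumMap<>(GroupState.class);
    static {
        ALLOWED.put(GroupState.UNJOINED, EnumSet.of(GroupState.PREPARE)); // subscription change processed
        ALLOWED.put(GroupState.PREPARE, EnumSet.of(GroupState.ASSIGN));   // heartbeat response received
        ALLOWED.put(GroupState.ASSIGN, EnumSet.of(GroupState.COMPLETE));  // callback done, manager polled
        ALLOWED.put(GroupState.COMPLETE, EnumSet.of(GroupState.STABLE));
        ALLOWED.put(GroupState.STABLE, EnumSet.noneOf(GroupState.class)); // not yet specified
    }

    private GroupState state = GroupState.UNJOINED;

    GroupState state() { return state; }

    // Move to the next state, rejecting transitions the flow does not describe.
    void transitionTo(GroupState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

Rejecting unexpected transitions loudly makes gaps in the flow (such as the "[something needs to happen here]" step) visible during testing rather than silently tolerated.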
...
...
Consumer API Internal Changes
...
- New states will be introduced (see Rebalance States section above). The main purpose is to let the background thread drive the poll while the polling thread invokes the callbacks.
- Remove HeartbeatThread; the heartbeat thread will no longer be started.
- A fetch event will still be required to poll the heartbeat, since only the polling thread issues fetch events and we want to respect the existing implementation.
- Timeouts and timers will be reevaluated and possibly removed.
- While loops will be reevaluated and possibly removed. In the new implementation, the coordinator will be non-blocking, and its state will be managed by the background thread loop.
Timeout Policy
Please see Java client Consumer timeouts for more detail on timeouts.
- Consumer.poll(): user-provided timeout
- Coordinator rediscovery backoff: retry.backoff.ms
- Coordinator discovery timeout: currently uses the user-provided timeout in consumer.poll(). We may want to use request.timeout.ms instead and re-attempt in the next loop iteration if discovery fails.
- CommitOffsetSync: user-provided timeout
- Rebalance state timeout: possibly the request timeout
- Open question: is there a better way to configure the session interval and heartbeat interval?
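The backoff entries above, together with the goal of defining RequestStates to handle retry backoff, suggest a small per-request state object. The sketch below assumes a fixed backoff (for example, the configured retry.backoff.ms value) and at most one attempt in flight; its fields and method names are hypothetical, not a settled design.

```java
// Hypothetical per-request retry state: tracks in-flight status and a fixed backoff
// so a request manager knows when it may retry a failed request.
class RequestState {
    private final long retryBackoffMs;
    private long lastFailedAttemptMs = -1;
    private boolean inFlight = false;

    RequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // True if nothing is in flight and the backoff since the last failure has elapsed.
    boolean canSendRequest(long nowMs) {
        if (inFlight) {
            return false;
        }
        return lastFailedAttemptMs < 0 || nowMs - lastFailedAttemptMs >= retryBackoffMs;
    }

    void onSendAttempt() {
        inFlight = true;
    }

    void onSuccessfulAttempt() {
        inFlight = false;
        lastFailedAttemptMs = -1;
    }

    void onFailedAttempt(long nowMs) {
        inFlight = false;
        lastFailedAttemptMs = nowMs;
    }

    // How long the network thread could wait before this request is eligible again (0 if now).
    long remainingBackoffMs(long nowMs) {
        if (lastFailedAttemptMs < 0) {
            return 0;
        }
        return Math.max(0, retryBackoffMs - (nowMs - lastFailedAttemptMs));
    }
}
```

With this shape, a failed coordinator lookup simply becomes eligible again on a later loop iteration, which fits the non-blocking "re-attempt in the next loop" approach rather than a blocking retry loop.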
Compatibility
The new consumer should be backward compatible.
...