...
The following diagram depicts the interaction between the application thread and the network thread:
[draw.io diagram]
The Consumer client object is depicted here in purple. In this design, instead of operating directly on the parameters given to the various APIs (subscribe(), poll(), commit(), etc.), the Consumer implementation packages the parameters as events that are enqueued on the application event queue.
...
If the Consumer API call is blocking, the event passed from the application thread to the network thread will include an embedded CompletableFuture. After enqueuing the event, the application thread invokes Future.get(), effectively blocking itself until a result is provided. When the result for the Consumer API call is ready, the network thread invokes CompletableFuture.complete() with the result, allowing the application thread to continue execution.
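The hand-off can be sketched with plain JDK types. This is a minimal illustration, not the consumer's code: CommitEvent, BlockingCallSketch, and the single-event network thread are hypothetical, and the sketch blocks with CompletableFuture.join(), the unchecked counterpart of Future.get().

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event: the API parameters plus a future that carries the result back.
class CommitEvent {
    final Map<String, Long> offsets;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CommitEvent(Map<String, Long> offsets) {
        this.offsets = offsets;
    }
}

class BlockingCallSketch {
    // Hypothetical stand-in for the application event queue shared by the two threads.
    static final BlockingQueue<CommitEvent> applicationEventQueue = new LinkedBlockingQueue<>();

    // Application thread: package the parameters as an event, enqueue it, then block.
    static void commitSync(Map<String, Long> offsets) {
        CommitEvent event = new CommitEvent(offsets);
        applicationEventQueue.add(event);
        event.future.join(); // blocks until the network thread completes the future
    }

    // Network thread: take a single event, "perform" the work, and complete its future.
    static Thread startNetworkThread() {
        Thread networkThread = new Thread(() -> {
            try {
                CommitEvent event = applicationEventQueue.take();
                // A real implementation would send an OffsetCommit request and await the response here.
                event.future.complete(null);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        networkThread.start();
        return networkThread;
    }
}
```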
Components of the Network Thread
Terminology:
- CB: Callbacks registered and invoked on the polling thread, e.g. the commit callback and the rebalance callback.
- rm: RequestManagers, e.g. Heartbeat, FindCoordinatorRequest.
- The subscriptionState design is still under discussion.
Application thread and its lifecycle
The polling thread handles API invocations and any responses from the background thread. Let's demonstrate its lifecycle using the simple consumer use case (assign(), poll(), commitSync()):
- The user invokes assign(), which alters the subscriptionState.
- The subscription state changes are sent to the background thread via the ApplicationEventQueue.
- The user then invokes poll() in a loop.
- During the poll, the polling thread sends a fetch request to the background thread.
- During the poll, the polling thread also polls fetch results from the BackgroundEventQueue. It deserializes the results and returns them to the user.
- The user processes the results and invokes commitSync().
- The polling thread sends an OffsetCommitApplicationEvent to the background thread. As this is a blocking operation, the method returns only when the background thread completes the commit.
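The poll step that drains the BackgroundEventQueue might look roughly like the following sketch. PolledEvent and PollingThreadSketch are hypothetical stand-ins, records are assumed to arrive already deserialized as strings, and sending the fetch request to the background thread is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical background event: either a batch of already-deserialized records or an error.
final class PolledEvent {
    final List<String> records;
    final RuntimeException error;

    private PolledEvent(List<String> records, RuntimeException error) {
        this.records = records;
        this.error = error;
    }

    static PolledEvent fetched(List<String> records) { return new PolledEvent(records, null); }
    static PolledEvent failed(RuntimeException error) { return new PolledEvent(List.of(), error); }
}

class PollingThreadSketch {
    // Hypothetical stand-in for the BackgroundEventQueue filled by the background thread.
    final BlockingQueue<PolledEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    // One pass of poll(): drain what the background thread has produced so far, raise the
    // first error to the caller, and otherwise return the accumulated records.
    // Records drained before an error are dropped in this sketch; a real consumer needs a policy for that.
    List<String> poll() {
        List<String> result = new ArrayList<>();
        PolledEvent event;
        while ((event = backgroundEventQueue.poll()) != null) {
            if (event.error != null) {
                throw event.error;
            }
            result.addAll(event.records);
        }
        return result;
    }
}
```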
Background thread and its lifecycle
The background thread runs a loop that periodically checks the ApplicationEventQueue, draining and processing its events. At a high level, the lifecycle of the background thread can be summarized as follows:
- The application starts up the Consumer; the Consumer creates an EventHandler and starts up the background thread.
- The background thread enters the loop and starts polling the ApplicationEventQueue.
- Events are sent to the corresponding RequestManager. For example, a commit event is sent to the OffsetCommitRequestManager.
- The background thread polls each RequestManager. If a RequestManager returns a request, it is enqueued on the NetworkClientDelegate.
- The NetworkClientDelegate is polled to ensure the requests are sent.
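One iteration of this loop could be sketched as below, assuming a deliberately simplified request-manager contract (SketchRequestManager, with events and requests modeled as strings) and a plain list in place of the NetworkClientDelegate. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified request-manager contract used only for this sketch.
interface SketchRequestManager {
    void onEvent(String event);       // handle an application event routed to this manager
    List<String> poll(long nowMs);    // return the requests that are ready to send (possibly none)
}

// Hypothetical manager that emits one commit request per commit event it has received.
class CommitManagerSketch implements SketchRequestManager {
    private int pendingCommits = 0;

    @Override
    public void onEvent(String event) {
        pendingCommits++;
    }

    @Override
    public List<String> poll(long nowMs) {
        List<String> requests = new ArrayList<>();
        for (; pendingCommits > 0; pendingCommits--) {
            requests.add("OffsetCommitRequest");
        }
        return requests;
    }
}

class BackgroundLoopSketch {
    final BlockingQueue<String> applicationEventQueue = new LinkedBlockingQueue<>();
    final List<String> unsentRequests = new ArrayList<>(); // stands in for the NetworkClientDelegate queue
    final List<String> sentRequests = new ArrayList<>();   // stands in for what went over the network
    private final Map<String, SketchRequestManager> routes; // event name -> owning request manager
    private final List<SketchRequestManager> managers;

    BackgroundLoopSketch(Map<String, SketchRequestManager> routes, List<SketchRequestManager> managers) {
        this.routes = routes;
        this.managers = managers;
    }

    // One iteration of the background thread loop.
    void runOnce(long nowMs) {
        // Drain the ApplicationEventQueue and route each event to its request manager.
        List<String> events = new ArrayList<>();
        applicationEventQueue.drainTo(events);
        for (String event : events) {
            SketchRequestManager manager = routes.get(event);
            if (manager != null) {
                manager.onEvent(event);
            }
        }
        // Poll every request manager and enqueue whatever it returns.
        for (SketchRequestManager manager : managers) {
            unsentRequests.addAll(manager.poll(nowMs));
        }
        // "Poll" the network layer: here every queued request is simply treated as sent.
        sentRequests.addAll(unsentRequests);
        unsentRequests.clear();
    }
}
```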
Network Layers
...
We are deprecating the current ConsumerNetworkClient because:
- Its locking is unnecessary in the new design because everything runs on a single thread.
- Some of its features are irrelevant to this design, such as unsent.
We are introducing a wrapper over NetworkClient, the NetworkClientDelegate, to help coordinate the requests.
- All requests are first enqueued into the unsentRequests queue.
- Polling the NetworkClient results in sending the requests in the queue.
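A rough sketch of the delegate's queue-then-send behavior follows. SketchNetworkClient is a hypothetical two-method stand-in rather than the real NetworkClient API, and requests whose destination is not ready simply stay queued until the next poll, which is an assumption about retry behavior.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;

// Hypothetical stand-in for the real NetworkClient.
interface SketchNetworkClient {
    boolean isReady(String node, long nowMs);
    void send(String node, String request, long nowMs);
}

// Hypothetical unsent request: destination node plus an opaque request payload.
class SketchUnsentRequest {
    final String node;
    final String request;

    SketchUnsentRequest(String node, String request) {
        this.node = node;
        this.request = request;
    }
}

class NetworkClientDelegateSketch {
    private final SketchNetworkClient client;
    private final Queue<SketchUnsentRequest> unsentRequests = new ArrayDeque<>();

    NetworkClientDelegateSketch(SketchNetworkClient client) {
        this.client = client;
    }

    // Requests are only queued here; nothing goes on the wire yet.
    void add(SketchUnsentRequest request) {
        unsentRequests.add(request);
    }

    int unsentCount() {
        return unsentRequests.size();
    }

    // Send every queued request whose destination is ready; keep the rest for the next poll.
    void poll(long nowMs) {
        Iterator<SketchUnsentRequest> it = unsentRequests.iterator();
        while (it.hasNext()) {
            SketchUnsentRequest r = it.next();
            if (client.isReady(r.node, nowMs)) {
                client.send(r.node, r.request, nowMs);
                it.remove();
            }
        }
    }
}
```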
Request Manager
Kafka consumer tasks are tightly coupled to broker requests and responses. In the new implementation, we took a more modular approach: we create request managers for different tasks and have the background thread poll these request managers to see if any requests need to be sent. Once a request is returned by the poll, the background thread enqueues it to the network client to be sent out.
The request managers handle the following requests:
- FindCoordinatorRequest
- OffsetCommitRequest
- FetchRequest
- MetadataRequest
- HeartbeatRequest
- ListOffsetRequest
After KIP-848 is implemented, the request managers also handle the following:
- ConsumerGroupHeartbeatRequest
- ConsumerGroupPrepareAssignmentRequest
- ConsumerGroupInstallAssignmentRequest
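To illustrate what polling a request manager means, here is a hypothetical manager that emits a heartbeat only when its interval has elapsed, however often the background thread polls it. PollableRequestManager and IntervalHeartbeatManager are illustrative names, and the fixed interval is an assumption.

```java
import java.util.List;

// Hypothetical contract: each poll returns the requests (possibly none) that are due now.
interface PollableRequestManager {
    List<String> poll(long nowMs);
}

// Hypothetical heartbeat manager: emits one heartbeat per interval, regardless of poll frequency.
class IntervalHeartbeatManager implements PollableRequestManager {
    private final long intervalMs;
    private long nextSendMs = 0L;

    IntervalHeartbeatManager(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public List<String> poll(long nowMs) {
        if (nowMs < nextSendMs) {
            return List.of(); // not due yet; the background thread moves on to the next manager
        }
        nextSendMs = nowMs + intervalMs;
        return List.of("HeartbeatRequest");
    }
}
```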
RequestFuture and Callback
The current implementation chains callbacks onto RequestFutures (a Kafka-internal type). We have decided to move away from this internal type and migrate to Java's CompletableFuture because of its richer interface and features.
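For comparison with RequestFuture-based callback chaining, the sketch below chains a response transformation and a completion callback onto a java.util.concurrent.CompletableFuture. The Long "response" and the string log are hypothetical simplifications; only the CompletableFuture methods used (thenApply, whenComplete) are real JDK API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FutureChainingSketch {
    // Chains a response transformation and a completion callback onto a request's future.
    // Hypothetical simplification: the response is a committed offset (Long) and the callback appends to a log.
    static CompletableFuture<Long> chain(CompletableFuture<Long> responseFuture, List<String> log) {
        return responseFuture
                .thenApply(offset -> offset + 1)          // runs only on success
                .whenComplete((nextOffset, error) -> {    // runs on success or failure
                    if (error == null) {
                        log.add("next offset " + nextOffset);
                    } else {
                        // Failures from an upstream stage arrive wrapped in a CompletionException.
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        log.add("failed: " + cause.getClass().getSimpleName());
                    }
                });
    }
}
```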
Events and EventHandler
EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:
- Allows the polling thread to send events to the background thread
- Allows the polling thread to poll background thread events
Here we define two types of events:
- ApplicationEvent: application-side events that are sent to the background thread
- BackgroundEvent: background thread events that are sent to the application
We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e. the polling thread, to add and poll the events.
EventHandler
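interface EventHandler {
    // Polls an event produced by the background thread
    BackgroundEvent poll();
    // Sends an event from the polling thread to the background thread
    void add(ApplicationEvent event);
}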
ApplicationEventQueue and ApplicationEvent
// Channel used to send events to the background thread
private BlockingQueue<ApplicationEvent> queue;
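public abstract class ApplicationEvent {
    private final ApplicationEventType eventType;
    protected ApplicationEvent(ApplicationEventType eventType) { this.eventType = eventType; }
    public ApplicationEventType type() { return eventType; }
}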
enum ApplicationEventType {
COMMIT,
ACK_PARTITION_REVOKED,
ACK_PARTITION_ASSIGNED,
UPDATE_METADATA,
LEAVE_GROUP,
}
BackgroundEventQueue and BackgroundEvent
// Channel used to send events to the polling thread for client side execution/notification
private BlockingQueue<BackgroundEvent> queue;
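public abstract class BackgroundEvent {
    private final BackgroundEventType eventType;
    protected BackgroundEvent(BackgroundEventType eventType) { this.eventType = eventType; }
    public BackgroundEventType type() { return eventType; }
}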
enum BackgroundEventType {
ERROR,
REVOKE_PARTITIONS,
ASSIGN_PARTITIONS,
FETCH_RESPONSE,
}
Rebalance [WIP]
One of the main reasons we are refactoring the KafkaConsumer is to satisfy the requirements of the new rebalance protocol introduced in KIP-848.
KIP-848 defines two assignment modes: server-side mode and client-side mode. Both use the new heartbeat API, ConsumerGroupHeartbeat.
The server-side mode is simpler: the assignments are computed by the Group Coordinator, and the clients are only responsible for revoking and assigning the partitions.
If the user chooses a client-side assignor, the assignment is computed by one of the members, and assignment and revocation are done via the heartbeat, as in server-side mode.
In the new design we will build the following components:
- GroupState: keeps track of the current state of the group, such as the generation and the rebalance state.
- HeartbeatRequestManager: a type of request manager that is responsible for calling the ConsumerGroupHeartbeat API.
- Assignment Manager: manages partition assignments.
Rebalance Flow
New Consumer Group
- The user invokes subscribe(), which alters the SubscriptionState. A subscription state change event is sent to the background thread.
- The background thread processes the event and updates the GroupState to PREPARE.
- The HeartbeatRequestManager is polled. It checks the GroupState and determines that it is time to send the heartbeat.
- The ConsumerGroupHeartbeatResponse is received, and the GroupState is updated to ASSIGN.
- The PartitionAssignmentManager is polled and sees that the GroupState is ASSIGN. It triggers the assignment computation:
- [We might need another state here]
- Once the assignment is computed, send an event to the client thread to invoke the rebalance callback.
- The callback is triggered, and the background thread is notified.
- The PartitionAssignmentManager is polled and transitions the GroupState to COMPLETE.
- [something needs to happen here]
- Transition the GroupState to STABLE.
GroupState
[UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE]
- UNJOINED: There is no rebalance. For the simple consumer use case, the GroupState remains UNJOINED.
- PREPARE: The heartbeat is sent and the response is awaited.
- ASSIGN: The assignment is updated, client-thread callbacks are triggered, and their completion is awaited.
- COMPLETE: The client-thread callbacks have completed and the background thread has been notified.
- STABLE: The group is stable.
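The states above could be encoded as an enum that rejects unexpected transitions. This is a sketch under an explicit assumption: GroupStateSketch is hypothetical, and its transition table is inferred from the "New Consumer Group" flow. Because this part of the design is still a work in progress, STABLE has no outgoing transition here.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical encoding of the GroupState list; transitions are inferred from the flow, not specified.
enum GroupStateSketch {
    UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE;

    Set<GroupStateSketch> next() {
        switch (this) {
            case UNJOINED: return EnumSet.of(PREPARE);   // subscription change processed by the background thread
            case PREPARE:  return EnumSet.of(ASSIGN);    // ConsumerGroupHeartbeatResponse received
            case ASSIGN:   return EnumSet.of(COMPLETE);  // client-thread callback done, background thread notified
            case COMPLETE: return EnumSet.of(STABLE);
            default:       return EnumSet.noneOf(GroupStateSketch.class);
        }
    }

    // Returns the target state, or throws if the transition is not in the inferred table.
    GroupStateSketch transitionTo(GroupStateSketch target) {
        if (!next().contains(target)) {
            throw new IllegalStateException(this + " -> " + target + " is not allowed");
        }
        return target;
    }
}
```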
Consumer group member state machine
It becomes clear when reading KIP-848 that the work of keeping the consumer group in the proper state is fairly involved. We therefore turn our focus now to the logic needed for the consumer group member state machine (hereafter, CGMSM).
Based on the user calling either assign() or subscribe(), a Consumer determines how topic partitions are to be assigned. If the user calls the subscribe() API, the Consumer knows that it is being directed to use Kafka's consumer group-based partition assignment. The use of assign() signifies the user's intention to manage the partition assignments from within the application via manual partition assignment. It is only in the former case that a CGMSM needs to be created.
Note that the necessary logic to establish a connection to the Kafka broker node acting as the group coordinator is outside the scope of the CGMSM logic.
In order to keep the size of a ConsumerGroupHeartbeatRequest small, KIP-848's description of the request schema states that some values are sent with the request only when they change on the client. These values include:
- InstanceId
- RackId
- RebalanceTimeoutMs
- SubscribedTopicNames
- SubscribedTopicRegex
- ServerAssignor
- ClientAssignors
- TopicPartitions
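One way to honor the "send only when changed" rule is to remember the last value sent for each conditional field. ConditionalFieldTracker is a hypothetical helper that models the request as a Map keyed by the field names above; it updates its snapshot optimistically, so a real implementation would also need to resend fields after a failed request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper: tracks the last value sent for each conditional field of a
// ConsumerGroupHeartbeatRequest and includes a field only when it has changed.
class ConditionalFieldTracker {
    private final Map<String, Object> lastSent = new HashMap<>();

    // Returns the subset of fields that must be included in the next request.
    Map<String, Object> fieldsToSend(Map<String, Object> current) {
        Map<String, Object> changed = new HashMap<>();
        for (Map.Entry<String, Object> e : current.entrySet()) {
            if (!lastSent.containsKey(e.getKey()) || !Objects.equals(lastSent.get(e.getKey()), e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        lastSent.putAll(changed); // optimistic: assumes the request will succeed
        return changed;
    }

    // Forget everything, e.g. when (re)joining, so the next request carries every field.
    void reset() {
        lastSent.clear();
    }
}
```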
The following diagram provides a visual overview of the states and transitions for members of the consumer group:
...
Submitting Client Requests
The following diagram displays the basic flow between the request managers, the unsent request queue, and the NetworkClient:
[draw.io diagram]
A request manager represents the logic and state needed to issue a Kafka RPC request and handle its response. A request manager may contain logic to handle more than one type of RPC.
In the network thread, we loop over each request manager, effectively asking it for any requests that it needs to send to the cluster. Note that the network request is not sent during this step. Instead, an unsent request object is created that contains the underlying request information. These "unsent requests" are added to a queue of pending unsent client requests. After all of these unsent requests are queued up, they are forwarded for network I/O via the NetworkClient.send() method.
There are two benefits of this multi-step process:
- It keeps the network I/O request and response management and lifecycle in one place, making the code easier to reason about
- The request managers can implement deduplication and/or coalescing of requests
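As an example of coalescing, a hypothetical commit request manager could merge all commit events received between two polls into a single unsent request, with the most recently requested offset per partition winning. CoalescingCommitManager is an illustrative name, and completing each caller's future for the merged request is omitted from this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical commit request manager illustrating coalescing of commit events.
class CoalescingCommitManager {
    private final Map<String, Long> pendingOffsets = new LinkedHashMap<>();

    // Called when the background thread routes a commit event to this manager.
    // A later event for the same partition overwrites the earlier offset.
    void onCommit(Map<String, Long> offsets) {
        pendingOffsets.putAll(offsets);
    }

    // Called once per background-loop iteration; returns at most one "unsent request".
    List<Map<String, Long>> poll() {
        if (pendingOffsets.isEmpty()) {
            return List.of();
        }
        Map<String, Long> request = new LinkedHashMap<>(pendingOffsets);
        pendingOffsets.clear();
        return List.of(request);
    }
}
```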
Consumer API Internal Changes
Poll
Users are required to invoke poll() to:
- Trigger auto-commit
- Poll exceptions: process them or raise them to the user
- Poll fetches
- Poll for callback-invocation events that trigger the rebalance listeners
CommitSync
- The polling thread sends a commit event. The commit event carries a CompletableFuture.
- Wait for the CompletableFuture to complete, so that this is a blocking API
Assign
- If we are assigning nothing, trigger unsubscribe()
- clear the fetcher buffer
- send a commit event if autocommit is enabled
- send metadata update
Subscribe
- If subscribing to nothing, trigger unsubscribe()
- clear the fetcher buffer
- subscribes
- send metadata update
Unsubscribe
- Send a leave group event
- unsubscribe from the topics
Major Changes
Fetcher
We will break the current Fetcher into three parts to accommodate the asynchronous design: the background thread needs to send fetches autonomously, and the polling thread needs to collect fetches as they become available. There will be three separate classes:
- FetchSender: Responsible for sending fetches in the background thread
- FetchHandler: Sits in the polling thread's poll loop and processes the fetch responses carried by fetch events
- FetchBuffer: This corresponds to CompletedFetches in the old implementation. It prevents the FetchSender from sending too many fetches and causing memory issues. It will be removed once we implement the memory-based buffer (KIP-81).
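The FetchBuffer's role can be sketched as a small thread-safe buffer shared by the two threads. BoundedFetchBuffer is hypothetical, and it bounds the number of completed fetches rather than bytes; that count-based limit is an assumption, not the memory-based design referenced for KIP-81. It also ignores fetches that are still in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded buffer between the background thread (FetchSender side)
// and the polling thread (FetchHandler side). The count-based limit is an assumption.
class BoundedFetchBuffer<T> {
    private final int maxCompletedFetches;
    private final ArrayDeque<T> completed = new ArrayDeque<>();

    BoundedFetchBuffer(int maxCompletedFetches) {
        this.maxCompletedFetches = maxCompletedFetches;
    }

    // Background thread: is there room to send another fetch?
    synchronized boolean hasCapacity() {
        return completed.size() < maxCompletedFetches;
    }

    // Background thread: store a completed fetch response.
    synchronized void add(T completedFetch) {
        completed.addLast(completedFetch);
    }

    // Polling thread: take everything buffered so far, freeing capacity for new fetches.
    synchronized List<T> drain() {
        List<T> out = new ArrayList<>(completed);
        completed.clear();
        return out;
    }
}
```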
Consumer Poll Changes
We will remove the metadata logic from consumer.poll(), which greatly simplifies the execution of the poll loop. It mainly handles:
- fetches
- callback invocation
- errors
ConsumerCoordinator and AbstractCoordinator
- New states will be introduced (see the Rebalance section above). The main purpose is to let the background thread drive the poll while the polling thread invokes the callbacks.
- Remove the HeartbeatThread, so we will no longer start a heartbeat thread.
- Polling the heartbeat will still require a fetch event, since only the polling thread issues fetch events and we want to respect the existing implementation.
- Timeouts and timers will be reevaluated and possibly removed.
- While loops will be reevaluated and possibly removed. In the new implementation, the coordinator will be non-blocking, and its state will be managed by the background thread loop.
Timeout Policy
Please see Java client Consumer timeouts for more detail on timeouts.
The following description provides more clarity on the states that make up the CGMSM:
NEW
NEW is the initial state of a CGMSM upon its creation. The Consumer will remain in this state until the next pass of the background thread loop.
JOINING
A state of JOINING signifies that a Consumer wants to join a consumer group. On the next pass of the background thread, the Consumer will enter this state to begin communicating with the Kafka broker node that was elected as the group coordinator. A ConsumerGroupHeartbeatRequest will be sent to the coordinator with specific values in the request:
- MemberId is set to null
- MemberEpoch is set to the hard-coded value of 0
Since this is the first request to the coordinator, the CGMSM will send the ConsumerGroupHeartbeatRequest with all conditional values present. This includes setting TopicPartitions to null, since there are no assigned partitions in this state.
Once the initial ConsumerGroupHeartbeatResponse is received successfully, the CGMSM will update its local MemberId and MemberEpoch based on the returned data. It will then transition to the JOINED state.
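The JOINING step can be sketched as follows. JoiningSketch is hypothetical and models the request as a Map keyed by the field names above; it reads MemberId from its own field, which is null on the first join, so the same method would also fit the rejoin case described under JOINED.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical fragment of the CGMSM covering only the JOINING step.
class JoiningSketch {
    enum State { NEW, JOINING, JOINED }

    State state = State.JOINING;
    String memberId;      // null until the coordinator returns one
    int memberEpoch = 0;

    // Join request: every conditional field present, MemberEpoch hard-coded to 0,
    // and TopicPartitions null because nothing is assigned yet.
    Map<String, Object> buildJoinRequest(Map<String, Object> conditionalFields) {
        Map<String, Object> request = new HashMap<>(conditionalFields);
        request.put("MemberId", memberId); // null on the first join
        request.put("MemberEpoch", 0);
        request.put("TopicPartitions", null);
        return request;
    }

    // Successful response: adopt the MemberId and MemberEpoch returned by the coordinator.
    void onJoinResponse(String returnedMemberId, int returnedMemberEpoch) {
        memberId = returnedMemberId;
        memberEpoch = returnedMemberEpoch;
        state = State.JOINED;
    }
}
```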
JOINED
The JOINED state simply indicates that the Consumer instance is known to the coordinator as a member of the group. It does not necessarily imply that it has been assigned any partitions. While in the JOINED state, the CGMSM will periodically send requests to the coordinator at the required cadence in order to maintain membership.
The CGMSM should transition back to the JOINING state if the ConsumerGroupHeartbeatResponse has an error of UNKNOWN_MEMBER_ID or FENCED_MEMBER_EPOCH. If either of those errors occurs, the CGMSM will clear its "assigned" partition set (without any revocation) and transition to the JOINING state so that it rejoins the group with the same MemberId and a MemberEpoch of 0.
The CGMSM will transition into the ASSIGNING state when the ConsumerGroupHeartbeatResponse contains a non-null value for Assignment.
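The transitions out of JOINED can be summarized in a small sketch. JoinedSketch is hypothetical, error codes are modeled as strings named after the errors above, and any other error is simply ignored here because the text does not specify its handling.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical fragment of the CGMSM covering the transitions out of JOINED.
class JoinedSketch {
    enum State { JOINING, JOINED, ASSIGNING }

    State state = State.JOINED;
    String memberId;
    int memberEpoch;
    final Set<String> assigned = new HashSet<>();

    JoinedSketch(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }

    // error is null on success; assignment is null when the response carries none.
    void onHeartbeatResponse(String error, Set<String> assignment) {
        if ("UNKNOWN_MEMBER_ID".equals(error) || "FENCED_MEMBER_EPOCH".equals(error)) {
            assigned.clear();        // drop partitions without invoking any revocation logic
            memberEpoch = 0;         // rejoin with the same MemberId and an epoch of 0
            state = State.JOINING;
        } else if (error == null && assignment != null) {
            state = State.ASSIGNING; // reconciliation happens in ASSIGNING
        }
        // Otherwise stay in JOINED and keep heartbeating; other errors are out of scope here.
    }
}
```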
ASSIGNING
The ASSIGNING state is entered when the CGMSM needs to perform the assignment reconciliation process. As in the JOINED state, the CGMSM will continue to communicate with the coordinator via the heartbeat mechanism to maintain its membership.
The first action performed in this state is to update the CGMSM's member epoch to the value provided in the ConsumerGroupHeartbeatResponse.
Next, the CGMSM compares its current assignment with the value of Assignment contained in the ConsumerGroupHeartbeatResponse. If the two assignments are equal, the CGMSM has reconciled the assignment successfully and will transition back to the JOINED state. If they are not equal, the reconciliation process begins.
Partition revocation involves:
- Removing the partitions from the CGMSM's "assigned" set
- Committing the offsets for the revoked partitions
- Invoking ConsumerRebalanceListener.onPartitionsRevoked()
Partition assignment involves:
- Adding the partitions to the CGMSM's "assigned" set
- Invoking ConsumerPartitionAssignor.onAssignment(), if one is set
- Invoking ConsumerRebalanceListener.onPartitionsAssigned()
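The comparison step amounts to two set differences: partitions owned but absent from the target are revoked, and partitions in the target but not yet owned are assigned. The hypothetical ReconciliationSketch below assumes partitions are identified by strings and that revocation runs before assignment, following the order of the two lists; the commit and callback steps appear only as comments.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical reconciliation step for the ASSIGNING state.
class ReconciliationSketch {
    static final class Plan {
        final Set<String> toRevoke;
        final Set<String> toAssign;

        Plan(Set<String> toRevoke, Set<String> toAssign) {
            this.toRevoke = toRevoke;
            this.toAssign = toAssign;
        }

        // Equal assignments: nothing to do, so the CGMSM can return to JOINED.
        boolean isNoOp() {
            return toRevoke.isEmpty() && toAssign.isEmpty();
        }
    }

    static Plan plan(Set<String> current, Set<String> target) {
        Set<String> revoke = new HashSet<>(current);
        revoke.removeAll(target);   // owned now but not in the new assignment
        Set<String> assign = new HashSet<>(target);
        assign.removeAll(current);  // in the new assignment but not owned yet
        return new Plan(revoke, assign);
    }

    // Applies the plan to the "assigned" set, revocation first.
    static void apply(Set<String> assigned, Plan plan) {
        assigned.removeAll(plan.toRevoke);
        // ...commit offsets for plan.toRevoke, then invoke ConsumerRebalanceListener.onPartitionsRevoked()
        assigned.addAll(plan.toAssign);
        // ...invoke ConsumerPartitionAssignor.onAssignment() if set, then ConsumerRebalanceListener.onPartitionsAssigned()
    }
}
```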
Questions
- Do we need to heartbeat between revocation and assignment?
- Do we want to split ASSIGNING into separate REVOKING and ASSIGNING states?
TERMINATING
TBD
TERMINATED
TBD
Timeout Policy
- Consumer.poll(): user-provided timeout
- Coordinator rediscovery backoff: retry.backoff.ms
- Coordinator discovery timeout: currently uses the user-provided timeout in consumer.poll(). Maybe we should use request.timeout.ms instead, and re-attempt in the next loop if discovery fails.
- CommitOffsetSync: user-provided timeout
- Rebalance state timeout: maybe use the request timeout
Is there a better way to configure session interval and heartbeat interval?
Compatibility
The new consumer should be backward compatible.
...