Public-Facing Changes

  • We will revise the existing Javadocs.

  • API signatures will remain the same.

  • We will try to maintain the existing contracts.

Summary

This document describes our effort to refactor the threading model of the KafkaConsumer. The refactor addresses issues and shortcomings we have encountered over the past years, such as increasing code complexity, lock and race conditions, and the coupling between the client poll and the rebalance progress.

  • Code complexity: Patches and hotfixes in the past years have heavily impacted the readability of the code. The complex code path and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue such as a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.

  • Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, and it has caused some race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals is to make the rebalance process happen asynchronously with the poll. Firstly, it simplifies the design and logic because the timing of the network request is more predictable. Secondly, because the rebalance occurs in the background thread, the polling thread will neither block nor be blocked by the rebalance process.

...

  • Deprecate HeartbeatThread

    • Remove the heartbeat thread.
    • Move the heartbeat logic to the background thread.
  • Implement a background thread
  • Refactor the Coordinator classes

    • Revisit timers and loops
    • Blocking methods such as commitOffsetSync will be handled by the polling thread, which waits on the future's completion. The coordinator should not be blocking.
    • The background thread loop will poll the coordinator.
    • We will modify the existing rebalance states, specifically for the callback execution.
  • Refactor the Fetcher Class
    • We send fetches in the background thread, and collect and process fetches on the polling thread.
    • We also need a thread-safe buffer (the completedFetches queue) accessible by both threads.
  • Refactor the KafkaConsumer API

    • It will be completely event driven. The polling thread will 1. poll for background events, and 2. send application events.
    • Remove dependency on the fetcher, coordinator, and networkClient.
  • Minor Refactor:
    • Fetcher:
      • sendFetches will be performed in the background thread.
      • The network client and the coordinator will be removed from the class.
  • Events and communication channels.

    • We will use two BlockingQueues for the two-way communication between the background thread and the polling thread.
  • Address issues in these Jira tickets

...

Background Thread

Specifically, in the new design context, we use this generic term to indicate the active process running in the background. It handles network communication, rebalancing, and heartbeat.

...

  • The object is frequently accessed by both the polling thread and the background thread.
  • The object often requires an immediate response; otherwise, the performance will be heavily impacted and the current behavior will be altered.

Also, please review the rejected proposals section below, which covers a few ideas that we've had.

Important Components

Background thread and its lifecycle

Maintaining and managing the coordinator connection is crucial because many operations depend on it. In the current implementation, we send FindCoordinator requests when a coordinator is needed and is unavailable. This makes the request futures hard to manage; therefore, in the new implementation, we want to centralize the coordinator connection and use a state machine to facilitate the discovery process.

The connection states represent the different states of the coordinator connection: down, initialized, coordinator_discovery, and stable.

  • down: The coordinator is uninitialized
  • initialized: The background thread completes initialization, and the loop has started.
  • coordinator_discovery: The coordinator is requested but hasn't been connected yet. If there's no in-flight FindCoordinatorRequest, then it will send one. If one exists, check and wait for its result.
  • stable: The coordinator connection is established.

The coordinator discovery process piggybacks on the background thread event loop. The event loop polls events from the queue and handles the coordinator connection on demand, i.e., if we don't need a coordinator, the background thread will stay in the initialized state. If an event requires a coordinator, we will then move to the coordinator_discovery state and wait for the response to come back.

Background thread and its lifecycle

  1. The background thread has not been constructed, so it is in the down state.

  2. The polling thread starts the background thread; the background thread finishes initialization and then moves to the initialized state. Usually, this happens after new KafkaConsumer().

  3. The background thread loop performs the following tasks:

    1. Check if there is an in-flight event. If not, poll for the new events from the channel.

    2. Run the state machine. The possible scenarios are:

      • The event does not require a coordinator.  Execute the event and move on.
      • The event requires a coordinator, but it is currently disconnected.  Move the state to coordinator_discovery.
      • The background thread is currently in the coordinator_discovery state; continue to loop and wait for the FindCoordinator response.
      • The background thread is in the stable state.
        • Poll the Coordinator.
        • Execute the event.
  4. Poll the networkClient.

  5. Backoff for retryBackoffMs milliseconds.
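
The loop described above can be sketched as a small state machine. This is only an illustration under stated assumptions, not the actual implementation: the names ConnectionState, Event, BackgroundLoopSketch, and the coordinatorFound flag are hypothetical, the FindCoordinator round trip is reduced to a boolean, and the network poll and backoff steps are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical names throughout; only the state names come from the design text.
enum ConnectionState { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

final class Event {
    final String name;
    final boolean requiresCoordinator;

    Event(String name, boolean requiresCoordinator) {
        this.name = name;
        this.requiresCoordinator = requiresCoordinator;
    }
}

final class BackgroundLoopSketch {
    ConnectionState state = ConnectionState.DOWN;
    final BlockingQueue<Event> channel = new LinkedBlockingQueue<>();
    final List<String> executed = new ArrayList<>();
    boolean coordinatorFound = false; // stand-in for a FindCoordinator response
    private Event inflight;           // event currently being handled, if any

    // Step 2 of the lifecycle: initialization completes.
    void start() {
        state = ConnectionState.INITIALIZED;
    }

    // One pass over steps 3.1 and 3.2 of the lifecycle.
    void runOnce() {
        if (state == ConnectionState.DOWN) {
            throw new IllegalStateException("background thread not started");
        }
        if (inflight == null) {
            inflight = channel.poll(); // non-blocking; null when the channel is empty
        }
        if (inflight == null) {
            return;
        }
        if (!inflight.requiresCoordinator) {
            execute();
            return;
        }
        switch (state) {
            case INITIALIZED:
                // A real implementation would send a FindCoordinator request here.
                state = ConnectionState.COORDINATOR_DISCOVERY;
                break;
            case COORDINATOR_DISCOVERY:
                if (coordinatorFound) {
                    state = ConnectionState.STABLE;
                }
                break;
            default: // STABLE: the coordinator is available, so run the event
                execute();
        }
    }

    private void execute() {
        executed.add(inflight.name);
        inflight = null;
    }
}
```

In this sketch an event that needs the coordinator stays in flight across iterations until the state reaches STABLE, which mirrors the "check if there is an in-flight event" step.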

Rebalance States

To coordinate the polling thread and the background thread in executing the callbacks and acknowledging their completion, we need to modify the existing coordinator states.

  1. We split PREPARING_REBALANCE into PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED, and PREPARING_REBALANCE:

    1. PREPARE_REVOCATION: Pause partitions that will be revoked
    2. REVOKING_PARTITION: Wait for onPartitionsRevoked to be completed.

    3. PARTITION_REVOKED: Update the subscription.  Autocommit.
  2. An ASSIGNING_PARTITIONS state will be added before STABLE:

    1. ASSIGNING_PARTITIONS: Wait for onPartitionsAssigned to be completed.

    2. Upon completion of onPartitionsAssigned, we will move the state to STABLE.

The proposed state transition is as follows:

...

(COOPERATIVE) STABLE → PREPARE_REVOCATION → REVOKING_PARTITION → PARTITION_REVOKED → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE → //rejoin
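
As a rough illustration, the cooperative transition chain above can be encoded as a lookup table so that an unexpected transition is easy to detect. The class name CooperativeTransitions and its methods are hypothetical; only the state names are taken from the chain above, and the rejoin path is not modeled.

```java
import java.util.EnumMap;
import java.util.Map;

// State names follow the cooperative chain in the text.
enum RebalanceState {
    STABLE,
    PREPARE_REVOCATION,
    REVOKING_PARTITION,
    PARTITION_REVOKED,
    COMPLETING_REBALANCE,
    ASSIGNING_PARTITIONS
}

// Hypothetical helper: maps each state to its single successor in the chain.
final class CooperativeTransitions {
    private static final Map<RebalanceState, RebalanceState> NEXT =
            new EnumMap<>(RebalanceState.class);

    static {
        NEXT.put(RebalanceState.STABLE, RebalanceState.PREPARE_REVOCATION);
        NEXT.put(RebalanceState.PREPARE_REVOCATION, RebalanceState.REVOKING_PARTITION);
        NEXT.put(RebalanceState.REVOKING_PARTITION, RebalanceState.PARTITION_REVOKED);
        NEXT.put(RebalanceState.PARTITION_REVOKED, RebalanceState.COMPLETING_REBALANCE);
        NEXT.put(RebalanceState.COMPLETING_REBALANCE, RebalanceState.ASSIGNING_PARTITIONS);
        NEXT.put(RebalanceState.ASSIGNING_PARTITIONS, RebalanceState.STABLE);
    }

    private CooperativeTransitions() { }

    // Returns the state that follows the given one during a cooperative rebalance.
    static RebalanceState next(RebalanceState current) {
        return NEXT.get(current);
    }

    // True only if "to" is the direct successor of "from" in the chain.
    static boolean isValid(RebalanceState from, RebalanceState to) {
        return NEXT.get(from) == to;
    }
}
```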

...

EventHandler and Events

Here we define two types of events:

  1. ApplicationEvent: application side events that will be sent to the background thread
  2. BackgroundEvent: background thread events that will be sent to the application

We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e., the polling thread, to add and poll the events.

EventHandler
interface EventHandler {
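   // Called by the polling thread to receive an event from the background thread
   public BackgroundEvent poll();
   // Called by the polling thread to send an event to the background thread
   public void add(ApplicationEvent event);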
}

...

ApplicationEventQueue and ApplicationEvent
// Channel used to send events to the background thread

private BlockingQueue<ApplicationEvent> queue;

public abstract class ApplicationEvent {
   private final ApplicationEventType eventType;

   protected ApplicationEvent(ApplicationEventType eventType) {
      this.eventType = eventType;
   }
}

enum ApplicationEventType {
   COMMIT,
   ACK_PARTITION_REVOKED,
   ACK_PARTITION_ASSIGNED,
   UPDATE_METADATA,
}

...

BackgroundEventQueue and BackgroundEvent
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<BackgroundEvent> queue;

public abstract class BackgroundEvent {
   private final BackgroundEventType eventType;

   protected BackgroundEvent(BackgroundEventType eventType) {
      this.eventType = eventType;
   }
}

enum BackgroundEventType {
   ERROR,
   REVOKE_PARTITIONS,
   ASSIGN_PARTITIONS,
   FETCH_RESPONSE,
}
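
To show how the two channels fit together, here is a minimal sketch of an EventHandler backed by two BlockingQueues. It assumes that poll() returns background events and add() accepts application events; the class QueueEventHandler and the background-side methods nextApplicationEvent() and publish() are hypothetical, and the event classes are simplified to concrete types for brevity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

enum ApplicationEventType { COMMIT, ACK_PARTITION_REVOKED, ACK_PARTITION_ASSIGNED, UPDATE_METADATA }

enum BackgroundEventType { ERROR, REVOKE_PARTITIONS, ASSIGN_PARTITIONS, FETCH_RESPONSE }

// Simplified, concrete stand-ins for the abstract event classes.
final class ApplicationEvent {
    final ApplicationEventType eventType;

    ApplicationEvent(ApplicationEventType eventType) {
        this.eventType = eventType;
    }
}

final class BackgroundEvent {
    final BackgroundEventType eventType;

    BackgroundEvent(BackgroundEventType eventType) {
        this.eventType = eventType;
    }
}

interface EventHandler {
    BackgroundEvent poll();           // polling thread: next background event, or null
    void add(ApplicationEvent event); // polling thread: enqueue work for the background thread
}

// Hypothetical implementation: one queue per direction.
final class QueueEventHandler implements EventHandler {
    private final BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
    private final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();

    @Override
    public BackgroundEvent poll() {
        return backgroundEvents.poll(); // non-blocking
    }

    @Override
    public void add(ApplicationEvent event) {
        applicationEvents.add(event);
    }

    // Background thread side: take the next application event, or null if none is queued.
    ApplicationEvent nextApplicationEvent() {
        return applicationEvents.poll();
    }

    // Background thread side: hand a result or notification to the polling thread.
    void publish(BackgroundEvent event) {
        backgroundEvents.add(event);
    }
}
```

Keeping the queues private means neither thread touches the other direction's queue directly, which is the point of the abstraction.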

Consumer API Internal Changes 

Poll

  1. Poll the EventHandler, and execute the events accordingly
    1. Callback invocation
    2. Collect fetches
    3. Error handling
  2. Check the fetcher, and return if there's data.

CommitSync

  1. The polling thread sends a commit event. The commit event carries a completable future.
  2. The polling thread waits for the completable future to finish, which makes this a blocking API.
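
The two CommitSync steps can be sketched with a CompletableFuture attached to the event. This is an assumption-laden illustration: CommitEventSketch, CommitSyncSketch, and runBackgroundOnce() are hypothetical names, the offset commit request itself is replaced by a comment, and a timeout is reported as a boolean rather than as an exception.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical commit event: the future is completed by the background thread.
final class CommitEventSketch {
    final CompletableFuture<Void> future = new CompletableFuture<>();
}

final class CommitSyncSketch {
    private final BlockingQueue<CommitEventSketch> toBackground = new LinkedBlockingQueue<>();

    // Polling thread: enqueue the event, then block on its future.
    // Returns true if the commit completed within the timeout.
    boolean commitSync(long timeoutMs) {
        CommitEventSketch event = new CommitEventSketch();
        toBackground.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Background thread: handle at most one commit event.
    void runBackgroundOnce() {
        try {
            CommitEventSketch event = toBackground.poll(1, TimeUnit.SECONDS);
            if (event != null) {
                // A real implementation would send the commit request here and
                // complete the future from the response handler.
                event.future.complete(null);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Only the polling thread blocks here; the background side never waits on the caller, which matches the requirement that the coordinator should not be blocking.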

Major Changes

Consumer Poll Changes

consumer.poll() will mainly be a loop polling for events from the event handler.  Here are the important events:

...

We will have to reimplement wakeup() and WakeupException.

wakeup()

In the new design, wakeup will interrupt the blocking polling loop in the polling thread.

WakeupException

The exception will be thrown by the polling thread when the loop is interrupted.
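
One possible shape for this, shown only as a sketch: the polling loop checks a wakeup flag on every iteration and throws when the flag is set. The names WakeablePollSketch and SketchWakeupException are hypothetical (the latter stands in for WakeupException), events are reduced to strings, and the 10 ms slice is an arbitrary choice.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Stand-in for WakeupException in this self-contained sketch.
class SketchWakeupException extends RuntimeException { }

final class WakeablePollSketch {
    private final BlockingQueue<String> backgroundEvents = new LinkedBlockingQueue<>();
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);

    // May be called from any thread to interrupt a current or future poll.
    void wakeup() {
        wakeupRequested.set(true);
    }

    // Background thread side: make an event visible to the polling thread.
    void publish(String event) {
        backgroundEvents.add(event);
    }

    // Polling thread: wait up to timeoutMs for an event, in short slices so
    // that a wakeup request is noticed promptly. Returns null on timeout.
    String poll(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        do {
            // Clear the flag as it is consumed so that only one poll is interrupted.
            if (wakeupRequested.getAndSet(false)) {
                throw new SketchWakeupException();
            }
            try {
                String event = backgroundEvents.poll(10, TimeUnit.MILLISECONDS);
                if (event != null) {
                    return event;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        } while (deadline - System.nanoTime() > 0);
        return null;
    }
}
```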

Timeout Policy

Consumer.poll(): the user provides the timeout.

...

Is there a better way to configure session interval and heartbeat interval?

Consumer API Internal Changes 

Assign

  1. The polling thread updates the subscriptionState.
  2. Create an ASSIGN event, and queue up the event to the background thread.
  3. Background thread receives the event and executes the event.
    1. Send a commitOffset
    2. Update metadata

Unsubscribe

  1. The polling thread removes the subscriptions from the subscriptionState.
  2. The polling thread executes the onPartitionsRevoked callback.
  3. The polling thread sends a LEAVE_GROUP event to the background thread.
  4. The polling thread returns.
  5. The background thread:
    1. Initiates LeaveGroup

Subscribe

  1. The polling thread updates the subscriptionState.
  2. Create a SUBSCRIBE event, and send it to the background thread.
  3. Background thread executes the event:
    1. Update metadata

Poll

  1. Poll the EventHandler, and execute the events accordingly
  2. Execute interceptor.onConsume and return the data.
  3. The background thread executes the FETCH event:
    1. Autocommit (we think the autocommit can be moved to the background thread)
    2. Issue fetch requests

Event Data Models

...

Test Plan

We will first write new tests and reuse the existing unit tests and integration tests.

...