Public-Facing Changes

  • We will revise the existing Javadocs.

  • API signatures will remain the same.

  • We will try to maintain the existing contracts.

Summary

This document describes our effort to refactor the threading model of the KafkaConsumer. The refactor addresses issues and shortcomings we have encountered over the past years, such as increasing code complexity, locking and race conditions, and the coupling between the client poll and the rebalance process.

  • Code complexity: Patches and hotfixes in the past years have heavily impacted the readability of the code. The complex code paths and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue when there is a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.

  • Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, and it has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with the poll. Firstly, it simplifies the design and logic because the timing of the network requests is more predictable. Secondly, because the rebalance occurs in the background thread, the polling thread will neither be blocked by the rebalance process nor block it.

Scope

  • Deprecate HeartbeatThread

    • Remove the heartbeat thread.
    • Move the heartbeat logic to the background thread.
  • Implement a background thread
  • Refactor the Coordinator classes

    • Revisit timers and loops
    • Blocking methods such as commitOffsetSync will be handled by the polling thread by waiting on the future's completion. The coordinator should not be blocking.
    • The background thread loop will poll the coordinator.
    • We will modify the existing rebalance states, specifically for the callback execution.
  • Refactor the Fetcher Class
    • We send fetches in the background thread, and collect and process fetches on the polling thread.
    • We also need a thread-safe buffer (the completedFetches queue) accessible by both threads.
  • Refactor the KafkaConsumer API

    • It will be completely event driven. The polling thread will 1. poll for background events, and 2. send application events.
    • Remove dependency on the fetcher, coordinator, and networkClient.
  • Events and communication channels.

    • We will use two BlockingQueues for two-way communication between the background and polling threads.
  • Address issues in these Jira tickets

Terms

Polling Thread

The client thread, also known as the polling thread, is the thread on which the user invokes the poll() operation.

Background Thread

Specifically, we use this generic term in the new design context to indicate the active process running in the background. It handles network communication, rebalancing, and heartbeat.

Heartbeat Thread

The background running thread in the current implementation (see KIP-62).

Channel

We have two channels in this design. One channel delivers messages from the polling thread to the background thread. The other channel delivers messages submitted by the background thread to the polling thread.

Event

The primary communication medium through which the polling thread and the background thread interact.

Proposed Design

In the schematic below, the main components are:

  • Polling thread: handles all of the API requests and callback executions.

  • Background thread: handles network communication such as heartbeat, coordinator requests, and rebalance flow execution.

  • Communication channels:

    • ApplicationEventQueue: sending events to the background thread

    • BackgroundEventQueue: sending events to the polling thread

    • Both queues will be interfaced by the EventHandler interface

We will discuss the strategy for handling event results in the following section. In short, we will use the CompletableFuture API to handle the asynchronous operation between the two threads.    

A note on the subscriptionState: Its reference will be shared by the polling and background threads, i.e., we will refactor the current implementation to be thread-safe. The reasons we are making this data structure an exception are:

  • The object is frequently accessed by both the polling thread and the background thread.
  • The object often requires immediate response; otherwise, the performance will be heavily impacted, and the current behavior will be altered.

Important Components

Background thread and its lifecycle

Maintaining and managing the coordinator connection is crucial because many operations depend on it. In the current implementation, we send FindCoordinator requests when one is needed and is unavailable. This makes the request futures hard to manage; therefore, in the new implementation, we want to centralize the coordinator connection and use a state machine to facilitate the discovery process.

The connection states are represented by: down, initialized, coordinator_discovery (coordinator discovery), and stable.

  • down: The coordinator is uninitialized
  • initialized: The background thread completes initialization, and the loop has started.
  • coordinator_discovery: If there's no in-flight FindCoordinatorRequest, then it will send one. If one exists, check and wait for the requested results.
  • stable: The coordinator connection is established.

The coordinator discovery process piggybacks on the background thread event loop. The event loop handles the in-flight event and may attempt to send the FindCoordinator request. If we don't need a coordinator, the background thread will stay in the initialized state. If an event requires a coordinator, we will then move to the coordinator_discovery state and wait for the response.
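
The connection states above can be sketched as a small state machine. This is only an illustration under assumptions: the names ConnectionState and CoordinatorConnection are hypothetical, and the transitions back to DOWN (shutdown) and from STABLE to COORDINATOR_DISCOVERY (lost coordinator) are our reading of the text rather than a confirmed part of the design.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical names; the constants mirror the four connection states described above.
enum ConnectionState {
    DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE;

    // Allowed next states (an assumption drawn from the prose, not a confirmed design).
    Set<ConnectionState> next() {
        switch (this) {
            case DOWN:
                return EnumSet.of(INITIALIZED);
            case INITIALIZED:
                return EnumSet.of(COORDINATOR_DISCOVERY, DOWN);
            case COORDINATOR_DISCOVERY:
                return EnumSet.of(STABLE, DOWN);
            case STABLE:
                return EnumSet.of(COORDINATOR_DISCOVERY, DOWN);
            default:
                throw new IllegalStateException("unknown state " + this);
        }
    }
}

class CoordinatorConnection {
    private ConnectionState state = ConnectionState.DOWN;

    ConnectionState state() {
        return state;
    }

    // Rejects any transition that the table in next() does not allow.
    void transitionTo(ConnectionState target) {
        if (!state.next().contains(target)) {
            throw new IllegalStateException(state + " -> " + target + " is not allowed");
        }
        state = target;
    }

    // Called per event: discovery only starts when an event needs a coordinator.
    void onEvent(boolean requiresCoordinator) {
        if (requiresCoordinator && state == ConnectionState.INITIALIZED) {
            transitionTo(ConnectionState.COORDINATOR_DISCOVERY);
        }
    }
}
```

Keeping the allowed transitions in one place makes an illegal jump, for example from stable straight back to initialized, fail loudly instead of silently corrupting the connection state.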

Background thread and its lifecycle

  1. The background thread has not been constructed, so it is in the down state.

  2. The polling thread starts the background thread; the background thread finishes initialization and then moves to the initialized state. Usually, this happens after new KafkaConsumer().

  3. The background thread loop performs the following tasks:

    1. Check if there is an in-flight event. If not, poll for the new events from the channel.

    2. Run the state machine. The possible scenarios are:

      • The event does not require a coordinator.  Execute the event and move on.
      • The event requires a coordinator, but it is currently disconnected.  Move the state to coordinator_discovery.
      • The background thread is currently in the coordinator_discovery state; check the connections.
      • The background thread is in a stable state.
        • Poll the Coordinator.
        • Execute the event.
  4. Poll the networkClient.

  5. Backoff for retryBackoffMs milliseconds.
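
A minimal sketch of one iteration of the loop described above, assuming a single in-flight event at a time. All names (BackgroundLoopSketch, Event, coordinatorConnected) are hypothetical, and the coordinator, network client, and backoff are stubbed out or omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// All names are hypothetical; the real coordinator and network client are stubbed out.
class BackgroundLoopSketch {
    enum State { INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

    static final class Event {
        final String name;
        final boolean requiresCoordinator;

        Event(String name, boolean requiresCoordinator) {
            this.name = name;
            this.requiresCoordinator = requiresCoordinator;
        }
    }

    final BlockingQueue<Event> applicationEvents = new LinkedBlockingQueue<>();
    final List<String> executed = new ArrayList<>();
    State state = State.INITIALIZED;
    boolean coordinatorConnected = false; // stand-in for a FindCoordinator response
    private Event inflight;

    // One iteration of the loop (steps 3.1 and 3.2 above).
    void runOnce() {
        if (inflight == null) {
            inflight = applicationEvents.poll(); // non-blocking; may return null
        }
        if (inflight == null) {
            return;
        }
        if (!inflight.requiresCoordinator) {
            execute();
        } else if (state == State.INITIALIZED) {
            state = State.COORDINATOR_DISCOVERY; // a FindCoordinator request would be sent here
        } else if (state == State.COORDINATOR_DISCOVERY) {
            if (coordinatorConnected) {
                state = State.STABLE;
            }
        } else {
            // STABLE: poll the coordinator (omitted), then execute the event.
            execute();
        }
        // Steps 4 and 5 (poll the network client, back off) are omitted in this sketch.
    }

    private void execute() {
        executed.add(inflight.name);
        inflight = null;
    }
}
```

Because an event that needs a coordinator stays in flight until the state reaches STABLE, later events wait behind it, which keeps the execution order the same as the submission order.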

Rebalance States

To coordinate callback execution and the acknowledgment of its completion between the polling thread and the background thread, we need to modify the existing coordinator states.

  1. We split PREPARING_REBALANCE into PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED, and PREPARING_REBALANCE:

    1. PREPARE_REVOCATION: Pause partitions that will be revoked
    2. REVOKING_PARTITION: Wait for onPartitionRevoked to complete.

    3. PARTITION_REVOKED: Update the subscription.  Autocommit.
  2. An ASSIGNING_PARTITIONS state will be added before STABLE:

    1. ASSIGNING_PARTITIONS: Wait for onPartitionAssign to complete.

    2. Upon onPartitionAssign's completion, we will move the state to STABLE.

The proposed state transitions are as follows:

Starting from UNJOINED: UNJOINED → PREPARING_REBALANCE → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE

(EAGER) Starting from STABLE: STABLE → PREPARE_REVOCATION → REVOKING_PARTITION → PARTITION_REVOKED → PREPARING_REBALANCE → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE

(COOPERATIVE) STABLE → PREPARE_REVOCATION → REVOKING_PARTITION → PARTITION_REVOKED → COMPLETING_REBALANCE → ASSIGNING_PARTITIONS → STABLE → //rejoin
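
The three transition paths above can be written down as data and checked mechanically. The state names come from the text; the RebalanceState enum layout and the isValidStep helper are hypothetical and only meant as a sketch.

```java
import java.util.Arrays;
import java.util.List;

// State names come from the text above; the path lists and helper are illustrative only.
enum RebalanceState {
    UNJOINED, PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED,
    PREPARING_REBALANCE, COMPLETING_REBALANCE, ASSIGNING_PARTITIONS, STABLE;

    static final List<RebalanceState> FROM_UNJOINED = Arrays.asList(
            UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, ASSIGNING_PARTITIONS, STABLE);

    static final List<RebalanceState> EAGER = Arrays.asList(
            STABLE, PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED,
            PREPARING_REBALANCE, COMPLETING_REBALANCE, ASSIGNING_PARTITIONS, STABLE);

    static final List<RebalanceState> COOPERATIVE = Arrays.asList(
            STABLE, PREPARE_REVOCATION, REVOKING_PARTITION, PARTITION_REVOKED,
            COMPLETING_REBALANCE, ASSIGNING_PARTITIONS, STABLE);

    // Returns true if `to` directly follows `from` somewhere on the given path.
    static boolean isValidStep(List<RebalanceState> path, RebalanceState from, RebalanceState to) {
        for (int i = 0; i + 1 < path.size(); i++) {
            if (path.get(i) == from && path.get(i + 1) == to) {
                return true;
            }
        }
        return false;
    }
}
```

The visible difference between the two protocols in this sketch is the step after PARTITION_REVOKED: the eager path goes through PREPARING_REBALANCE, while the cooperative path goes directly to COMPLETING_REBALANCE.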

EventHandler and Events

Here we define two types of events:

  1. ApplicationEvent: application side events that will be sent to the background thread
  2. BackgroundEvent: background thread events that will be sent to the application

We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e. the polling thread, to add and poll the events.

EventHandler
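interface EventHandler {
    // Returns the next event sent by the background thread to the polling thread
    BackgroundEvent poll();
    // Sends an event from the polling thread to the background thread
    void add(ApplicationEvent event);
}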
ApplicationEventQueue and ApplicationEvent
// Channel used to send events to the background thread
private BlockingQueue<ApplicationEvent> queue;

public abstract class ApplicationEvent {
    private final ApplicationEventType eventType;

    protected ApplicationEvent(ApplicationEventType eventType) {
        this.eventType = eventType;
    }
}

enum ApplicationEventType {
    COMMIT,
    ACK_PARTITION_REVOKED,
    ACK_PARTITION_ASSIGNED,
    UPDATE_METADATA,
}
BackgroundEventQueue and BackgroundEvent
// Channel used to send events to the polling thread for client-side execution/notification
private BlockingQueue<BackgroundEvent> queue;

public abstract class BackgroundEvent {
    private final BackgroundEventType eventType;

    protected BackgroundEvent(BackgroundEventType eventType) {
        this.eventType = eventType;
    }
}

enum BackgroundEventType {
    ERROR,
    REVOKE_PARTITIONS,
    ASSIGN_PARTITIONS,
    FETCH_RESPONSE,
}
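
To illustrate how the two queues could sit behind one handler, here is a minimal self-contained sketch. QueueEventHandler, nextApplicationEvent, and publish are hypothetical names, the event classes are reduced to stand-ins with a string type, and returning Optional from poll() is our choice for the sketch, not something this design prescribes.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal stand-ins for the event types; the real ones would carry more state.
class ApplicationEvent {
    final String type;

    ApplicationEvent(String type) {
        this.type = type;
    }
}

class BackgroundEvent {
    final String type;

    BackgroundEvent(String type) {
        this.type = type;
    }
}

// Hypothetical implementation: the polling thread uses add()/poll(),
// and the background thread works on the opposite ends of the same queues.
class QueueEventHandler {
    private final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    // Polling thread: enqueue an event for the background thread.
    boolean add(ApplicationEvent event) {
        return applicationEventQueue.offer(event);
    }

    // Polling thread: non-blocking check for an event from the background thread.
    Optional<BackgroundEvent> poll() {
        return Optional.ofNullable(backgroundEventQueue.poll());
    }

    // Background thread: non-blocking check for the next application event.
    Optional<ApplicationEvent> nextApplicationEvent() {
        return Optional.ofNullable(applicationEventQueue.poll());
    }

    // Background thread: enqueue an event for the polling thread.
    boolean publish(BackgroundEvent event) {
        return backgroundEventQueue.offer(event);
    }
}
```

Both queues are thread-safe on their own, so the handler needs no extra locking as long as each thread only touches its own pair of methods.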

Consumer API Internal Changes 

Poll

  1. Poll the EventHandler, and execute the events accordingly
    1. Callback invocation
    2. Collect fetches
    3. Error handling
  2. Check the fetcher, and return if there is data

CommitSync

  1. The polling thread sends a commit event. The commit event carries a completable future.
  2. Wait for the completable future to finish, so that we can make this a blocking API
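
The two steps above could look roughly like the following. This is a sketch under assumptions: CommitEvent, CommitSyncSketch, and startBackground are hypothetical names, the actual commit request is stubbed out, and a timeout is reported as a false return value purely to keep the example small.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical commit event carrying the future that the polling thread waits on.
class CommitEvent {
    final long offset;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CommitEvent(long offset) {
        this.offset = offset;
    }
}

class CommitSyncSketch {
    final BlockingQueue<CommitEvent> applicationEvents = new LinkedBlockingQueue<>();

    // Polling thread: enqueue the event, then block until it completes or times out.
    boolean commitSync(long offset, long timeoutMs) {
        CommitEvent event = new CommitEvent(offset);
        applicationEvents.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("commit failed", e.getCause());
        }
    }

    // Background thread: take events and complete their futures (the network call is stubbed out).
    Thread startBackground() {
        Thread thread = new Thread(() -> {
            try {
                while (true) {
                    CommitEvent event = applicationEvents.take();
                    event.future.complete(null); // the commit request would be sent here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit the loop on interrupt
            }
        }, "background-thread-sketch");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }
}
```

The blocking behavior lives entirely in the polling thread's wait on the future; the background loop itself never blocks on a single commit.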

Major Changes

Consumer Poll Changes

consumer.poll() will mainly be a loop polling for events from the event handler.  Here are the important events:

  1. Fetch Data: Return immediately
  2. Callback Invocation: Invoke the callback and subsequently send an acknowledgment event to the background thread to advance the state machine (see background thread section).
  3. Error: Handle the error upon receiving it.
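
A rough sketch of how such a loop could dispatch the three kinds of events. PollLoopSketch and its members are hypothetical, records and acknowledgments are reduced to strings, and rethrowing the error as an IllegalStateException is just one possible way to handle it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical names throughout; payloads and acknowledgments are plain strings.
class PollLoopSketch {
    enum Type { FETCH_RESPONSE, REVOKE_PARTITIONS, ASSIGN_PARTITIONS, ERROR }

    static final class BackgroundEvent {
        final Type type;
        final String payload;

        BackgroundEvent(Type type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();
    final List<String> applicationEvents = new ArrayList<>(); // stand-in for the application event queue

    // Handles events until fetched data arrives or the timeout expires.
    List<String> poll(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (true) {
                long remaining = Math.max(0L, deadline - System.nanoTime());
                BackgroundEvent event = backgroundEvents.poll(remaining, TimeUnit.NANOSECONDS);
                if (event == null) {
                    return Collections.emptyList(); // timed out without data
                }
                switch (event.type) {
                    case FETCH_RESPONSE:
                        return Collections.singletonList(event.payload); // return immediately
                    case REVOKE_PARTITIONS:
                        // The user's revocation callback would be invoked here.
                        applicationEvents.add("ACK_PARTITION_REVOKED");
                        break;
                    case ASSIGN_PARTITIONS:
                        // The user's assignment callback would be invoked here.
                        applicationEvents.add("ACK_PARTITION_ASSIGNED");
                        break;
                    case ERROR:
                        throw new IllegalStateException(event.payload);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Collections.emptyList();
        }
    }
}
```

Callbacks run on the polling thread inside poll(), and the acknowledgment that follows is what lets the background thread advance its rebalance state.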

ConsumerCoordinator and AbstractCoordinator

  • New states will be introduced (see Rebalance States section above). The main purpose is to make the background thread drive the poll and to let the polling thread invoke the callbacks.
  • Remove HeartbeatThread. Therefore, we won't be starting the heartbeat thread.

    • It will still require a fetch event to poll the heartbeat, because only the polling thread issues fetch events and we want to respect the existing implementation.
  • Timeouts and timers will be reevaluated and possibly removed.
  • while loops will be reevaluated and possibly thrown out.  In the new implementation the coordinator will be non-blocking, and its states are managed by the background thread loop.

Wakeup() and WakeupException

We will have to reimplement wakeup()

wakeup()

In the new design, wakeup will interrupt the blocking polling loop in the polling thread.
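
One possible way to unblock the polling loop, sketched under assumptions: wakeup() places a sentinel on the same queue the polling thread is waiting on, so a blocked poll() returns and throws. WakeupSketch, publish, and WakeupSketchException are hypothetical names (the exception stands in for WakeupException), and the final design may use a different mechanism.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for WakeupException.
class WakeupSketchException extends RuntimeException {
}

class WakeupSketch {
    // Sentinel placed on the queue by wakeup(); compared by identity.
    private static final Object WAKEUP = new Object();
    private final BlockingQueue<Object> backgroundEvents = new LinkedBlockingQueue<>();

    // Background thread: publish an event for the polling thread.
    void publish(Object event) {
        backgroundEvents.add(event);
    }

    // Any thread: unblock the polling thread if it is waiting in poll().
    void wakeup() {
        backgroundEvents.add(WAKEUP);
    }

    // Polling thread: returns the next event, or null on timeout.
    Object poll(long timeoutMs) {
        Object event;
        try {
            event = backgroundEvents.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        if (event == WAKEUP) {
            throw new WakeupSketchException();
        }
        return event;
    }
}
```

With this approach, events queued before the wakeup are still delivered first, and a wakeup issued while no poll is in progress is seen by the next poll.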

Timeout Policy

Consumer.poll() - user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: Currently uses the user-provided timeout in consumer.poll(). Maybe we should use request.timeout.ms instead, and re-attempt in the next loop if the request fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure session interval and heartbeat interval?

Test Plan

We will first write new unit tests and integration tests, and reuse the existing ones.

We need to ensure that the critical events happen in the correct sequence. For example, we need to first discover the coordinator, then commit the offsets while the partitions are paused from being fetched, then revoke the partitions, and finally continue with the rest of the rebalance process.

We will also ensure that the heartbeat interval and the poll interval are respected.

We also need to ensure that no events are lost and that events happen in the correct sequence.

Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

Alternative Proposals

Fetcher

  • Should some of the fetcher configuration be dynamic?

  • Configurable prefetch buffer

SubscriptionState

  • Non-shared: We can make a copy of the subscriptionState in the background thread, and use event to drive the synchronization.

    • There could be out-of-sync issues, which can subsequently cause incorrect fetching, etc.

API Changes

  • Poll returns CompletableFuture<ConsumerRecord<K,V>>

User Adoption

The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.

Release Plan

  • Support code will exist in parallel with the current code. The support code includes:
    • Background thread
    • A new coordinator implementation, AsyncConsumerCoordinator, for example.
    • Events and event executors
  • We will create a new KafkaConsumer class first, then have it replace the existing one once it reaches stability.