
Public-Facing Changes

  • Update Java Docs

  • API signatures will remain the same

Summary

This document describes our effort to refactor the threading model of the KafkaConsumer.  The purpose of this refactor is to address issues and shortcomings we have encountered over the past years, such as increasing code complexity, locking and race conditions, and the coupling between the client poll and the rebalance progress.

  • Code complexity: Patches and hotfixes over the past years have heavily impacted the readability of the code.  Firstly, this makes bug fixes increasingly difficult, and secondly, it makes the code hard to comprehend.  For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of a race condition.  Also, many parts of the coordinator code have been refactored to execute asynchronously, so we can take the opportunity to refactor the existing loops and timers. 

  • Complex Coordinator Communication: Coordinator communication happens in both threads in the current design, and this has caused race conditions such as KAFKA-13563.  We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with respect to the poll.  Firstly, it simplifies the polling thread design, as it essentially only needs to handle fetch requests.  Secondly, because the rebalance occurs in the background thread, the polling thread won't block, or be blocked by, the rebalance process.

Scope

  • Deprecate HeartbeatThread

    • Remove the heartbeat thread.
    • Move the heartbeat logic to the background thread.
  • Refactor the Coordinator classes

    • Revisit timers and loops
    • Blocking methods such as commitOffsetSync will be handled by the polling thread by waiting on a future.  The coordinator itself should not block.
    • The coordinator will be polled by the background thread loop.
    • Rebalance state modification: We will add a few states to ensure the rebalance callbacks are executed in the correct sequence and time.
  • Refactor the KafkaConsumer API

    • It will send Events to the background thread if network communication is required.
    • Remove dependency on the fetcher, coordinator, and networkClient.
  • Events and communication channels.

    • We will use two channels to facilitate the two-way communication
  • Address issues in these Jira tickets

Terms

Polling Thread

The client thread, also known as the polling thread, is the thread on which the user invokes the poll() operation.

Background Thread

Specifically, in the context of the new design, we use this generic term to refer to the active process running in the background.  It handles network communication, rebalancing, and heartbeats.

Heartbeat Thread

The background running thread in the current implementation (see KIP-62).

Channel

We have two channels in this design.  One channel delivers messages from the polling thread to the background thread.  The other channel delivers messages submitted by the background thread to the polling thread.

Event

The primary communication medium through which the polling thread interacts with the background thread.

Proposed Design

In the schematic below, the main components are:

  • Polling thread: handles all of the API requests and callback executions.

  • Background thread: handles network communication such as heartbeat, coordinator requests, and rebalance flow execution.

  • Communication channels: which are responsible for:

    • sending events to the background thread

    • sending events to the polling thread

A note on the subscriptionState: Its reference will be shared by polling and background threads, i.e., we will refactor it using a thread-safe data structure or provide explicit locking and synchronization barriers.
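As a sketch of the explicit-locking option mentioned above, a shared state could be guarded so that both threads always see a consistent view.  The class and fields here are illustrative, not the real SubscriptionState:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper showing explicit locking around shared subscription
// state, one of the two options mentioned above. Not the real SubscriptionState.
public class SharedSubscriptionState {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> assignedPartitions = new HashSet<>();

    // Background thread: install a new assignment atomically.
    public void assign(Set<String> partitions) {
        lock.lock();
        try {
            assignedPartitions.clear();
            assignedPartitions.addAll(partitions);
        } finally {
            lock.unlock();
        }
    }

    // Polling thread: take a consistent snapshot before fetching.
    public Set<String> fetchablePartitions() {
        lock.lock();
        try {
            return new HashSet<>(assignedPartitions);
        } finally {
            lock.unlock();
        }
    }
}
```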

Important Components

Background thread states

The coordinator discovery process piggybacks on the background thread loop; therefore, any disconnect or connection request is handled by the state machine.  We use a state machine to represent the different states of the coordinator connection.  The background thread can be in one of four states: down, initialized, coordinator_discovery (coordinator discovery), and stable.  The background thread needs to fulfill certain requirements in each state to maintain its status.  In particular:

  • down: The coordinator is uninitialized
  • initialized: The background thread completes initialization, and the loop has started.
  • coordinator_discovery: The coordinator has been requested but is not yet connected. If there is no in-flight FindCoordinatorRequest, the background thread sends one; if one exists, it checks the request's result.
  • stable: The coordinator is connected.
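The four states and the transitions described above can be sketched as a small state machine.  The class and method names here are illustrative, not part of the proposal:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the four background-thread states described above and
// the legal transitions between them. Names mirror the document.
public class BackgroundThreadState {
    public enum State { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

    private static final Map<State, Set<State>> TRANSITIONS = new EnumMap<>(State.class);
    static {
        TRANSITIONS.put(State.DOWN, EnumSet.of(State.INITIALIZED));
        // An event that needs the coordinator triggers discovery; close tears down.
        TRANSITIONS.put(State.INITIALIZED, EnumSet.of(State.COORDINATOR_DISCOVERY, State.DOWN));
        // Discovery either succeeds (STABLE) or fails back to INITIALIZED.
        TRANSITIONS.put(State.COORDINATOR_DISCOVERY, EnumSet.of(State.STABLE, State.INITIALIZED, State.DOWN));
        // A disconnect sends the thread back to rediscovery.
        TRANSITIONS.put(State.STABLE, EnumSet.of(State.COORDINATOR_DISCOVERY, State.DOWN));
    }

    public static boolean canTransition(State from, State to) {
        return TRANSITIONS.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```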

Background thread and its lifecycle

  1. The background thread has not been constructed, so it is down.

  2. The polling thread starts the background thread, and the background thread moves to the initialized state.  This usually happens after new KafkaConsumer().

  3. The background thread loop is running.  Here are the things that are happening:

    1. Poll for the new events (for example, a commit event) from the channel

    2. Check the background thread state by running the state machine, and ensure the state's requirements are fulfilled:

      • If the event requires a coordinator, move the background thread to the coordinator_discovery (coordinator discovery) state.

        • If not, stay in the initialized state and execute the event.

      • In the coordinator_discovery state, check the coordinator connection: 

        • If the FindCoordinator request hasn't completed, stay in the discovery state. 

        • If the request fails, transition back to the initialized state. 

        • Otherwise, the coordinator is found; transition to the stable state.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Loop back to 3 if not closed.

  7. If close() is received:

    1. Set the closed flag to true so that the loop exits

    2. poll coordinator and network client

    3. Commit
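The lifecycle steps above can be sketched as a skeleton of the background thread loop.  This is a hypothetical illustration; all names (BackgroundLoop, Event, and the private poll methods) are assumptions, and the real loop would dispatch concrete events:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical skeleton of the background thread loop in steps 3-7 above.
public class BackgroundLoop implements Runnable {
    interface Event { boolean requiresCoordinator(); }

    final BlockingQueue<Event> serverEventQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    int coordinatorPolls = 0; // exposed for illustration only

    @Override
    public void run() {
        while (!closed.get()) {
            try {
                // Step 3.1: poll for new events (e.g. a commit) from the channel.
                Event event = serverEventQueue.poll(100, TimeUnit.MILLISECONDS);
                // Step 3.2: run the state machine; discover the coordinator if needed.
                if (event != null && event.requiresCoordinator()) {
                    ensureCoordinatorDiscovered();
                }
                pollCoordinator();   // step 4
                pollNetworkClient(); // step 5
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        } // step 6: loop back while not closed
        // Step 7: on close, drain outstanding work and commit.
        pollCoordinator();
        pollNetworkClient();
    }

    public void close() { closed.set(true); }

    private void ensureCoordinatorDiscovered() { /* send FindCoordinator if none in flight */ }
    private void pollCoordinator() { coordinatorPolls++; /* advance rebalance/heartbeat progress */ }
    private void pollNetworkClient() { /* send and receive pending requests */ }
}
```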

Consumer Poll Changes

Currently, consumer.poll() does two things: poll the coordinator (for assignment updates, auto-commit, and rebalances) and fetch data.

Moving forward, the consumer will only be responsible for sending fetch requests, returning fetch data, and polling the channel for events such as rebalance and error notifications:

  1. Poll the ConsumerQueue for events

    1. Handle exceptions

    2. Handle callback execution

  2. Send out fetch requests

  3. Return fetch results
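The slimmed-down poll() flow above might look roughly like this sketch.  All class, field, and method names here are illustrative assumptions:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the new poll(): drain background events, then fetch.
public class PollingThreadSketch {
    enum EventType { ERROR, REVOKE_PARTITION }

    static class ConsumerEvent {
        final EventType type;
        final String payload;
        ConsumerEvent(EventType type, String payload) { this.type = type; this.payload = payload; }
    }

    final BlockingQueue<ConsumerEvent> consumerEventQueue = new LinkedBlockingQueue<>();
    final List<String> revoked = new ArrayList<>();

    public List<String> poll(Duration timeout) {
        // Step 1: poll the consumer queue for events from the background thread.
        List<ConsumerEvent> events = new ArrayList<>();
        consumerEventQueue.drainTo(events);
        for (ConsumerEvent event : events) {
            if (event.type == EventType.ERROR) {
                throw new RuntimeException(event.payload); // 1.1: surface background errors
            }
            revoked.add(event.payload); // 1.2: execute the rebalance callback
        }
        sendFetchRequests();       // step 2
        return completedFetches(); // step 3
    }

    private void sendFetchRequests() { /* hand fetch requests to the background thread */ }
    private List<String> completedFetches() { return Collections.emptyList(); }
}
```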

ConsumerCoordinator and AbstractCoordinator

We will reuse the existing implementation, but with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation.  We won't start the heartbeat thread; instead, the heartbeat mechanism will piggyback on the background thread.

  • Right now, the coordinator has the states Unjoin, Prepare_Rebalancing, Complete_Rebalancing, and Stable.  Because of the complexity of the rebalance callback execution in the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add the “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add the “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to advance progress:

    • The background thread will automatically rediscover the coordinator if disconnected, using the state machine

    • We are already calling networkClient.poll() in the background thread loop

Rebalance States

The existing coordinator implementation already has a state machine in place.  We will add four states to facilitate the callback execution.

  1. After onJoinPrepare, transition to the REVOKING_PARTITIONS state.  It will do a few things:

    1. Send an event to the polling thread to execute the revocation callback

    2. Wait for the partition-revoked event, then advance the state to PARTITIONS_REVOKED

  2. In the onJoinComplete:

    1. We will first exit the COMPLETING_JOIN state

    2. Enter ASSIGNING_PARTITIONS, and send a partition assignment event.  Return.

    3. Wait for partition assignment completion from the polling thread.  Advance to PARTITIONS_ASSIGNED
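A minimal sketch of the four added states and their expected ordering within a single rebalance.  The helper class is illustrative; the existing coordinator states would interleave between them:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the four added rebalance states. Names follow the
// document; the sequencing helper is illustrative, not the real state machine.
public class RebalanceStates {
    public enum State {
        REVOKING_PARTITIONS,   // onJoinPrepare done; revocation event sent to polling thread
        PARTITIONS_REVOKED,    // polling thread reported onPartitionsRevoked finished
        ASSIGNING_PARTITIONS,  // onJoinComplete: assignment event sent to polling thread
        PARTITIONS_ASSIGNED    // polling thread reported onPartitionsAssigned finished
    }

    // The callbacks must complete in this relative order for a single rebalance.
    public static final List<State> EXPECTED_ORDER = Arrays.asList(State.values());

    public static boolean isValidSuccessor(State from, State to) {
        return to.ordinal() == from.ordinal() + 1;
    }
}
```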

Channel and Events

We use a blocking queue to send API events from the polling thread to the background thread. 

ServerEventQueue
// Channel used to send events to the background thread

private BlockingQueue<ServerEvent> queue;

public abstract class ServerEvent {
   private final ServerEventType eventType;

   protected ServerEvent(ServerEventType eventType) {
       this.eventType = eventType;
   }
}

enum ServerEventType {
   FETCH,
   COMMIT,
   METADATA_UPDATE,
   CLOSE,
   ...
}

ConsumerEventQueue
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<ConsumerEvent> queue;

public abstract class ConsumerEvent {
   private final ConsumerEventType eventType;

   protected ConsumerEvent(ConsumerEventType eventType) {
       this.eventType = eventType;
   }
}

enum ConsumerEventType {
   ERROR,
   REVOKE_PARTITION,
   ...
}
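As a usage sketch of the server-event channel above (the CommitEvent subclass and the roundTrip helper are hypothetical; real events would carry payloads such as offsets):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Usage sketch of the server-event channel. CommitEvent is a hypothetical
// concrete subclass for illustration.
public class ChannelUsageSketch {
    enum ServerEventType { FETCH, COMMIT, METADATA_UPDATE, CLOSE }

    abstract static class ServerEvent {
        final ServerEventType eventType;
        ServerEvent(ServerEventType eventType) { this.eventType = eventType; }
    }

    static class CommitEvent extends ServerEvent {
        CommitEvent() { super(ServerEventType.COMMIT); }
    }

    public static ServerEventType roundTrip() {
        BlockingQueue<ServerEvent> serverEventQueue = new LinkedBlockingQueue<>();
        // Polling thread side: enqueue a commit request (offer never blocks on an unbounded queue).
        serverEventQueue.offer(new CommitEvent());
        // Background thread side: dequeue the event and dispatch on its type.
        return serverEventQueue.poll().eventType;
    }
}
```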

Wakeup() and WakeupException

TBD.  I’m not sure how WakeupException plays out in the new design.

Timeout Policy

Consumer.poll(): user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout from consumer.poll().  Maybe we should use request.timeout.ms instead, and re-attempt in the next loop iteration if the request fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure session interval and heartbeat interval?
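For reference, the existing consumer configuration keys mentioned above could be collected as follows.  The keys are real Kafka consumer configurations; the values shown are illustrative, not proposed defaults:

```java
import java.util.Properties;

// The timeout knobs discussed above, gathered as consumer configs.
public class TimeoutConfigSketch {
    public static Properties timeouts() {
        Properties props = new Properties();
        props.setProperty("retry.backoff.ms", "100");       // coordinator rediscovery backoff
        props.setProperty("request.timeout.ms", "30000");   // candidate bound for coordinator discovery
        props.setProperty("session.timeout.ms", "45000");   // session liveness
        props.setProperty("heartbeat.interval.ms", "3000"); // heartbeat cadence
        return props;
    }
}
```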

Event Data Models

ConsumerEvent

  Name                 Type               Description
  type                 ConsumerEventType  Type of event
  requiredCoordinator  bool               Indicates if the event uses the coordinator

ServerEvent

  Name  Type             Description
  type  ServerEventType  Type of event

Test Plan

We will first reuse the existing unit tests and integration tests.

We need to ensure that critical events happen in the correct sequence.  For example: we need to first discover the coordinator; second, commit the offsets while pausing the partitions from being fetched; then revoke the partitions; and then continue with the rest of the rebalance process.

We will also ensure the heartbeat interval and poll interval are respected.

We also need to ensure that no events are lost and that events happen in the correct sequence.

Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

Alternative Proposal

Fetcher

  • Should we consider a larger or configurable prefetch buffer?  Ideally, we could tune this parameter to improve performance.

SubscriptionState

  • Shared: For implementation simplicity, the subscriptionState will be shared between the background and polling threads

  • Non-shared: We can make a copy of the subscriptionState in the background thread, then (1) update the state and (2) check for consistency during the poll

Others

  • Should poll() internally return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()?  Each take() would perform a similar action to poll().

User Adoption

The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.

Release Plan

  • Support code will live alongside the current code
    • Background thread
    • A new coordinator implementation, AsyncConsumerCoordinator for example.
    • Events and event executors
  • We will create a new KafkaConsumer class first, then have it override the existing one once it reaches stability