...

  • Code complexity: Patches and hotfixes over the past years have heavily impacted the readability of the code.  This makes the code hard to comprehend and bug fixes increasingly difficult.  For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue such as a race condition.  Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.

  • Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, which has caused race conditions such as KAFKA-13563.  We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously to the poll.  Firstly, this simplifies the polling thread design, as it essentially only needs to handle fetch requests.  Secondly, because the rebalance occurs in the background thread, the polling thread neither blocks the rebalance process nor is blocked by it.

Scope

  • Deprecate HeartbeatThread

    • Remove the heartbeat thread.
    • Move the heartbeat logic to the background thread.
  • Refactor the Coordinator classes

    • Revisit timers and loops
    • Remove some blocking methods, such as commitOffsetSync.  The blocking commit should be handled by the polling thread, waiting for the background thread to complete the future.
    • The coordinator will be polled by the background thread loop.
    • Rebalance state modification: We will add a few states to ensure the rebalance callbacks are executed in the correct sequence and time.
  • Refactor the KafkaConsumer API

    • It will send Events to the background thread if network communication is required.
    • Remove dependency on the fetcher, coordinator, and networkClient.
  • Events and communication channels.

    • We will use two channels to facilitate the two-way communication
  • Address issues in these Jira tickets

...

Terms

Polling Thread

The client thread, also known as the polling thread, is the thread on which the user invokes the poll() operation.

...

In the schematic below, the main components are:

  • Polling thread: handles all of the API requests and callback executions.

  • Background thread: handles network communication such as heartbeat, coordinator requests, and rebalance flow execution.

  • Communication channels: which are responsible for:

    • sending events to the background thread

    • sending events to the polling thread
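The two channels can be pictured as a pair of queues, one per direction. The following is a minimal sketch under stated assumptions, not the actual implementation: the class `EventChannelsSketch`, the event types `ApplicationEvent` and `BackgroundEvent`, and the methods `processOne` and `commitSync` are all hypothetical names chosen for illustration. It also shows how a blocking commit, as mentioned in the Scope section, could wait on the polling thread for a future that the background thread completes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from the Kafka code base.
class EventChannelsSketch {

    /** Request sent from the polling thread to the background thread. */
    static final class ApplicationEvent {
        final String type;
        final CompletableFuture<Void> result = new CompletableFuture<>();
        ApplicationEvent(String type) { this.type = type; }
    }

    /** Notification sent from the background thread to the polling thread. */
    static final class BackgroundEvent {
        final String message;
        BackgroundEvent(String message) { this.message = message; }
    }

    // One queue per direction.
    final BlockingQueue<ApplicationEvent> toBackground = new LinkedBlockingQueue<>();
    final BlockingQueue<BackgroundEvent> toPolling = new LinkedBlockingQueue<>();

    /** Background side: take one event, "process" it, and report back. */
    void processOne() throws InterruptedException {
        ApplicationEvent event = toBackground.take();
        // A real implementation would perform network communication here.
        toPolling.add(new BackgroundEvent("handled " + event.type));
        event.result.complete(null);
    }

    /** Polling side: enqueue a commit event and block until its future completes. */
    void commitSync(long timeoutMs) throws Exception {
        ApplicationEvent commit = new ApplicationEvent("commit");
        toBackground.add(commit);
        commit.result.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        EventChannelsSketch channels = new EventChannelsSketch();
        Thread background = new Thread(() -> {
            try {
                channels.processOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-thread");
        background.start();

        channels.commitSync(5_000);                             // blocks only the polling side
        System.out.println(channels.toPolling.take().message);  // prints "handled commit"
        background.join();
    }
}
```

Keeping the future inside the event means the polling thread never touches the network client directly; it only waits on the result.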

A note on the subscriptionState: Its reference will be shared by polling and background threads.

...

Background thread and its lifecycle

The coordinator is discovered automatically, when requested, in our implementation.  The discovery process piggybacks on the background thread loop, so any disconnect or connection request is handled by a state machine.  The state machine represents the different states of the coordinator connection and performs rediscovery in case of disconnection, as long as the background thread isn't terminated.  The background thread can be in one of four states: down, initialized, coordinator_discovery (coordinator discovery), and stable.


...

  The background thread needs to fulfill certain requirements in each state to maintain its status.  In particular:

  • down: The coordinator is uninitialized
  • initialized: The background thread completes initialization, and the loop has started.
  • coordinator_discovery: The coordinator is requested but hasn't been connected yet. If there's no in-flight FindCoordinatorRequest, then it will send one.  If one exists, check for the request results.
  • stable: The coordinator is connected.
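The four states and the transitions between them can be sketched as a small transition function. This is only an illustration under stated assumptions: the names `BackgroundStateSketch`, `State`, `Signal`, and `next` are hypothetical, and the transition from stable back to coordinator_discovery on disconnect is an assumption drawn from the statement that rediscovery is performed in case of disconnection.

```java
// Hypothetical sketch: class, enum, and signal names are illustrative only.
class BackgroundStateSketch {

    enum State { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

    /** Inputs that may cause a transition on one pass of the state machine. */
    enum Signal {
        THREAD_STARTED,        // the polling thread started the background thread
        COORDINATOR_REQUIRED,  // an event needs a coordinator
        LOOKUP_PENDING,        // FindCoordinator request still in flight
        LOOKUP_FAILED,         // FindCoordinator request failed
        LOOKUP_FOUND,          // coordinator found and connected
        DISCONNECTED           // coordinator connection lost
    }

    static State next(State current, Signal signal) {
        switch (current) {
            case DOWN:
                return signal == Signal.THREAD_STARTED ? State.INITIALIZED : current;
            case INITIALIZED:
                return signal == Signal.COORDINATOR_REQUIRED ? State.COORDINATOR_DISCOVERY : current;
            case COORDINATOR_DISCOVERY:
                if (signal == Signal.LOOKUP_FOUND) return State.STABLE;
                if (signal == Signal.LOOKUP_FAILED) return State.INITIALIZED;
                return current; // still waiting for the lookup result
            case STABLE:
                // Assumption: a disconnect sends the thread back to discovery.
                return signal == Signal.DISCONNECTED ? State.COORDINATOR_DISCOVERY : current;
            default:
                throw new IllegalStateException("unknown state " + current);
        }
    }

    public static void main(String[] args) {
        Signal[] signals = {
            Signal.THREAD_STARTED, Signal.COORDINATOR_REQUIRED, Signal.LOOKUP_PENDING,
            Signal.LOOKUP_FOUND, Signal.DISCONNECTED
        };
        State state = State.DOWN;
        for (Signal signal : signals) {
            state = next(state, signal);
            System.out.println(signal + " -> " + state);
        }
    }
}
```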



  1. The background thread has not been constructed, so it is down.  Upon initialization of the consumer, the coordinator is down.

  2. The polling thread starts the background thread; the background thread moves to the initialized state.  This usually happens after new KafkaConsumer().

  3. The background thread loop is running.  Here are the things that are happening:

    1. Poll for new events (for example, a commit event) from the channel, if any.

    2. Check the background thread state (connected to a coordinator or not) by running the state machine.  Ensure state requirements are fulfilled.

      • If the event requires a coordinator, move the background thread to the coordinator_discovery (coordinator discovery) state.

        • If not, stay in the initialized state and execute the event.

      • Check the coordinator connection.

        • If the FindCoordinator request hasn't been completed, stay in the coordinator_discovery state.

        • If the request fails, transition to the initialized state.

        • Otherwise, the coordinator is found.  Transition to the stable state.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Go to 3

  7. If close() is received:

    1. Set close to true so that the loop exits.

    2. Poll the coordinator and the network client.

    3. Commit
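The ordering of the loop steps above can be sketched as follows. This is a simplified, single-threaded illustration, not the real loop: `BackgroundLoopSketch`, `submit`, and the `trace` list are hypothetical, close is modelled as an ordinary event on the queue (an assumption), and each step only records its name instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: every step just records its name in a trace.
class BackgroundLoopSketch implements Runnable {

    static final String CLOSE = "close";

    private final Queue<String> events = new ConcurrentLinkedQueue<>();
    private boolean closed = false;

    /** Steps executed so far, in order; only touched by the thread running the loop. */
    final List<String> trace = new ArrayList<>();

    void submit(String event) { events.add(event); }

    @Override
    public void run() {
        while (!closed) {
            String event = events.poll();        // step 3.1: poll for new events
            if (CLOSE.equals(event)) {           // step 7.1: mark closed so the loop exits
                closed = true;
                continue;
            }
            trace.add("run state machine");      // step 3.2: check state requirements
            if (event != null) {
                trace.add("handle " + event);
            }
            trace.add("poll coordinator");       // step 4
            trace.add("poll network client");    // step 5
            // A real loop would block with a timeout in the network poll
            // rather than spin when the queue is empty.
        }
        trace.add("poll coordinator");           // step 7.2: final polls
        trace.add("poll network client");
        trace.add("commit");                     // step 7.3
    }

    public static void main(String[] args) {
        BackgroundLoopSketch loop = new BackgroundLoopSketch();
        loop.submit("commit event");
        loop.submit(CLOSE);
        loop.run(); // run on the calling thread to keep the demo deterministic
        for (String step : loop.trace) {
            System.out.println(step);
        }
    }
}
```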

...