...

  • Code complexity: Patches and hotfixes over the past years have heavily impacted the readability of the code. The complex code paths and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication can happen in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue such as a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.

  • Complex Coordinator Communication: Coordinator communication happens on both threads in the current design, and it has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with respect to poll. First, this simplifies the design and logic because the timing of network requests is more predictable. Second, because rebalancing occurs in the background thread, the polling thread neither blocks the rebalance process nor is blocked by it.

Scope

  • Define the responsibilities of the polling thread and the background thread
  • Define the communication interface and protocol between the two threads
  • Define a Network layer over the NetworkClient, including how different requests are sent and handled
    • Define RequestManagers
    • Define RequestStates to handle retry backoff
  • Redesign SubscriptionState
  • Deprecate HeartbeatThread and move its responsibilities to the background thread
  • Define the rebalance flow, including how JoinGroup and SyncGroup requests are sent and how callbacks are triggered
  • Refactor the KafkaConsumer API innards

  • Address issues in these Jira tickets

...

The background running thread in the current implementation (see KIP-62).

Communication Channel

The communication interface between the two threads. One channel delivers messages from the polling thread to the background thread. The other channel delivers messages submitted by the background thread to the polling thread.
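Here is a minimal sketch of the two channels, assuming each one is a thread-safe FIFO queue. Only the queue names ApplicationEventQueue and BackgroundEventQueue come from this design. The ApplicationEvent/BackgroundEvent marker interfaces and the EventChannels class are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical marker types; the design's concrete event classes may differ.
interface ApplicationEvent {}   // polling thread -> background thread
interface BackgroundEvent {}    // background thread -> polling thread

// Hypothetical holder for the two one-directional channels.
final class EventChannels {
    // Written only by the polling thread, drained only by the background thread.
    private final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
    // Written only by the background thread, drained only by the polling thread.
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    // Polling thread: e.g. called from assign() or commitSync().
    void sendToBackground(ApplicationEvent event) {
        applicationEventQueue.add(event);
    }

    // Background thread: drain everything queued so far, in FIFO order.
    List<ApplicationEvent> drainApplicationEvents() {
        List<ApplicationEvent> drained = new ArrayList<>();
        applicationEventQueue.drainTo(drained);
        return drained;
    }

    // Background thread: hand a result (e.g. fetched records) to the polling thread.
    void sendToPollingThread(BackgroundEvent event) {
        backgroundEventQueue.add(event);
    }

    // Polling thread: non-blocking; returns null when nothing is queued.
    BackgroundEvent pollBackgroundEvent() {
        return backgroundEventQueue.poll();
    }
}
```

Each queue has exactly one producer thread and one consumer thread. Because of that, the only shared state between the threads is the queues themselves.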

...

  • CB: Callbacks registered and invoked on the polling thread
  • rm: RequestManagers, e.g., Heartbeat and FindCoordinatorRequest.

Important Components

Polling thread and its lifecycle

The polling thread handles API invocations and any responses from the background thread. Let's demonstrate its lifecycle using a simple consumer use case (assign(), poll(), commitSync()):

  1. The user invokes assign(), and the SubscriptionState is altered.
  2. The subscription state changes are sent to the background thread via the ApplicationEventQueue.
  3. The user then invokes poll() in a loop.
  4. During the poll, the polling thread sends a fetch request to the background thread.
  5. During the poll, the polling thread also polls fetch results from the BackgroundEventQueue. It deserializes the fetch results and returns them to the user.
  6. The user processes the results and invokes commitSync().
  7. The polling thread sends an OffsetCommitApplicationEvent to the background thread. As this is a blocking operation, the method returns when the background thread completes the commit.
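The steps above say that commitSync() blocks until the background thread completes the commit. This is a minimal sketch of one way to get that behavior. It assumes the event carries a CompletableFuture, which the background thread completes and the polling thread waits on. CommitEvent is a hypothetical stand-in for OffsetCommitApplicationEvent. FakeBackgroundThread is a hypothetical test double: it records the offsets locally instead of sending an OffsetCommit request.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for OffsetCommitApplicationEvent: the offsets to commit
// plus a future that the background thread completes when the commit is done.
final class CommitEvent {
    final Map<String, Long> offsets; // "topic-partition" -> offset, simplified
    final CompletableFuture<Void> result = new CompletableFuture<>();

    CommitEvent(Map<String, Long> offsets) {
        this.offsets = offsets;
    }
}

// Polling-thread side: enqueue the event, then wait on its future.
final class PollingThreadSketch {
    private final BlockingQueue<CommitEvent> applicationEventQueue;

    PollingThreadSketch(BlockingQueue<CommitEvent> applicationEventQueue) {
        this.applicationEventQueue = applicationEventQueue;
    }

    void commitSync(Map<String, Long> offsets, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        CommitEvent event = new CommitEvent(offsets);
        applicationEventQueue.add(event);
        // Blocks until the background thread completes (or fails) the future.
        event.result.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}

// Hypothetical fake background thread: "commits" by recording the offsets
// instead of sending an OffsetCommit request, then completes the future.
final class FakeBackgroundThread extends Thread {
    private final BlockingQueue<CommitEvent> applicationEventQueue;
    final Map<String, Long> committed = new ConcurrentHashMap<>();

    FakeBackgroundThread(BlockingQueue<CommitEvent> applicationEventQueue) {
        this.applicationEventQueue = applicationEventQueue;
        setDaemon(true);
    }

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                CommitEvent event = applicationEventQueue.take();
                committed.putAll(event.offsets);
                event.result.complete(null);
            }
        } catch (InterruptedException e) {
            // Shutdown requested; exit the loop.
        }
    }
}
```

With a future per event, the polling thread does not need to know how the commit is performed. It only needs to know when the commit has finished, and whether it failed.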

Background thread and its lifecycle

The background thread runs a loop that periodically checks the ApplicationEventQueue and drains and processes the events. At a high level, the lifecycle of the background thread can be summarized as follows:

  1. The application starts up the Consumer; the Consumer creates an EventHandler and starts up the background thread.
  2. The background thread enters the loop and starts polling the ApplicationEventQueue.
    1. Events are sent to the corresponding RequestManager. For example, a commit event is sent to the OffsetCommitRequestManager.
  3. The background thread invokes poll on each RequestManager.
    1. If the RequestManager returns a result, we enqueue it to the NetworkClientDelegate.
    2. If not, the RequestManager returns Long.MAX_VALUE or the time until it needs to be polled again.
  4. Poll the NetworkClientDelegate to ensure the requests are sent.
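This sketch shows one iteration of the loop above. It assumes that each RequestManager exposes a poll(now) method, and that this method returns two things: the requests to send now, and the time until the manager next needs polling. The RequestManager interface, PollResult, string-typed events, and the list standing in for the NetworkClientDelegate are all hypothetical. The order of the steps follows the lifecycle described here.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical poll result: requests to send now (possibly none) and the time in ms
// until this manager wants to be polled again (Long.MAX_VALUE means no deadline).
final class PollResult {
    final List<String> requests;
    final long timeUntilNextPollMs;

    PollResult(List<String> requests, long timeUntilNextPollMs) {
        this.requests = requests;
        this.timeUntilNextPollMs = timeUntilNextPollMs;
    }
}

// Hypothetical RequestManager contract used by the loop below.
interface RequestManager {
    void onEvent(String event);
    PollResult poll(long nowMs);
}

final class BackgroundLoopSketch {
    final Queue<String> applicationEventQueue = new ConcurrentLinkedQueue<>();
    // Stand-in for the NetworkClientDelegate's queue of requests to send.
    final List<String> unsentRequests = new ArrayList<>();
    private final Map<String, RequestManager> routes; // event type -> manager
    private final List<RequestManager> managers;

    BackgroundLoopSketch(Map<String, RequestManager> routes) {
        this.routes = routes;
        this.managers = new ArrayList<>(new LinkedHashSet<>(routes.values()));
    }

    // One loop iteration; returns how long the network poll may block.
    long runOnce(long nowMs) {
        // Drain the ApplicationEventQueue, routing each event to its manager.
        String event;
        while ((event = applicationEventQueue.poll()) != null) {
            RequestManager target = routes.get(event);
            if (target != null) {
                target.onEvent(event);
            }
        }
        // Poll every manager, collect its requests, and keep the earliest deadline.
        long pollTimeoutMs = Long.MAX_VALUE;
        for (RequestManager manager : managers) {
            PollResult result = manager.poll(nowMs);
            unsentRequests.addAll(result.requests);
            pollTimeoutMs = Math.min(pollTimeoutMs, result.timeUntilNextPollMs);
        }
        // A real loop would now poll the NetworkClientDelegate with pollTimeoutMs.
        return pollTimeoutMs;
    }
}
```

The minimum of the managers' next-poll times bounds how long the thread may wait in the network poll. This is why an idle manager returns Long.MAX_VALUE: it never shortens that wait.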

Network Layers

We are deprecating the current ConsumerNetworkClient because:

  1. Locking is unnecessary in the new design because everything runs on a single thread.
  2. Some features, such as unsent, are irrelevant to this design.

We are introducing the NetworkClientDelegate, a wrapper over NetworkClient that helps the background thread coordinate and send out requests.
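Here is a minimal sketch of such a delegate. It assumes that the delegate queues requests enqueued by RequestManagers and, on each poll, either sends them or expires them. Because only the background thread touches it, the sketch uses no locking. Transport is a hypothetical stand-in for NetworkClient, and UnsentRequest is also hypothetical. The real NetworkClient API (connection management, response callbacks) is not modeled here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the part of NetworkClient this sketch needs.
interface Transport {
    boolean isReady(String node, long nowMs);
    void send(String node, String request, long nowMs);
}

// Hypothetical queued request: target node, payload, and an expiry deadline.
final class UnsentRequest {
    final String node;
    final String body;
    final long deadlineMs;

    UnsentRequest(String node, String body, long deadlineMs) {
        this.node = node;
        this.body = body;
        this.deadlineMs = deadlineMs;
    }
}

final class NetworkClientDelegateSketch {
    private final Transport transport;
    // Only the background thread touches this queue, so no locking is needed.
    private final ArrayDeque<UnsentRequest> unsent = new ArrayDeque<>();
    final List<UnsentRequest> expired = new ArrayList<>();

    NetworkClientDelegateSketch(Transport transport) {
        this.transport = transport;
    }

    void add(UnsentRequest request) {
        unsent.add(request);
    }

    int pendingCount() {
        return unsent.size();
    }

    // Expire requests past their deadline, send those whose node is ready,
    // and keep the rest queued for the next poll.
    void poll(long nowMs) {
        Iterator<UnsentRequest> it = unsent.iterator();
        while (it.hasNext()) {
            UnsentRequest request = it.next();
            if (nowMs >= request.deadlineMs) {
                expired.add(request);
                it.remove();
            } else if (transport.isReady(request.node, nowMs)) {
                transport.send(request.node, request.body, nowMs);
                it.remove();
            }
        }
    }
}
```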

Request Manager

The RequestManager controls when a request needs to be sent and constructs the request. Therefore, we will create a concrete implementation per type of request. These requests are:
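The scope above lists RequestStates for handling retry backoff. The following sketch shows how a RequestManager might combine its own sending conditions with such a state. It assumes capped exponential backoff that applies only after failures. Both class names and the 100 ms / 1000 ms backoff values are hypothetical, and the FindCoordinator example is illustrative only.

```java
import java.util.Optional;

// Hypothetical RequestState: after failures, the next attempt is delayed by a
// capped exponential backoff (retryBackoffMs, 2x, 4x, ... up to maxRetryBackoffMs).
final class RequestStateSketch {
    private final long retryBackoffMs;
    private final long maxRetryBackoffMs;
    private int consecutiveFailures = 0;
    private long lastFailureMs = 0;

    RequestStateSketch(long retryBackoffMs, long maxRetryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.maxRetryBackoffMs = maxRetryBackoffMs;
    }

    long currentBackoffMs() {
        if (consecutiveFailures == 0) {
            return 0;
        }
        // The shift is capped so the doubling cannot overflow for realistic inputs.
        long backoff = retryBackoffMs << Math.min(consecutiveFailures - 1, 30);
        return Math.min(backoff, maxRetryBackoffMs);
    }

    boolean canSendRequest(long nowMs) {
        return consecutiveFailures == 0 || nowMs >= lastFailureMs + currentBackoffMs();
    }

    void onSuccess() {
        consecutiveFailures = 0;
    }

    void onFailure(long nowMs) {
        consecutiveFailures++;
        lastFailureMs = nowMs;
    }
}

// Hypothetical manager: sends FindCoordinator only when the coordinator is unknown,
// no request is in flight, and the backoff has elapsed.
final class CoordinatorRequestManagerSketch {
    private final RequestStateSketch requestState = new RequestStateSketch(100, 1000);
    private boolean coordinatorKnown = false;
    private boolean inFlight = false;

    // Returns the request to send now, if any.
    Optional<String> poll(long nowMs) {
        if (coordinatorKnown || inFlight || !requestState.canSendRequest(nowMs)) {
            return Optional.empty();
        }
        inFlight = true;
        return Optional.of("FindCoordinatorRequest");
    }

    void onResponse(boolean success, long nowMs) {
        inFlight = false;
        if (success) {
            coordinatorKnown = true;
            requestState.onSuccess();
        } else {
            requestState.onFailure(nowMs);
        }
    }
}
```

Keeping the backoff bookkeeping in a separate state object lets each per-request-type manager reuse it, while the manager keeps its own conditions (here, "coordinator unknown" and "nothing in flight").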

...

  1. subscribe() is invoked on the client thread, and a subscription event is sent to the background thread.
  2. The background thread drains the queue and updates the GroupState to PREPARE.
  3. HeartbeatRequestManager is polled. It checks the GroupState and determines it is time to send the heartbeat.
  4. A ConsumerGroupHeartbeatResponse is received, and the GroupState is updated to ASSIGN.
  5. PartitionAssignmentManager is polled and realizes the GroupState is ASSIGN. It triggers the assignment computation:
    1. Server-side assignment: no API is invoked, and the assignment is made on the server.
    2. Client-side assignment:
      1. ConsumerGroupPrepareAssignment API will be invoked
      2. The leader receives the response and invokes the ConsumerGroupInstallAssignment API after computing the assignment
    3. Old protocol: [omitted here]
      1. JoinGroup ...
      2. SyncGroup ...
  6. Once the assignment is computed, send an event to the client thread to invoke the rebalance callback.
  7. The callback is triggered, and the client thread notifies the background thread.
  8. PartitionAssignmentManager is polled and receives the acknowledgment from the client thread. Transition to Complete.
  9. [something needs to happen here]
  10. Transition the GroupState to Stable.
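The GroupState transitions in the flow above can be sketched as a small state machine that rejects out-of-order transitions. PREPARE, ASSIGN, and STABLE come from the flow. COMPLETE is read from step 8 as a GroupState, which is an interpretation. Several parts are assumptions for illustration: the UNSUBSCRIBED starting state, the STABLE to PREPARE edge for a later rebalance, and the class and method names. The unresolved step 9 is not modeled.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical states; UNSUBSCRIBED is an assumed starting state.
enum GroupState { UNSUBSCRIBED, PREPARE, ASSIGN, COMPLETE, STABLE }

final class GroupStateMachine {
    private static final Map<GroupState, Set<GroupState>> ALLOWED = new EnumMap<>(GroupState.class);

    static {
        ALLOWED.put(GroupState.UNSUBSCRIBED, EnumSet.of(GroupState.PREPARE)); // subscription event drained
        ALLOWED.put(GroupState.PREPARE, EnumSet.of(GroupState.ASSIGN));       // heartbeat response received
        ALLOWED.put(GroupState.ASSIGN, EnumSet.of(GroupState.COMPLETE));      // callback acknowledged
        ALLOWED.put(GroupState.COMPLETE, EnumSet.of(GroupState.STABLE));
        ALLOWED.put(GroupState.STABLE, EnumSet.of(GroupState.PREPARE));       // assumed: next rebalance
    }

    private GroupState state = GroupState.UNSUBSCRIBED;

    GroupState state() {
        return state;
    }

    // Move to the next state, rejecting transitions the flow does not allow.
    void transitionTo(GroupState next) {
        if (!ALLOWED.get(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next);
        }
        state = next;
    }
}
```

Both HeartbeatRequestManager and PartitionAssignmentManager read the same state. Rejecting unexpected transitions explicitly therefore makes a misordered event visible immediately, instead of letting it surface later as a silent inconsistency.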

...