...

  1. The application starts the Consumer; the Consumer creates an EventHandler and starts the background thread.
  2. The background thread enters the loop and starts polling the ApplicationEventQueue.
    1. Events will be sent to the corresponding RequestManager.  For example, a commit event is sent to the OffsetCommitRequestManager.
  3. The background thread invokes poll on each RequestManager.
    1. If the RequestManager returns a result, we enqueue it to the NetworkClientDelegate.
    2. If not, it returns Long.MAX_VALUE or the time when it needs to be polled again.
  4. Poll the NetworkClientDelegate to ensure the requests are sent.
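
One iteration of the loop described above could be sketched as follows. This is only an illustration under stated assumptions, not the actual implementation: `PollResult`, `onEvent`, `runOnce`, `CommitRequestManager`, the string-typed events and requests, and the plain list standing in for the NetworkClientDelegate are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical poll result: requests ready to send plus the delay until the next poll.
final class PollResult {
    final List<String> requests;
    final long timeUntilNextPollMs;

    PollResult(List<String> requests, long timeUntilNextPollMs) {
        this.requests = requests;
        this.timeUntilNextPollMs = timeUntilNextPollMs;
    }
}

// Hypothetical shape of a request manager for this sketch.
interface RequestManager {
    void onEvent(String event);   // step 2.1: receive an application event
    PollResult poll(long nowMs);  // step 3: produce requests and/or a wait time
}

// Turns each commit event into one (string-typed) commit request.
final class CommitRequestManager implements RequestManager {
    private int pendingCommits = 0;

    @Override
    public void onEvent(String event) {
        pendingCommits++;
    }

    @Override
    public PollResult poll(long nowMs) {
        List<String> requests = new ArrayList<>();
        for (; pendingCommits > 0; pendingCommits--) {
            requests.add("OffsetCommitRequest");
        }
        // Nothing is time-driven here, so there is no need to be polled again soon.
        return new PollResult(requests, Long.MAX_VALUE);
    }
}

final class BackgroundLoop {
    // Written by the polling thread, drained by the background thread.
    final Queue<String> applicationEventQueue = new ConcurrentLinkedQueue<>();
    // Stand-in for the NetworkClientDelegate's queue of unsent requests.
    final List<String> networkQueue = new ArrayList<>();
    private final Map<String, RequestManager> managersByEventType;

    BackgroundLoop(Map<String, RequestManager> managersByEventType) {
        this.managersByEventType = managersByEventType;
    }

    // Runs steps 2 and 3 once and returns how long the thread may wait before the next pass.
    long runOnce(long nowMs) {
        String event;
        while ((event = applicationEventQueue.poll()) != null) {
            RequestManager target = managersByEventType.get(event);
            if (target != null) {
                target.onEvent(event);
            }
        }
        long waitMs = Long.MAX_VALUE;
        for (RequestManager manager : managersByEventType.values()) {
            PollResult result = manager.poll(nowMs);
            networkQueue.addAll(result.requests);
            waitMs = Math.min(waitMs, result.timeUntilNextPollMs);
        }
        // Step 4 (polling the network client so the requests are sent) is omitted here.
        return waitMs;
    }
}
```

Returning the smallest wait time across managers lets the thread block on the network poll for exactly as long as no manager needs attention.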

Network Layers

We are deprecating the current ConsumerNetworkClient because:

...

We are introducing a wrapper over NetworkClient, the NetworkClientDelegate, to help coordinate the requests.

Request Manager

One of the core considerations of this design is to avoid concurrent access; therefore, we streamline the request processing, such as sending coordinator requests, heartbeat requests, and user API requests (e.g. commit). To achieve this, we create a list of request managers that implement the RequestManager interface; each one handles the creation of its request and computes the timing of the subsequent request.

The RequestManagers are polled (they implement a poll method) sequentially in the background thread loop, and each created request is handed to the NetworkClientDelegate to be sent out. Because the request managers are only invoked by the background thread, sequentially, no lock is needed, and the invocation pattern is more predictable and traceable.
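
As a rough illustration of how a request manager might compute the timing of its next request, consider the sketch below. The names `PollResult` and `IntervalRequestManager`, the string-typed request, and the fixed-interval policy are assumptions made for the example; the design only states that a manager has a poll method and decides when a request is due.

```java
import java.util.Optional;

// Hypothetical poll result: an optional request to send now, plus the delay until the next poll.
final class PollResult {
    final Optional<String> request;
    final long timeUntilNextPollMs;

    PollResult(Optional<String> request, long timeUntilNextPollMs) {
        this.request = request;
        this.timeUntilNextPollMs = timeUntilNextPollMs;
    }
}

interface RequestManager {
    PollResult poll(long currentTimeMs);
}

// Emits one request per interval. The fields are plain (no locks, no volatile)
// because only the background thread ever calls poll().
final class IntervalRequestManager implements RequestManager {
    private final String requestName;
    private final long intervalMs;
    private long nextSendTimeMs = 0L;

    IntervalRequestManager(String requestName, long intervalMs) {
        this.requestName = requestName;
        this.intervalMs = intervalMs;
    }

    @Override
    public PollResult poll(long currentTimeMs) {
        if (currentTimeMs >= nextSendTimeMs) {
            // The request is due: hand it back and schedule the next one.
            nextSendTimeMs = currentTimeMs + intervalMs;
            return new PollResult(Optional.of(requestName), intervalMs);
        }
        // Not due yet: report only how long the caller can wait.
        return new PollResult(Optional.empty(), nextSendTimeMs - currentTimeMs);
    }
}
```

Passing the current time into poll keeps the manager deterministic, which makes the invocation pattern easy to trace and to test without a real clock.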

Each request manager controls when a request needs to be sent out and constructs the request. Therefore, we will create a concrete implementation per type of request. These requests are:

  1. FindCoordinatorRequest
  2. OffsetCommitRequest
  3. FetchRequest
  4. MetadataRequest
  5. HeartbeatRequest
  6. ListOffsetRequest
  7. JoinGroupRequest
  8. SyncGroupRequest

After KIP-848 is implemented, the request managers also handle the following:

  1. ConsumerGroupHeartbeatRequest
  2. ConsumerGroupPrepareAssignmentRequest
  3. ConsumerGroupInstallAssignmentRequest

RequestFuture and Callback

The current implementation chains callbacks to RequestFuture (a Kafka internal type). We have decided to move away from the Kafka internal type and migrate to the Java CompletableFuture due to its better interface and features.
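
To illustrate the kind of chaining that CompletableFuture offers out of the box, here is a small sketch. The `CommitFutures` class, the `describe` method, and the use of a `Long` offset as the result type are hypothetical; only the `CompletableFuture` calls themselves are standard Java.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CommitFutures {
    // Chains a success step and a failure step onto a future that the
    // background thread would complete once the response arrives.
    static CompletableFuture<String> describe(CompletableFuture<Long> committedOffset) {
        return committedOffset
            .thenApply(offset -> "committed offset " + offset)
            .exceptionally(error -> {
                // Dependent stages see the original failure wrapped in a CompletionException.
                Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
                return "commit failed: " + cause.getMessage();
            });
    }
}
```

A caller can either keep composing on the returned future or block on it with `join()` or `get()`, which is what makes the same type usable for both asynchronous and blocking APIs.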

EventHandler

EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:

  1. Allows the polling thread to send events to the background thread
  2. Allows the polling thread to poll events from the background thread
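
The two purposes above could map onto an interface along these lines. This is a sketch under assumptions: the method names `add` and `poll`, the generic event types, the `QueueEventHandler` class, and the `backgroundEventQueue` field are hypothetical; only the ApplicationEventQueue is named in this document.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// A = application events (polling thread -> background thread)
// B = background events (background thread -> polling thread)
interface EventHandler<A, B> {
    boolean add(A applicationEvent);
    Optional<B> poll();
}

// Queue-backed sketch: each direction gets its own thread-safe queue.
final class QueueEventHandler<A, B> implements EventHandler<A, B> {
    final BlockingQueue<A> applicationEventQueue = new LinkedBlockingQueue<>();
    final BlockingQueue<B> backgroundEventQueue = new LinkedBlockingQueue<>();

    @Override
    public boolean add(A applicationEvent) {
        // Called on the polling thread; the background thread drains this queue.
        return applicationEventQueue.offer(applicationEvent);
    }

    @Override
    public Optional<B> poll() {
        // Called on the polling thread; returns empty when nothing is pending.
        return Optional.ofNullable(backgroundEventQueue.poll());
    }
}
```

Keeping the two directions in separate queues means neither thread ever touches the other's state directly.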

...

Rebalance [WIP]

One of the main reasons we are refactoring the KafkaConsumer is to satisfy the requirements of the new rebalance protocol introduced in KIP-848; however, we aren't quite ready to deprecate the current protocols, so they will be supported for the foreseeable future. We will make the framework as pluggable as possible so that we can adopt the new API as well as the old consumer protocol. Here, we focus on the new protocol, and will expand later.

KIP-848 contains two assignment modes: server-side mode and client-side mode. Both use the new Heartbeat API, the ConsumerGroupHeartbeat.

The server-side mode is simpler: the assignments are computed by the Group Coordinator, and the clients are only responsible for managing their partition assignments.

The client-side mode and the current protocol are more complicated: they rely on heartbeats and several API invocations to complete the rebalancing process.

To accommodate the different assignment modes, the important components here are:

  1. GroupState: keeps track of the current state of the group, such as the Generation, and the rebalance state machine.
  2. HeartbeatRequestManager implements RequestManager: manages the state of when to invoke the ConsumerGroupHeartbeat API and determines when to send out the heartbeat.
  3. PartitionAssignmentManager: drives the assignment, depending on the protocol:
    1. JoinGroup and SyncGroup requests (old protocol)
    2. ConsumerGroupPrepareAssignment API and ConsumerGroupInstallAssignment API for the client-side assignor protocol
    3. The server-side assignor relies entirely on the heartbeat request, so no API call is made there
  4. RebalanceProtocol interface: the assignor modes will implement this interface.
  5. ProtocolRequestManager: manages the life cycle of the GroupState state machine and handles rebalance API requests.

Rebalance Flow

New Consumer Group

  1. The user invokes subscribe() on the client thread. SubscriptionState is altered. A subscription state alteration event is sent to the background thread.
  2. The background thread processes the event and updates the GroupState to PREPARE.
  3. HeartbeatRequestManager is polled. It checks the GroupState and determines it is time to send the heartbeat.
  4. ConsumerGroupHeartbeatResponse is received. The GroupState is updated to ASSIGN.
  5. PartitionAssignmentManager is polled and sees that the GroupState is ASSIGN. It triggers the assignment computation:
    1. Server-side assignment: no API will be invoked, and the assignment will be made
    2. Client-side assignment:
      1. ConsumerGroupPrepareAssignment API will be invoked
      2. The leader receives the response and invokes the ConsumerGroupInstallAssignment API after computing the assignment
    3. Old protocol: [omitted here]
      1. JoinGroup ...
      2. SyncGroup ... [We might need another state here]
  6. Once the assignment is computed, send an event to the client thread to invoke the rebalance callback.
  7. Callback triggered; notify the background thread.
  8. PartitionAssignmentManager is polled and receives the acknowledgment from the client thread. Transition to COMPLETE.
  9. [something needs to happen here]
  10. Transition the GroupState to STABLE.

...

  • UNJOINED: There's no rebalance. For the simple consumer use case, the GroupState remains in UNJOINED
  • PREPARE: Sending the heartbeat and awaiting the response
  • ASSIGN: Assignment updated; client-thread-side callbacks are triggered, and we await their completion
  • COMPLETE: Client thread callback completed and has notified the background thread.
  • STABLE: stable group
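
The states above could be encoded as an enum that rejects unexpected transitions. The sketch below is hedged: it encodes only the forward path described for a new consumer group (UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE), and the `validNextStates` and `transitionTo` methods are hypothetical names. Transitions out of STABLE, or back to UNJOINED, are left out because they are not spelled out here.

```java
import java.util.EnumSet;
import java.util.Set;

enum GroupState {
    UNJOINED, PREPARE, ASSIGN, COMPLETE, STABLE;

    // Allowed next states, assuming only the new-consumer-group flow.
    Set<GroupState> validNextStates() {
        switch (this) {
            case UNJOINED:
                return EnumSet.of(PREPARE);
            case PREPARE:
                return EnumSet.of(ASSIGN);
            case ASSIGN:
                return EnumSet.of(COMPLETE);
            case COMPLETE:
                return EnumSet.of(STABLE);
            default:
                // STABLE: further transitions are not modeled in this sketch.
                return EnumSet.noneOf(GroupState.class);
        }
    }

    // Returns the next state, or fails fast on a transition the flow does not allow.
    GroupState transitionTo(GroupState next) {
        if (!validNextStates().contains(next)) {
            throw new IllegalStateException("Invalid transition: " + this + " -> " + next);
        }
        return next;
    }
}
```

Because only the background thread mutates the group state, a plain field holding the current `GroupState` would be enough; no synchronization is implied by this sketch.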

Consumer API Internal Changes 

Poll

The users are required to invoke poll to:

  1. Trigger auto-commit
  2. Poll the EventHandler, and execute the events accordingly
    1. Callback invocation
    2. Collect fetches
    3. Error handling: process the exception or raise it to the user
  3. Poll fetches: check the fetcher and return if there is data
  4. Poll the callback invocation trigger to trigger the rebalance listeners.

CommitSync

  1. The polling thread sends a commit event. The commit event has a completable future.
  2. Wait for the completable future to finish, so that we can make this a blocking API.
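
The two steps above could look roughly like this. It is a minimal sketch, assuming a hypothetical `CommitEvent` type that carries the future, a `BlockingQueue` standing in for the ApplicationEventQueue, and a timeout parameter; the real event and exception types are not specified here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical commit event: the offset to commit plus a future for the outcome.
final class CommitEvent {
    final long offset;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CommitEvent(long offset) {
        this.offset = offset;
    }
}

final class CommitSyncSketch {
    // Stand-in for the ApplicationEventQueue shared with the background thread.
    final BlockingQueue<CommitEvent> applicationEventQueue = new LinkedBlockingQueue<>();

    // Step 1: enqueue the event. Step 2: block until the background thread completes its future.
    void commitSync(long offset, long timeoutMs) {
        CommitEvent event = new CommitEvent(offset);
        applicationEventQueue.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the commit", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Commit failed or timed out", e);
        }
    }
}
```

An asynchronous variant would enqueue the same event and return the future without waiting, so both APIs could share one code path.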

...

We will break the current fetcher into three separate parts to accommodate the asynchronous design, i.e., we need the background thread to send fetches autonomously and the polling thread to collect fetches when these fetches become available. We will have 3 separate classes here:

...