...

  • Polling thread: handles all of the API requests and callback executions.

  • Background thread: handles network communication such as heartbeat, coordinator requests, and rebalance flow execution.

  • Event Handler: A communication interface between the polling and background threads.

    • ApplicationEventQueue: sending events to the background thread

    • BackgroundEventQueue: sending events to the polling thread

    • Both queues will be interfaced by the EventHandler interface

...

  • CB: Callbacks registered and invoked on the polling thread: commit callback, rebalance callback.
  • rm: RequestManagers. e.g. Heartbeat, FindCoordinatorRequest.

...

  • subscriptionState design is still under discussion

Design

Polling thread and its lifecycle

...

  1. The application starts up the Consumer; the Consumer creates an EventHandler and starts up the background thread.
  2. The background thread enters the loop and starts polling the ApplicationEventQueue.
    1. Events will be sent to the corresponding RequestManager.  For example, a commit event is sent to the OffsetCommitRequestManager.
  3. The background thread polls each RequestManager. If the RequestManager returns a result, we enqueue it to the NetworkClientDelegate; if not, it returns Long.MAX_VALUE or the time at which it needs to be polled again.
  4. Poll the NetworkClientDelegate to ensure the requests are sent.
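The steps above can be sketched as one iteration of the background loop. This is a minimal illustration, not the actual implementation: every type here (LoopRequestManager, LoopCommitManager, LoopNetworkDelegate, BackgroundLoopSketch) is a hypothetical stand-in, and events and requests are modelled as plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a RequestManager.
interface LoopRequestManager {
    // Returns a request to send, or empty if nothing is ready.
    Optional<String> poll(long nowMs);
}

// Hypothetical stand-in for the OffsetCommitRequestManager.
class LoopCommitManager implements LoopRequestManager {
    private final Queue<String> pending = new ArrayDeque<>();

    void handle(String event) {
        pending.add("OffsetCommitRequest(" + event + ")");
    }

    @Override
    public Optional<String> poll(long nowMs) {
        return Optional.ofNullable(pending.poll());
    }
}

// Hypothetical stand-in for the NetworkClientDelegate.
class LoopNetworkDelegate {
    final Queue<String> unsent = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();

    void enqueue(String request) {
        unsent.add(request);
    }

    // Drains the unsent queue; a real delegate would hand requests to the NetworkClient.
    void poll() {
        while (!unsent.isEmpty()) {
            sent.add(unsent.poll());
        }
    }
}

class BackgroundLoopSketch {
    final BlockingQueue<String> applicationEventQueue = new LinkedBlockingQueue<>();
    final LoopCommitManager commitManager = new LoopCommitManager();
    final List<LoopRequestManager> managers = new ArrayList<>();
    final LoopNetworkDelegate network = new LoopNetworkDelegate();

    BackgroundLoopSketch() {
        managers.add(commitManager);
    }

    void runOnce(long nowMs) {
        // Step 2: drain the ApplicationEventQueue and route each event to its manager.
        String event;
        while ((event = applicationEventQueue.poll()) != null) {
            commitManager.handle(event);
        }
        // Step 3: poll each manager and enqueue any request it returns.
        for (LoopRequestManager manager : managers) {
            manager.poll(nowMs).ifPresent(network::enqueue);
        }
        // Step 4: poll the delegate so the enqueued requests are sent.
        network.poll();
    }
}
```

Because only the background thread calls runOnce, the managers and the delegate need no synchronization; the blocking queue is the only structure shared with the polling thread.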

Network Layers


We are deprecating the current ConsumerNetworkClient because:

...

We are introducing a wrapper over NetworkClient, the NetworkClientDelegate, to help coordinate the requests.

  • All requests are first enqueued into the unsentRequests queue
  • Polling the NetworkClientDelegate sends the queued requests out through the NetworkClient.

Request Manager

One of the core considerations of this design is to avoid concurrent access; therefore, we streamline the request processing, such as sending coordinator requests, heartbeat requests, and user API requests (e.g. commit). To achieve this, we create a list of request managers that implement the RequestManager interface; each one handles the creation of its requests and computes the timing of the subsequent request.

The RequestManagers are polled (they implement a poll method) sequentially in the background thread loop, and each created request is passed to the NetworkClientDelegate to be sent out. Because the request managers are only invoked by the background thread, sequentially, no lock is needed, and the invocation pattern is more predictable and traceable.

Kafka consumer tasks are tied to broker requests and responses. In the new implementation, we take a more modular approach: we create request managers for different tasks and have the background thread poll them to see whether any requests need to be sent. Once a request is returned by the poll, the background thread enqueues it to the network client to be sent out.
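As a rough sketch of the polling contract described here, a poll call can return both an optional request and the delay until the manager wants to be polled again. The names below (TimedRequestManager, TimedPollResult, HeartbeatManagerSketch, IdleManagerSketch, SequentialPoller) are assumptions made for illustration and are not taken from the design.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical result of one poll: an optional request plus the delay until the next poll.
final class TimedPollResult {
    final long timeUntilNextPollMs;
    final Optional<String> request;

    TimedPollResult(long timeUntilNextPollMs, Optional<String> request) {
        this.timeUntilNextPollMs = timeUntilNextPollMs;
        this.request = request;
    }
}

// Hypothetical shape of the RequestManager interface.
interface TimedRequestManager {
    TimedPollResult poll(long nowMs);
}

// Emits a heartbeat request once every intervalMs.
final class HeartbeatManagerSketch implements TimedRequestManager {
    private final long intervalMs;
    private long nextDueMs = 0L;

    HeartbeatManagerSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    @Override
    public TimedPollResult poll(long nowMs) {
        if (nowMs >= nextDueMs) {
            nextDueMs = nowMs + intervalMs;
            return new TimedPollResult(intervalMs, Optional.of("HeartbeatRequest"));
        }
        return new TimedPollResult(nextDueMs - nowMs, Optional.empty());
    }
}

// Has nothing to send, so it asks never to be woken up on its own account.
final class IdleManagerSketch implements TimedRequestManager {
    @Override
    public TimedPollResult poll(long nowMs) {
        return new TimedPollResult(Long.MAX_VALUE, Optional.empty());
    }
}

final class SequentialPoller {
    // Polls every manager in order on the calling thread, collects the requests
    // into out, and returns the shortest delay until any manager needs polling again.
    static long pollAll(List<TimedRequestManager> managers, long nowMs, List<String> out) {
        long next = Long.MAX_VALUE;
        for (TimedRequestManager manager : managers) {
            TimedPollResult result = manager.poll(nowMs);
            result.request.ifPresent(out::add);
            next = Math.min(next, result.timeUntilNextPollMs);
        }
        return next;
    }
}
```

Taking the minimum of the returned delays gives the background thread an upper bound on how long it can block in the network poll before some manager needs attention.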

The request managers handle the following requests

  1. FindCoordinatorRequest
  2. OffsetCommitRequest
  3. FetchRequest
  4. MetadataRequest
  5. HeartbeatRequest
  6. ListOffsetRequest
  7. JoinGroupRequest
  8. SyncGroupRequest

After KIP-848 is implemented, the request managers also handle the following:

...

The current implementation chains callbacks to RequestFuture (a Kafka internal type). We have decided to move away from the Kafka internal type and migrate to the Java CompletableFuture due to its better interface and features.
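To illustrate the kind of chaining CompletableFuture allows, here is a small sketch. The class CommitFutureSketch and its describe method are hypothetical, and the Long result stands in for whatever a real commit response would carry.

```java
import java.util.concurrent.CompletableFuture;

final class CommitFutureSketch {
    // Chains a success transformation and a failure fallback onto a pending response.
    // In the consumer, the background thread would complete the response future
    // once the broker replies.
    static CompletableFuture<String> describe(CompletableFuture<Long> response) {
        return response
            .thenApply(offset -> "committed offset " + offset)
            .exceptionally(error -> "commit failed");
    }
}
```

The caller can attach further stages or block with join(); the stage that completes the future does not need to know which callbacks were registered.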

Events and EventHandler

EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:

...

We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e. the polling thread, to add and poll the events.

EventHandler
interface EventHandler {
    // Poll for the next available event.
    ApplicationEvent poll();

    // Add an event to be handled by the other thread.
    void add(RequestEvent event);
}
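One way such a handler could be backed by the two queues is sketched below. This is an assumption-laden illustration: QueueEventHandlerSketch and its four methods are hypothetical names, events are modelled as strings, and the split between polling-thread and background-thread methods is a design choice of the sketch, not something the interface above prescribes.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class QueueEventHandlerSketch {
    // Carries events from the polling thread to the background thread.
    private final BlockingQueue<String> applicationEventQueue = new LinkedBlockingQueue<>();
    // Carries events from the background thread to the polling thread.
    private final BlockingQueue<String> backgroundEventQueue = new LinkedBlockingQueue<>();

    // Called by the polling thread: submit an event for the background thread.
    boolean add(String applicationEvent) {
        return applicationEventQueue.offer(applicationEvent);
    }

    // Called by the polling thread: fetch the next background event without blocking.
    Optional<String> poll() {
        return Optional.ofNullable(backgroundEventQueue.poll());
    }

    // Called by the background thread: fetch the next application event without blocking.
    Optional<String> nextApplicationEvent() {
        return Optional.ofNullable(applicationEventQueue.poll());
    }

    // Called by the background thread: publish an event for the polling thread.
    boolean publish(String backgroundEvent) {
        return backgroundEventQueue.offer(backgroundEvent);
    }
}
```

LinkedBlockingQueue is thread-safe, so neither side needs extra locking around these calls.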

...

One of the main reasons we are refactoring the KafkaConsumer is to satisfy the requirements of the new rebalance protocol introduced in KIP-848; however, we aren't quite ready to deprecate the current protocols, so they will be supported for the foreseeable future.

KIP-848 contains two assignment modes: server-side mode and client-side mode. Both use the new Heartbeat API, the ConsumerGroupHeartbeat.

The server-side mode is simpler: the assignments are computed by the Group Coordinator, and the clients are only responsible for managing their partition assignments, i.e. revoking and assigning the partitions.

The client-side mode and the current protocol are more complicated: they rely on heartbeats and several API invocations to complete the rebalancing process.

If the user chooses to use the client-side assignor, the assignment will be computed by one of the members, and the assignment and revocation are done via the heartbeat, as in server-side mode.

To accommodate the different assignment modes, the new design introduces four components:

  1. GroupState: Keeps track of the current state of the group, such as the Generation, and the rebalance state machine.
  2. HeartbeatRequestManager: A type of request manager that is responsible for calling the ConsumerGroupHeartbeat API and manages when to invoke it.
  3. RebalanceProtocol interface: The assignor modes will implement this interface.
  4. ProtocolRequestManager: Manages the life-cycle of the GroupState state machine and handles rebalance API requests. Assignment Manager: Manages partition assignments.
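To make the GroupState idea more concrete, here is a minimal sketch of a state holder that tracks a generation and guards state transitions. The state names (UNJOINED, REBALANCING, STABLE), the allowed transitions, and the class names are all assumptions chosen for illustration; the design does not specify them here.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical rebalance states; the real state machine may differ.
enum RebalanceStateSketch { UNJOINED, REBALANCING, STABLE }

final class GroupStateSketch {
    private RebalanceStateSketch state = RebalanceStateSketch.UNJOINED;
    private int generation = -1;

    RebalanceStateSketch state() {
        return state;
    }

    int generation() {
        return generation;
    }

    // Moves to the next state, rejecting transitions the sketch does not allow.
    void transitionTo(RebalanceStateSketch next) {
        if (!allowedFrom(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next);
        }
        state = next;
    }

    // Records the new generation once a rebalance has finished.
    void onRebalanceComplete(int newGeneration) {
        transitionTo(RebalanceStateSketch.STABLE);
        generation = newGeneration;
    }

    private static Set<RebalanceStateSketch> allowedFrom(RebalanceStateSketch from) {
        switch (from) {
            case UNJOINED:
                return EnumSet.of(RebalanceStateSketch.REBALANCING);
            case REBALANCING:
                return EnumSet.of(RebalanceStateSketch.STABLE, RebalanceStateSketch.UNJOINED);
            default:
                return EnumSet.of(RebalanceStateSketch.REBALANCING, RebalanceStateSketch.UNJOINED);
        }
    }
}
```

Since this object would only be touched from the background thread, it can stay a plain mutable class without locks.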

Rebalance Flow

New Consumer Group

...