Public-Facing Changes

...

Update Java Docs

...

API signatures will remain exactly the same

...

Table of Contents

Summary

This document highlights our effort to refactor the existing KafkaConsumer. The project aims to address shortcomings we've encountered over the past years, such as increasing code complexity from hotfixes and patches, undesired concurrency behavior introduced by having both the heartbeat thread and the polling thread communicate with the coordinator, the coupling between the client poll and the rebalance progress, and, lastly, enabling KIP-848.

  • Code complexity: Patches and hotfixes in the past years have heavily impacted the readability of the code. The complex code path and intertwined logic make the code difficult to modify and comprehend. For example, coordinator communication happens in both the heartbeat thread and the polling thread, which makes it challenging to diagnose the source of an issue when there is, for example, a race condition. Also, many parts of the coordinator code have been refactored to execute asynchronously; therefore, we can take the opportunity to refactor the existing loops and timers.

  • Complex Coordinator Communication: Coordinator communication happens in both threads in the current design, and it has caused race conditions such as KAFKA-13563. We want to take the opportunity to move all coordinator communication to the background thread and adopt a more linear design.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously with the poll. Firstly, it simplifies the polling thread design, as all of the coordinator communication will be moved to the background. Secondly, it prevents polls from blocking the network requests.

Scope

The goal of this refactor is to move network communication, including the heartbeat, to the background thread, which allows the consumer to operate in a more asynchronous fashion. In particular, the scope of this refactor is as follows:

  1. Deprecate HeartbeatThread

  2. Refactor the Coordinator classes, i.e. AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.

  3. Refactor the KafkaConsumer API internals to adapt to this asynchronous design.

  4. Introduce events and communication channels, to facilitate the communication between the polling thread and the background thread.

  5. Address issues in these Jira tickets

Definitions

Polling Thread

The main user thread.  It is called the polling thread because it is where the user invokes the poll() operation.

Background Thread

The thread that is running in the background upon initialization of the KafkaConsumer.

Heartbeat Thread

The background thread in the current implementation (see KIP-62).

Event

The main communication medium for the polling thread to interact with the background thread.

Proposed Design

In the schematic below, the main components are:

  1. Polling thread, which handles all of the API requests and callback execution.

  2. Background thread, which handles network communication such as heartbeat and coordinator requests, as well as the rebalance flow.

  3. Communication channels, which are responsible for:

    1. sending events to the background thread

    2. sending events to the polling thread

A note on the subscriptionState: it will be the only exception in this design, in that its reference is shared between the polling thread and the background thread.

Important Components

Background thread and its lifecycle

In our implementation, it is important for the background thread to discover the coordinator (when needed) automatically. Therefore, we use a state machine to represent the different states of the coordinator connection and allow rejoining after disconnection, as long as the background thread isn't terminated. The background thread will be in one of 4 states: down, initialized, discovering (coordinator discovery), and stable.

The thread's lifecycle can be represented this way (a sketch follows the list below):

  1. Upon initialization of the consumer, the coordinator is down

  2. Polling thread starts the background thread, background thread moves to initialized state

  3. The background thread loop:

    1. Poll for the new events (for example, a commit event) from the queue if any

    2. Check the background thread state (connected to a coordinator or not)

      1. If the event requires a coordinator, move the coordinator to the discovery (coordinator discovery) state.

      2. If not, execute the event

      3. If the state is currently the discovery state, check the coordinator connection: move to initialized if the discovery failed, or to stable if the coordinator is discovered.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Go to 3

  7. If close() is received:

    1. set close to true so that the loop exits

    2. poll coordinator and network client

    3. Commit
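
As a rough illustration of the loop above, here is a minimal Java sketch of the background thread and its coordinator-connection state machine. The class and method names are illustrative only, not the actual implementation.

Background thread loop (sketch)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event type; stands in for the real ApplicationEvent hierarchy.
class ApplicationEvent {
    final boolean requiresCoordinator;
    ApplicationEvent(boolean requiresCoordinator) { this.requiresCoordinator = requiresCoordinator; }
}

public class BackgroundThreadSketch implements Runnable {
    // Coordinator-connection states described above.
    enum State { DOWN, INITIALIZED, COORDINATOR_DISCOVERY, STABLE }

    private final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
    private volatile boolean closed = false;
    private State state = State.DOWN;                    // step 1: coordinator is down

    @Override
    public void run() {
        state = State.INITIALIZED;                       // step 2: polling thread started us
        while (!closed) {
            ApplicationEvent event = pollEvent();        // step 3.1: take one event, if any
            if (event != null) {
                if (event.requiresCoordinator && state != State.STABLE) {
                    state = State.COORDINATOR_DISCOVERY; // step 3.2.1: need a coordinator first
                    // ...enqueue a FindCoordinator request and re-process the event later...
                } else {
                    process(event);                      // step 3.2.2: execute immediately
                }
            }
            if (state == State.COORDINATOR_DISCOVERY)    // step 3.2.3: check the connection
                state = coordinatorKnown() ? State.STABLE : State.INITIALIZED;
            pollCoordinator();                           // step 4
            pollNetworkClient();                         // step 5, then loop back to step 3
        }
        // close() path (step 7): final coordinator/network poll and offset commit would go here.
    }

    private ApplicationEvent pollEvent() {
        try {
            return applicationEventQueue.poll(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private void process(ApplicationEvent event) { /* dispatch to the appropriate handler */ }
    private boolean coordinatorKnown() { return true; }  // placeholder for a FindCoordinator check
    private void pollCoordinator() { }
    private void pollNetworkClient() { }
}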

Consumer Poll Changes

Currently, consumer.poll does two things: poll the coordinator (for assignment updates, auto-commit, and possibly running the rebalance process) and fetch.

Moving forward, the consumer will only be responsible for sending and returning the fetch data, and for polling the channel for events such as rebalancing and errors:

...

  1. Handle exceptions

  2. Handle callback execution

...

Send out fetch requests

...

Returning fetch results

ConsumerCoordinator and AbstractCoordinator

We will use the existing implementation, but with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation. Therefore, we won't be starting the heartbeat thread; instead, the heartbeat mechanism will piggyback onto the background thread.

  • Right now, the coordinator state consists of: Unjoined, Prepare_Rebalancing, Complete_Rebalancing, and Stable. Because of the complexity of the rebalance callbacks and the background thread added in the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state, to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state, to handle onPartitionsAssigned

  • joinGroupIfNeeded will need to be refactored

    • (See above) Finer stages of the rebalance process, each represented by a state, to allow finer control of the rebalance process

    • For example: onJoinPrepare becomes onJoinPrepare, PartitionRevocation, onJoinPrepareCompleted

    • Because of the async nature of the new implementation, polling loops like rejoinNeededOrPending will be removed. These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to advance the progress:

      • The background thread will automatically re-discover the coordinator if disconnected, using the state machine

      • We are already doing networkClient.poll() in the background thread loop

Rebalance States

The existing coordinator implementation already has a state machine in place. We will add 4 states to facilitate the callback execution, and modify the existing states (a sketch of the resulting states follows below).

...

After onJoinPrepare, transition to the REVOKING_PARTITIONS state. It will do a few things:

  1. Send an event to the polling thread to execute the partition revocation callback

  2. Wait for the partition-revoked event, and advance the state to PARTITIONS_REVOKED
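
The enum below sketches how the proposed states could sit alongside the existing coordinator member states. The existing names loosely follow the current implementation; the new names are illustrative and may change in the final design.

Rebalance states (sketch)
// Existing member states plus the proposed additions for the callback hand-off.
enum MemberState {
    UNJOINED,
    PREPARING_REBALANCE,
    COMPLETING_REBALANCE,
    STABLE,
    // Proposed states that drive the callback hand-off with the polling thread:
    REVOKING_PARTITIONS,   // revocation event sent to the polling thread, waiting for the callback
    PARTITIONS_REVOKED,    // polling thread acknowledged onPartitionsRevoked; rebalance continues
    ASSIGNING_PARTITIONS,  // assignment event sent to the polling thread, waiting for the callback
    PARTITIONS_ASSIGNED    // polling thread acknowledged onPartitionsAssigned
}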


Scope

  • Define the responsibilities of the polling thread and the background thread
  • Define the communication interface and protocol between the two threads
  • Define a Network layer over the NetworkClient, including how different requests are sent and handled
    • Define RequestManagers
    • Define RequestStates to handle retry backoff
  • Redesign SubscriptionState
  • Deprecate HeartbeatThread and move it to the background thread
  • Define the rebalance flow, including how the JoinGroup and SyncGroup requests are sent and how callbacks are triggered
  • Refactor the KafkaConsumer API innards

  • Address issues in these Jira tickets

Design

The new consumer is asynchronous and event driven. The former means we are separating the responsibilities between the polling thread and the background thread; the latter means the two threads communicate via events. The core components are:

  • Application thread: user thread on which all Consumer API requests and callback handlers are executed.

  • Network thread: client-internal thread on which network I/O related to heartbeat, coordinator, and rebalance is executed.

  • Event handler: A communication interface between the application thread and network thread

  • Application event queue: sends events to the network I/O thread from the application thread

  • Background event queue: sends events from the network thread to the application thread

  • Application event processor: processes the events from the application event queue on the network thread

We will discuss the strategy for handling event results in the following section. In short, for async APIs like commitAsync(), CompletableFuture will be used to notify the completion of the events.

Application and Network Thread

The following diagram depicts the interaction between the application thread and the network thread:

(draw.io diagram: Application and network threads)

The Consumer client object is here depicted in purple. In this design, instead of directly operating on the parameters given to the various APIs (subscribe(), poll(), commit(), etc.), the Consumer implementation packages the parameters as events that are enqueued on the application event queue.

The event handler, shown in green, executes on the network thread. In each loop of the network thread, events are read from the queue and processed.

If the Consumer API is a blocking call, the event passed from the application thread to the network thread will include an embedded CompletableFuture. After enqueuing the event, the application thread will invoke Future.get(), effectively blocking itself until a result is provided. When the result for the Consumer API call is ready, the network thread will then invoke CompletableFuture.complete() with the result, allowing the application thread to continue execution.
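
A minimal sketch of this pattern, using a hypothetical commit event; the class and method names are illustrative, not the actual implementation.

Blocking API over a CompletableFuture (sketch)
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class BlockingApiSketch {
    // Hypothetical event type carrying a future for its result.
    static class CommitApplicationEvent {
        final Map<TopicPartition, OffsetAndMetadata> offsets;
        final CompletableFuture<Void> result = new CompletableFuture<>();
        CommitApplicationEvent(Map<TopicPartition, OffsetAndMetadata> offsets) { this.offsets = offsets; }
    }

    private final BlockingQueue<CommitApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();

    // Application thread: enqueue the event and block on its future.
    void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        CommitApplicationEvent event = new CommitApplicationEvent(offsets);
        applicationEventQueue.add(event);
        try {
            event.result.get(timeout.toMillis(), TimeUnit.MILLISECONDS); // blocks until completed
        } catch (Exception e) {
            throw new RuntimeException("commit failed or timed out", e);
        }
    }

    // Network thread: once the OffsetCommit response is handled, unblock the caller with
    //   event.result.complete(null);                 // success
    //   event.result.completeExceptionally(error);   // failure
}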

Submitting Client Requests

The following diagram displays the basic flow between the request managers, the unsent request queue, and the NetworkClient:

(draw.io diagram: Request managers and client requests)

A request manager represents the logic and state needed to issue a Kafka RPC request and handle its response. A request manager may contain logic to handle more than one type of RPC.

In the network thread, we loop over each request manager, effectively asking it for any requests that it needs to send to the cluster. Note that during this step the network request is not sent. Instead, an unsent request object is created which contains the underlying request information. These "unsent requests" are added to a queue of pending unsent client requests. After all of these unsent requests are queued up, then they are forwarded for network I/O via the NetworkClient.send() method.

There are two benefits to this multi-step process (illustrated in the sketch after the list):

  1. It keeps the network I/O request and response management and lifecycle in one place, making the code easier to reason about
  2. The request managers can implement deduplication and/or coalescing of requests
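
The following is a simplified sketch of this per-iteration flow. The types here stand in for the real RequestManager, unsent request, and NetworkClientDelegate classes; the signatures are assumptions, not the actual API.

Network thread iteration (sketch)
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

interface RequestManager {
    // Returns requests that are ready to be sent; performs no network I/O itself.
    List<UnsentRequest> poll(long currentTimeMs);
}

class UnsentRequest { /* request builder, target node, response handler, ... */ }

class NetworkClientDelegate {
    void send(Queue<UnsentRequest> requests) { /* wraps NetworkClient.send() for ready nodes */ }
    void poll(long currentTimeMs) { /* wraps NetworkClient.poll() for the actual I/O */ }
}

class NetworkThreadIterationSketch {
    private final List<RequestManager> requestManagers;
    private final Queue<UnsentRequest> unsentRequests = new ArrayDeque<>();
    private final NetworkClientDelegate networkClientDelegate;

    NetworkThreadIterationSketch(List<RequestManager> managers, NetworkClientDelegate delegate) {
        this.requestManagers = managers;
        this.networkClientDelegate = delegate;
    }

    void runOnce(long currentTimeMs) {
        // 1. Ask every request manager which requests it wants to send (no I/O yet).
        for (RequestManager manager : requestManagers)
            unsentRequests.addAll(manager.poll(currentTimeMs));
        // 2. Forward all queued unsent requests to the network layer in one place.
        networkClientDelegate.send(unsentRequests);
        unsentRequests.clear();
        // 3. Drive the actual sends and receives.
        networkClientDelegate.poll(currentTimeMs);
    }
}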


(Image: top-level design)

Terminology:

  • CB: callbacks registered and invoked on the polling thread, e.g. the commit callback and rebalance callback.
  • rm: RequestManagers, e.g. for Heartbeat and FindCoordinatorRequest.
  • The subscriptionState design is still under discussion.

Application thread and its lifecycle

The polling thread handles API invocation and any responses from the background thread. Let's demonstrate its lifecycle using the simple consumer use case (assign(), poll(), commitSync()); a minimal usage example follows the list:

  1. The user invokes assign(), the subscriptionState is altered.
  2. The subscription state changes are sent to the background thread via the ApplicationEventQueue.
  3. The user then invokes poll() in a loop.
  4. During the poll, the polling thread sends a fetch request to the background thread.
  5. During the poll, the polling thread polls fetch results from the BackgroundEventQueue. It deserializes the poll results and returns the result to the user.
  6. The user processes the results and invokes commitSync().
  7. The client thread sends an OffsetCommitApplicationEvent to the background thread. As this is a blocking operation, the method returns when the background thread completes the commit.
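
For reference, the user-facing side of this lifecycle is unchanged; below is a minimal example using the existing public Consumer API (the broker address and topic are placeholders).

Simple consumer usage (existing API)
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("example-topic", 0))); // step 1
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500)); // steps 3-5
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());  // step 6
                consumer.commitSync();                                                           // steps 6-7
            }
        }
    }
}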

Background thread and its lifecycle

The background thread runs a loop that periodically checks the ApplicationEventQueue, and drains and processes the events. At a high level, the lifecycle of the background thread can be summarized as follows (a sketch of the event dispatch follows the list):

  1. The application starts up the Consumer, the Consumer creates an EventHandler, and starts up the background thread.
  2. The background thread enters the loop and starts polling the ApplicationEventQueue.
    1. Events will be sent to the corresponding RequestManager.  For example, a commit event is sent to the OffsetCommitRequestManager.
  3. The background thread polls each RequestManager. If the RequestManager returns a result, we enqueue it to the NetworkClientDelegate.
  4. Poll the NetworkClientDelegate to ensure the requests are sent.
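
A minimal sketch of step 2: draining the ApplicationEventQueue and routing each event to the manager that owns it. The event and manager types here are simplified stand-ins, not the actual classes.

Event dispatch (sketch)
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal, illustrative stand-ins for the real event and request-manager types.
enum ApplicationEventType { COMMIT, UPDATE_METADATA }

class ApplicationEvent {
    final ApplicationEventType type;
    ApplicationEvent(ApplicationEventType type) { this.type = type; }
}

class OffsetCommitRequestManager { void addCommitRequest(ApplicationEvent e) { /* build an OffsetCommitRequest later */ } }
class MetadataRequestManager   { void requestUpdate() { /* build a MetadataRequest later */ } }

class EventDispatchSketch {
    private final BlockingQueue<ApplicationEvent> applicationEventQueue = new LinkedBlockingQueue<>();
    private final OffsetCommitRequestManager commitManager = new OffsetCommitRequestManager();
    private final MetadataRequestManager metadataManager = new MetadataRequestManager();

    // Drain pending events and route each one to the request manager that owns it.
    void drainAndDispatch() {
        List<ApplicationEvent> drained = new ArrayList<>();
        applicationEventQueue.drainTo(drained);
        for (ApplicationEvent event : drained) {
            switch (event.type) {
                case COMMIT:          commitManager.addCommitRequest(event); break;
                case UPDATE_METADATA: metadataManager.requestUpdate();       break;
            }
        }
    }
}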

Network Layers

(Image: network layers)

We are deprecating the current ConsumerNetworkClient because:

  1. The locking is unnecessary in the new design because everything runs on a single thread.
  2. Some features are irrelevant to this design, such as the unsent request buffer.

We are introducing a wrapper over the NetworkClient, the NetworkClientDelegate, to help coordinate the requests.

  • All requests are first enqueued into the unsentRequests queue
  • Polling the NetworkClientDelegate results in the queued requests being sent via the NetworkClient.

Request Manager

Kafka consumer tasks are tied to broker requests and responses. In the new implementation, we take a more modular approach: we create request managers for different tasks and have the background thread poll these request managers to see if any requests need to be sent. Once a request is returned by the poll, the background thread enqueues it to the network client to be sent out.
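
As an illustration of how a request manager's poll() stays free of network I/O and deduplicates requests, here is a hedged sketch of a coordinator-lookup manager. The class name, fields, and return type are hypothetical.

Request manager deduplication (sketch)
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class FindCoordinatorRequestManagerSketch {
    private Optional<Integer> coordinatorNodeId = Optional.empty();
    private boolean requestInFlight = false;

    // Called by the background thread on every loop iteration.
    List<String> poll(long currentTimeMs) {
        // Deduplication: do nothing while the coordinator is known or a lookup is already in flight.
        if (coordinatorNodeId.isPresent() || requestInFlight)
            return Collections.emptyList();
        requestInFlight = true;
        return Collections.singletonList("FindCoordinatorRequest"); // stand-in for an unsent request object
    }

    // Invoked from the response handler once FindCoordinator completes.
    void onResponse(int nodeId) {
        requestInFlight = false;
        coordinatorNodeId = Optional.of(nodeId);
    }
}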

The request managers handle the following requests

  1. FindCoordinatorRequest
  2. OffsetCommitRequest
  3. FetchRequest
  4. MetadataRequest
  5. HeartbeatRequest
  6. ListOffsetRequest

After KIP-848 is implemented, the request managers also handle the following:

  1. ConsumerGroupHeartbeatRequest
  2. ConsumerGroupPrepareAssignmentRequest
  3. ConsumerGroupInstallAssignmentRequest

RequestFuture and Callback

The current implementation chains callbacks to RequestFutures (a Kafka-internal type).  We have decided to move away from the Kafka-internal type and migrate to the Java CompletableFuture due to its better interface and features.
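
A short sketch of what the chaining looks like with the standard CompletableFuture API; the response type here is a placeholder, not the real generated class.

CompletableFuture chaining (sketch)
import java.util.concurrent.CompletableFuture;

class CompletableFutureChainingSketch {
    // Hypothetical response type; the real code would use the generated OffsetCommit response class.
    static class CommitResponse { }

    static CompletableFuture<CommitResponse> sendCommit() {
        CompletableFuture<CommitResponse> future = new CompletableFuture<>();
        // The network thread completes the future when the response (or an error) arrives:
        //   future.complete(response);  or  future.completeExceptionally(exception);
        return future;
    }

    public static void main(String[] args) {
        sendCommit()
            .thenAccept(response -> System.out.println("commit completed"))  // success callback
            .exceptionally(error -> {                                        // failure callback
                System.err.println("commit failed: " + error);
                return null;
            });
    }
}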

Events and EventHandler

EventHandler is the main interface between the polling thread and the background thread. It has two main purposes:

  1. Allows polling thread to send events to the background thread
  2. Allows polling thread to poll background thread events

Here we define two types of events:

  1. ApplicationEvent: application side events that will be sent to the background thread
  2. BackgroundEvent: background thread events that will be sent to the application

In the onJoinComplete:

...

Enter ASSIGNING_PARTITIONS, send a partition assignment event. Return.

...

Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED

...

We use a blocking queue to send API events from the polling thread to the background thread. We will abstract the communication operation using an EventHandler, which allows the caller, i.e. the polling thread

...

, to add and poll the events.

EventHandler
interface EventHandler {
    public BackgroundEvent poll();
    public void add(ApplicationEvent event);
}
ApplicationEventQueue and ApplicationEvent
// Channel used to send events to the background thread

private BlockingQueue<ApplicationEvent> queue;

abstract public class ApplicationEvent {
   private final ApplicationEventType eventType;
}

enum ApplicationEventType {
   COMMIT,
   ACK_PARTITION_REVOKED,
   ACK_PARTITION_ASSIGNED,
   UPDATE_METADATA,
   LEAVE_GROUP,
}

...

BackgroundEventQueue and BackgroundEvent
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<BackgroundEvent> queue;

abstract public class BackgroundEvent {
   private final BackgroundEventType eventType;
}

enum BackgroundEventType {
   ERROR,
   REVOKE_PARTITIONS,
   ASSIGN_PARTITIONS,
   FETCH_RESPONSE,
}

Wakeup() and WakeupException

TBD. I’m not sure how WakeupException plays out in the new design.

Timeout Policy

Consumer.poll(): user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout from consumer.poll(). Maybe we should just use request.timeout.ms, and re-attempt in the next loop if it fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure the session interval and heartbeat interval?

Test Plan

We will start by reusing the existing unit tests and integration tests.

We need to make sure that coordinator discovery and joinGroup operations happen at the correct time.

We will also ensure the heartbeat interval and poll interval are respected.

We also need to make sure that no events are lost and that events happen in the correct sequence.

Consumer API Internal Changes 

Poll

Users are required to invoke poll to do the following (a sketch follows this list):

  1. Trigger auto-commit
  2. Poll exceptions: process or raise it to the user
  3. Poll fetches
  4. Poll for callback invocation triggers, to invoke the rebalance listeners.
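
A rough sketch of how the new poll() loop could drain the background event queue to surface errors and trigger callbacks; the event classes here are simplified stand-ins, not the actual implementation.

poll() loop (sketch)
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-ins for the background event hierarchy; the real classes are richer.
abstract class BackgroundEvent { }

class ErrorBackgroundEvent extends BackgroundEvent {
    final RuntimeException error;
    ErrorBackgroundEvent(RuntimeException error) { this.error = error; }
}

class CallbackBackgroundEvent extends BackgroundEvent {
    void invokeCallback() { /* run the rebalance or commit callback on this thread */ }
}

class PollLoopSketch {
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    // Roughly what poll() does on the application thread in the new design.
    void pollInternal() {
        BackgroundEvent event;
        while ((event = backgroundEventQueue.poll()) != null) {
            if (event instanceof ErrorBackgroundEvent)
                throw ((ErrorBackgroundEvent) event).error;         // 2. raise errors to the user
            else if (event instanceof CallbackBackgroundEvent)
                ((CallbackBackgroundEvent) event).invokeCallback(); // 4. trigger rebalance listeners
        }
        // 1. auto-commit is driven by the background thread once poll signals progress
        // 3. collect completed fetches from the fetch buffer and return them to the caller
    }
}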

CommitSync

  1. The polling thread sends a commit event.  The commit event has a completable future.
  2. Wait for the completable future to finish, so that we can make this a blocking API.

Assign

  1. If we are assigning nothing, trigger unsubscribe()
  2. clear the fetcher buffer
  3. send a commit event if autocommit is enabled
  4. send metadata update

Subscribe

  1. If subscribing to nothing, trigger unsubscribe()
  2. clear the fetcher buffer
  3. subscribes 
  4. send metadata update

Unsubscribe

  1. Send a leave group event
  2. unsubscribe from the topics

Major Changes

Fetcher

We will break the current fetcher into three parts to accommodate the asynchronous design, i.e., we need the background thread to send fetches autonomously and the polling thread to collect fetches when they become available. We will have 3 separate classes here (a skeletal sketch follows the list):

  1. FetchSender: Responsible for sending fetches in the background thread
  2. FetchHandler: Sits in the polling thread's poll loop, processing the fetch responses from the fetch events.
  3. FetchBuffer: This is the completedFetches in the old implementation. Its purpose is to prevent the FetchSender from sending too many fetches and causing memory issues. It will be removed once we implement the memory-based buffer (KIP-81).
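
A skeletal sketch of the proposed three-way split. The class responsibilities follow the list above; the method names and the buffer bound are illustrative assumptions.

Fetcher split (sketch)
import java.util.ArrayDeque;
import java.util.Queue;

class CompletedFetch { /* raw, undeserialized partition data from a FetchResponse */ }

// Lives in the background thread: builds and sends FetchRequests autonomously.
class FetchSender {
    private final FetchBuffer buffer;
    FetchSender(FetchBuffer buffer) { this.buffer = buffer; }
    void maybeSendFetches() {
        if (buffer.hasCapacity()) {
            // build FetchRequests for fetchable partitions and hand them to the network layer;
            // responses are appended to the buffer when they arrive
        }
    }
}

// Bounded hand-off between the two threads (the old completedFetches queue).
class FetchBuffer {
    private final Queue<CompletedFetch> completedFetches = new ArrayDeque<>();
    private final int maxBufferedFetches = 100; // placeholder bound until a memory-based limit (KIP-81)
    synchronized boolean hasCapacity() { return completedFetches.size() < maxBufferedFetches; }
    synchronized void add(CompletedFetch fetch) { completedFetches.add(fetch); }
    synchronized CompletedFetch poll() { return completedFetches.poll(); }
}

// Lives in the polling thread's poll loop: deserializes buffered data and returns records.
class FetchHandler {
    private final FetchBuffer buffer;
    FetchHandler(FetchBuffer buffer) { this.buffer = buffer; }
    void collectFetches() {
        CompletedFetch fetch;
        while ((fetch = buffer.poll()) != null) {
            // deserialize records, update fetch positions, add to the results returned by poll()
        }
    }
}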

Consumer Poll Changes

We will remove the metadata logic from the consumer.poll(), so that the execution of the poll loop is much simplified. It mainly handles:

  1. fetches
  2. callback invocation
  3. errors

ConsumerCoordinator and AbstractCoordinator

  • New states will be introduced (see the Rebalance States section above).  The main purpose is to have the background thread drive the poll, and let the polling thread invoke the callbacks.
  • Remove HeartbeatThread. Therefore, we won't be starting the heartbeat thread.

    • It will still require a fetch event to poll the heartbeat, as only the polling thread issues fetch events and we want to respect the existing implementation.
  • Timeouts and timers will be reevaluated and possibly removed.
  • while loops will be reevaluated and possibly thrown out.  In the new implementation the coordinator will be non-blocking, and its states are managed by the background thread loop.

Timeout Policy

Please see Java client Consumer timeouts for more detail on timeouts. Please also review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

Alternative

...

Proposals

Fetcher

  • Should we consider a larger or configurable prefetch buffer?  Ideally we can tune this parameter to improve performance.

  • Make some of the fetcher configuration dynamic, e.g. a configurable prefetch buffer

SubscriptionState

  • Shared: For the implementation simplicity, the subscriptionState will be shared between the background and polling thread

  • Non-shared: We can make a copy of the subscriptionState in the background thread, and 1. update the state and 2. check for consistency during the poll

2 Channel Implementation

  • We really only use the second queue (ConsumerChannel) for two purposes (see below), so for simplicity we could instead just hold references to those and check them during the poll:

    • Partition Revocation

    • Error

Others

  • Use events to drive the synchronization.

    • There could be out-of-sync issues, which can subsequently cause incorrect fetching, etc.

API Changes

  • Poll returns CompletableFuture<ConsumerRecord<K,V>>

  • Should poll(), internally, return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()?  Each take() performs a similar action to poll().

User Adoption

The refactor should have (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.