Public-Facing Changes

  • Update the Javadocs

  • API signatures will remain exactly the same

  • Exceptions returned by the API may differ slightly. See the wakeupException() section.

Summary

This document describes our effort to refactor the existing KafkaConsumer. The purpose of this refactor is to address issues and shortcomings we’ve observed over the past few years, such as increasing code complexity from hotfixes and patches, undesired concurrency behavior introduced by having both the heartbeat thread and the polling thread communicate with the coordinator, and the coupling between client polls and rebalance progress.

  • Code complexity: the current rebalance protocol is quite heavy on the client side. Over the years, the patches have become increasingly complicated, and readability and clarity have suffered. For example, coordinator communication can happen in several different places, which makes the code path difficult to follow.

  • Complex Coordinator Communication: related to the previous point, because the polling thread and the heartbeat thread both talk to the coordinator, the code is more complex and concurrency issues have been introduced in the past.

  • Asynchronous Background Thread: one of our goals is to make the rebalance process happen asynchronously. This has two advantages. First, it simplifies the polling thread design, as all coordinator communication moves to the background thread. Second, it prevents polls from blocking network requests.

Scope

The goal of this refactor is to move network communication, including heartbeats, to the background thread, which allows the consumer to operate in a more asynchronous fashion. In particular, the scope of this refactor is as follows:

  1. Deprecate HeartbeatThread

  2. Refactor the coordinator classes, i.e. AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.

  3. Refactor the KafkaConsumer API internals to adapt to this asynchronous design.

  4. Introduce events and communication channels to facilitate communication between the polling thread and the background thread.

  5. Address the issues in these JIRA tickets

Summary

We want to revisit the current design of the heartbeat thread, which decouples poll calls from sending heartbeats. The main benefit of the current design is its ability to maintain the liveness of the consumer without manual polling; however, because network communication is handled by both threads, we have inevitably encountered a number of concurrency issues, such as locking problems and race conditions. A list of major issues related to the current threading design can be found here: jiras.

The main goal of this initiative is to create a more linear and modular design by separating the responsibilities of the polling thread and the background thread. In particular, we want to dedicate network communication, such as rebalancing and heartbeats, to the background thread, and leave the polling thread to handle user API calls and callback execution. As a result, the polling thread will work completely asynchronously with respect to the background thread, which should eliminate issues such as locking.

The second goal is modernization. There are many existing wrappers, such as Vert.x clients, that allow users to perform asynchronous fetches. We believe we should use this refactor as a stepping stone toward incorporating modern APIs such as CompletableFuture or Stream in future designs.

...

Definitions

Polling Thread

...

The main communication medium for the polling thread to interact with the background thread.

Proposed Design

Here we highlight the focus of this refactor:

  1. Heartbeats, the rebalance process, coordinator discovery, and fetching will be executed by the background thread.

  2. Client API calls and rebalance callbacks will be executed on the polling thread.

  3. The polling thread can send events to a channel, and the background thread will consume events from that channel.

  4. Each event will contain a CompletableFuture object, which is used to communicate the event results (e.g. fetched records) back to the polling thread.

  5. Background thread events, such as partition revocation and errors, will be communicated through another channel. We will rely on the consumer executing poll() to listen to that channel.
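A minimal sketch of this two-channel model, assuming plain java.util.concurrent queues. The class and event names (EventChannelSketch, ApplicationEvent, BackgroundEvent) are hypothetical, and the "processing" is a placeholder. The polling thread enqueues an event carrying a CompletableFuture, the background thread completes that future, and it reports back on a second queue that the polling thread drains, as it would during poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventChannelSketch {
    // Hypothetical event sent from the polling thread to the background thread.
    static final class ApplicationEvent {
        final String type;
        final CompletableFuture<String> future = new CompletableFuture<>();

        ApplicationEvent(String type) {
            this.type = type;
        }
    }

    // Hypothetical event sent from the background thread to the polling thread.
    static final class BackgroundEvent {
        final String description;

        BackgroundEvent(String description) {
            this.description = description;
        }
    }

    final BlockingQueue<ApplicationEvent> applicationEvents = new LinkedBlockingQueue<>();
    final BlockingQueue<BackgroundEvent> backgroundEvents = new LinkedBlockingQueue<>();

    // Background thread side: take one event, complete its future (placeholder result),
    // then report on the other channel.
    void backgroundStep() throws InterruptedException {
        ApplicationEvent event = applicationEvents.take();
        event.future.complete(event.type + " done");
        backgroundEvents.offer(new BackgroundEvent("processed " + event.type));
    }

    // Polling thread side: drain whatever the background thread has reported so far.
    List<BackgroundEvent> drainBackgroundEvents() {
        List<BackgroundEvent> drained = new ArrayList<>();
        backgroundEvents.drainTo(drained);
        return drained;
    }

    public static void main(String[] args) throws Exception {
        EventChannelSketch channels = new EventChannelSketch();
        Thread background = new Thread(() -> {
            try {
                channels.backgroundStep();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();

        ApplicationEvent commit = new ApplicationEvent("commit");
        channels.applicationEvents.put(commit);
        // The polling thread waits on the future only as long as the API call requires.
        String result = commit.future.get(5, TimeUnit.SECONDS);
        background.join();
        System.out.println(result);
        System.out.println(channels.drainBackgroundEvents().size());
    }
}
```

Keeping the result on the event's own future means the background thread never needs a reference back to the caller; it only completes what it was handed.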

Top Level Design

Top level schematics

...

In the schematic below, the main components are:

  1. Polling thread, which handles all of the API requests and callback execution.

  2. Background thread, which handles network communication, such as heartbeats and coordinator requests, and the rebalance flow.

  3. Communication channels, which are responsible for

    1. sending events to the background thread

    2. sending events to the polling thread

A note on subscriptionState: it will be the only exception in this design, as its reference is shared between the polling thread and the background thread.
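Because subscriptionState is shared by reference, every access from either thread has to be synchronized. The sketch below illustrates that constraint with a hypothetical, heavily simplified class (not Kafka's actual SubscriptionState) that guards its data with synchronized methods and hands the polling thread a defensive copy.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified stand-in for the shared subscription state.
public class SharedSubscriptionStateSketch {
    private final Set<String> assignedPartitions = new HashSet<>();

    // Called on the background thread when a new assignment is applied.
    public synchronized void assign(Set<String> partitions) {
        assignedPartitions.clear();
        assignedPartitions.addAll(partitions);
    }

    // Called on the polling thread; returns a read-only copy so later updates cannot race with the caller.
    public synchronized Set<String> assignment() {
        return Collections.unmodifiableSet(new HashSet<>(assignedPartitions));
    }

    public static void main(String[] args) throws InterruptedException {
        SharedSubscriptionStateSketch state = new SharedSubscriptionStateSketch();
        Thread background = new Thread(() -> state.assign(Set.of("topic-0", "topic-1")));
        background.start();
        background.join();
        System.out.println(state.assignment());
    }
}
```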

Important Components

Background thread and its lifecycle

In our implementation, it is important for the background thread to discover the coordinator automatically when needed. Therefore, we use a state machine to represent the different states of the coordinator connection, which allows rejoining after a disconnection as long as the background thread isn’t terminated. The background thread can be in one of four states: down, initialized, discovering (coordinator discovery), and stable.

The thread’s lifecycle can be represented as follows:

  1. Upon initialization of the consumer, the coordinator is down.

  2. The polling thread starts the background thread, and the background thread moves to the initialized state.

  3. The background thread loop:

    1. Poll for new events (for example, a commit event) from the queue, if any

Background thread state machine.

...
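The coordinator-connection transitions described in this section can be sketched as a small state machine. The method names are hypothetical, and the STABLE-to-DISCOVERING transition on disconnect is an assumption based on the requirement that the background thread rediscover the coordinator automatically.

```java
public class CoordinatorStateMachineSketch {
    enum State { DOWN, INITIALIZED, DISCOVERING, STABLE }

    private State state = State.DOWN; // the consumer starts with the coordinator down

    State state() {
        return state;
    }

    // The polling thread starts the background thread.
    void start() {
        if (state != State.DOWN) throw new IllegalStateException("already started: " + state);
        state = State.INITIALIZED;
    }

    // An event that needs a coordinator (e.g. commit) arrives while none is known.
    void onEventRequiringCoordinator() {
        if (state == State.INITIALIZED) state = State.DISCOVERING;
    }

    // Checked on each loop iteration while discovering.
    void onDiscoveryResult(boolean coordinatorFound) {
        if (state == State.DISCOVERING) {
            state = coordinatorFound ? State.STABLE : State.INITIALIZED;
        }
    }

    // Assumption: losing the coordinator triggers automatic rediscovery.
    void onCoordinatorDisconnected() {
        if (state == State.STABLE) state = State.DISCOVERING;
    }

    public static void main(String[] args) {
        CoordinatorStateMachineSketch sm = new CoordinatorStateMachineSketch();
        sm.start();
        sm.onEventRequiringCoordinator();
        sm.onDiscoveryResult(true);
        System.out.println(sm.state()); // STABLE
        sm.onCoordinatorDisconnected();
        System.out.println(sm.state()); // DISCOVERING
    }
}
```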


Background thread loop

Upon initialization, the background thread enters a loop and performs the following tasks:

  1. Initialization

  2. Poll for new events from the queue

    1. Check the background thread state (connected to a coordinator or not)

      1. If the event requires a coordinator, move the coordinator to the discovery (coordinator discovery) state.

      2. If not, execute the event.

      3. If the state is currently the discovery state, check the coordinator connection. Move to initialized if the discovery failed, or to stable if the coordinator is discovered.

    2. Execute the event if possible (for example, commit needs a coordinator and assign doesn’t)

  3. Autocommits

  4. Poll ConsumerCoordinator

    1. Metadata update

    2. Join group

    3. Heartbeat

    4. Auto-commit

    5. Partition changes (revoke/assign)

      1. Send events to the channel

  5. Poll networkClient

  6. Go back to step 2

  7. If close() is received:

    1. Set close to true so that the loop exits

    2. Poll the coordinator and the network client

    3. Commit

    4. Poll networkClient
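A single-threaded sketch of the loop above, assuming hypothetical event names and a trace list in place of real ConsumerCoordinator and NetworkClient calls. Coordinator discovery is assumed to succeed on the first attempt; an event that needs a coordinator is held until one is known, while other events execute right away. The volatile close flag lets the polling thread stop the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundLoopSketch implements Runnable {
    // Hypothetical event; only events that need a coordinator (e.g. commit) wait for discovery.
    static final class Event {
        final String name;
        final boolean needsCoordinator;

        Event(String name, boolean needsCoordinator) {
            this.name = name;
            this.needsCoordinator = needsCoordinator;
        }
    }

    final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
    final List<String> trace = new ArrayList<>(); // stands in for real coordinator/network calls
    private final List<Event> pending = new ArrayList<>();
    private volatile boolean closed = false;       // set by the polling thread via close()
    private boolean coordinatorKnown = false;

    public void close() {
        closed = true;
    }

    @Override
    public void run() {
        while (!closed) {
            // A real loop would block in networkClient.poll() with a timeout instead of spinning.
            runOnce();
        }
        // Close sequence: poll coordinator and network client, commit, poll network client.
        trace.add("pollCoordinator");
        trace.add("pollNetworkClient");
        trace.add("commit");
        trace.add("pollNetworkClient");
    }

    void runOnce() {
        // Poll for new events; execute those that can run now and keep the rest pending.
        events.drainTo(pending);
        List<Event> waiting = new ArrayList<>();
        for (Event e : pending) {
            if (e.needsCoordinator && !coordinatorKnown) {
                waiting.add(e);
            } else {
                trace.add("execute " + e.name);
            }
        }
        pending.clear();
        pending.addAll(waiting);
        if (!pending.isEmpty()) {
            // Assumption: discovery succeeds on the first attempt in this sketch.
            trace.add("discoverCoordinator");
            coordinatorKnown = true;
        }
        // Metadata update, join group, heartbeat, auto-commit, partition changes.
        trace.add("pollCoordinator");
        trace.add("pollNetworkClient");
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundLoopSketch loop = new BackgroundLoopSketch();
        loop.events.put(new Event("assign", false));
        loop.events.put(new Event("commit", true));
        loop.runOnce(); // assign executes; commit waits while the coordinator is discovered
        loop.runOnce(); // commit executes now that a coordinator is known
        loop.close();
        loop.run();     // the loop exits immediately and runs the close sequence
        System.out.println(loop.trace);
    }
}
```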

Consumer Poll Changes

Currently, consumer.poll does two things: it polls the coordinator (for assignment updates, auto-commit, and possibly running the rebalance process), and it fetches records.

...

ConsumerCoordinator and AbstractCoordinator

We will reuse the existing implementation, with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation. Instead of starting a separate heartbeat thread, the heartbeat-sending mechanism will piggyback on the background thread.

  • Right now, the coordinator member state consists of UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, and STABLE. Because the new implementation adds a background thread and must coordinate the rebalance callbacks with it, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • joinGroupIfNeeded will need to be refactored:

    • (See above) The rebalance process should be broken into finer-grained stages, each represented by a state, to allow finer control over its progress.

    • For example: onJoinPrepare becomes onJoinPrepare, PartitionRevocation, and onJoinPrepareCompleted.

    • Because of the async nature of the new implementation, polling loops such as the one driven by rejoinNeededOrPending will be removed. These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to advance the progress:

      • The background thread will automatically rediscover the coordinator if disconnected, using the state machine.

      • We already call networkClient.poll() in the background thread loop.
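With the HeartbeatThread removed, the background loop itself has to decide on each iteration whether a heartbeat is due. A minimal sketch of that check, assuming a hypothetical timer class driven by the loop's current time (3000 ms mirrors the default heartbeat.interval.ms); the remaining time can also bound how long the loop blocks in networkClient.poll().

```java
public class HeartbeatTimerSketch {
    private final long heartbeatIntervalMs;
    private long lastHeartbeatMs;

    HeartbeatTimerSketch(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatMs = nowMs;
    }

    // Called once per background-loop iteration instead of from a dedicated HeartbeatThread.
    boolean maybeHeartbeat(long nowMs) {
        if (nowMs - lastHeartbeatMs >= heartbeatIntervalMs) {
            lastHeartbeatMs = nowMs; // a real client would enqueue a heartbeat request here
            return true;
        }
        return false;
    }

    // Upper bound on how long the loop may block before the next heartbeat is due.
    long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0, heartbeatIntervalMs - (nowMs - lastHeartbeatMs));
    }

    public static void main(String[] args) {
        HeartbeatTimerSketch hb = new HeartbeatTimerSketch(3000, 0);
        System.out.println(hb.maybeHeartbeat(1000));        // false
        System.out.println(hb.maybeHeartbeat(3000));        // true
        System.out.println(hb.timeToNextHeartbeatMs(4000)); // 2000
    }
}
```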

SubscriptionState Ownership

...

Rebalance States

The existing coordinator implementation already has a state machine in place. We will add 4 states to facilitate callback execution, and modify the existing states.

  1. After onJoinPrepare, transition to the REVOKING_PARTITIONS state. It will do a few things:

    1. Send an event to the polling thread to execute the partition revocation callback

    2. Wait for the partition revoked event, then advance the state to PARTITIONS_REVOKED

  2. In onJoinComplete:

    1. We will first exit the existing COMPLETING_JOIN state

    2. Enter ASSIGNING_PARTITIONS and send a partition assignment event. Return.

    3. Wait for partition assignment completion from the polling thread, then advance to PARTITIONS_ASSIGNED
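The four new states and their hand-off to the polling thread can be sketched with one CompletableFuture per callback. This is a hypothetical sketch: the class and method names are illustrative, the existing member states are omitted, and the future stands in for the partition revocation/assignment events exchanged over the channels.

```java
import java.util.concurrent.CompletableFuture;

public class RebalanceStatesSketch {
    // Only the four states proposed above; the existing member states are omitted.
    enum RebalanceState { REVOKING_PARTITIONS, PARTITIONS_REVOKED, ASSIGNING_PARTITIONS, PARTITIONS_ASSIGNED }

    private RebalanceState state;
    private CompletableFuture<Void> callbackDone;

    // Background thread, after onJoinPrepare: ask the polling thread to run the revocation callback.
    CompletableFuture<Void> startRevocation() {
        state = RebalanceState.REVOKING_PARTITIONS;
        callbackDone = new CompletableFuture<>();
        return callbackDone; // would travel to the polling thread inside an event
    }

    // Background thread, in onJoinComplete: ask the polling thread to run the assignment callback.
    CompletableFuture<Void> startAssignment() {
        state = RebalanceState.ASSIGNING_PARTITIONS;
        callbackDone = new CompletableFuture<>();
        return callbackDone;
    }

    // Background thread, each loop iteration: advance only once the callback has finished.
    // This sketch advances on any completion, including an exceptional one.
    RebalanceState advance() {
        if (callbackDone != null && callbackDone.isDone()) {
            if (state == RebalanceState.REVOKING_PARTITIONS) {
                state = RebalanceState.PARTITIONS_REVOKED;
            } else if (state == RebalanceState.ASSIGNING_PARTITIONS) {
                state = RebalanceState.PARTITIONS_ASSIGNED;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        RebalanceStatesSketch sm = new RebalanceStatesSketch();
        CompletableFuture<Void> revoked = sm.startRevocation();
        System.out.println(sm.advance()); // REVOKING_PARTITIONS (callback not done yet)
        revoked.complete(null);           // polling thread finished onPartitionsRevoked
        System.out.println(sm.advance()); // PARTITIONS_REVOKED
        CompletableFuture<Void> assigned = sm.startAssignment();
        assigned.complete(null);          // polling thread finished onPartitionsAssigned
        System.out.println(sm.advance()); // PARTITIONS_ASSIGNED
    }
}
```

Polling the future from the loop, rather than blocking on it, keeps the background thread free to keep heartbeating while the user callback runs.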

Channel and Events

We use a blocking queue to send API events from the polling thread to the background thread. 

...

The new consumer should be backward compatible.

Alternative Proposal

Fetcher

  • Should we consider a larger or configurable prefetch buffer? Ideally, we could tune this parameter to improve performance.

...