Summary

We want to revisit the current design of the heartbeat thread, which decouples poll calls from sending heartbeats. The main benefit of the current design is its ability to maintain the liveness of the consumer without manual polling; however, because network communication is handled by both threads, we have inevitably encountered a number of concurrency issues such as locking and race conditions. A list of major issues related to the current threading design can be found here: jiras.

The main goal of this initiative is to create a more linear and modular design by separating the responsibilities of the polling thread and the background thread. In particular, we want to dedicate network communication, such as rebalancing and heartbeats, to the background thread, and leave the polling thread to handle user API calls and callback execution. As a result, the polling thread will work completely asynchronously from the background thread, which should eliminate issues such as locking.

The second goal is modernization. Many existing wrappers, such as the Vert.x clients, allow users to perform asynchronous fetches. We see this refactor as a stepping stone toward incorporating modern APIs such as CompletableFuture or Stream in future designs.

That said, we will keep API changes minimal so that the new client remains backward compatible.

Definitions

Polling Thread

The main user thread. It is called the polling thread because it is the thread on which the user invokes poll().

Background Thread

The thread that runs in the background once the KafkaConsumer is initialized.

Heartbeat Thread

The background thread in the current implementation (see KIP-62).

Event

The main communication medium for the polling thread to interact with the background thread.

Proposed Design

Here we highlight the focus of this refactor:

  1. Heartbeats, the rebalance process, coordinator discovery, and fetching will be executed by the background thread.

  2. Client API calls and rebalance callbacks will be executed on the polling thread.

  3. The polling thread sends events to a channel, and the background thread consumes events from that channel.

  4. Each event contains a CompletableFuture object, which is used to communicate the event result (e.g. fetched records) back to the polling thread.

  5. Background thread events, such as partition revocation and errors, will be communicated through a second channel. We rely on the consumer calling poll() to listen to that channel.
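Points 3 and 4 can be illustrated with a minimal sketch, assuming a hypothetical FetchEvent whose payload is a list of strings standing in for fetched records: the polling thread enqueues the event and waits on its CompletableFuture with a bounded timeout, and the background thread takes the event and completes the future. FetchEvent, EventRoundTrip and fetch() are illustrative names only, not the proposed API.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event type: carries a future that the background thread completes.
class FetchEvent {
    final CompletableFuture<List<String>> future = new CompletableFuture<>();
}

class EventRoundTrip {
    // Hypothetical channel from the polling thread to the background thread.
    static final BlockingQueue<FetchEvent> serverEvents = new LinkedBlockingQueue<>();

    // Polling thread: enqueue the event, then wait (bounded) for its result.
    static List<String> fetch(long timeoutMs) throws Exception {
        FetchEvent event = new FetchEvent();
        serverEvents.put(event);
        return event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Background thread: take one event and complete its future.
        Thread background = new Thread(() -> {
            try {
                FetchEvent event = serverEvents.take();
                event.future.complete(List.of("record-0", "record-1"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();
        List<String> records = fetch(1000);
        background.join();
        System.out.println(records); // prints [record-0, record-1]
    }
}
```

Bounding the wait with get(timeout, unit) is one way poll() could keep honoring the user-provided timeout while the work itself happens on the background thread.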

Top Level Design

Figure: Top-level schematics

Figure: Background thread state machine

Important Components

Background thread loop

Upon initialization, the background thread enters a loop and performs the following tasks:

  1. Initialization

  2. Poll for new events from the queue

  3. Check the background thread state (connected to a coordinator or not)

    1. Execute the event if possible (for example, commit requires a coordinator, while assign does not)

  4. Auto-commit

  5. Poll ConsumerCoordinator

    1. Metadata update

    2. Join group

    3. Heartbeat

    4. Auto-commit

    5. Partition changes (revoke/assign)

      1. Send events to the channel

  6. Poll networkClient
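The loop above can be sketched as follows. This is a hypothetical skeleton, not the proposed implementation: BgEvent, BackgroundLoop, runOnce() and the needsCoordinator flag are assumptions made for illustration, and steps 4 to 6 are left as a comment. It shows two points of the design: the queue is polled with a bounded wait so the loop keeps reaching the later steps when no events arrive, and events that need a coordinator (such as commit) are deferred, not dropped, until one is known.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical event: commit-like events need a coordinator, assign-like ones do not.
class BgEvent {
    final String name;
    final boolean needsCoordinator;

    BgEvent(String name, boolean needsCoordinator) {
        this.name = name;
        this.needsCoordinator = needsCoordinator;
    }
}

// Hypothetical skeleton of the background thread loop (steps 2-3; 4-6 stubbed).
class BackgroundLoop implements Runnable {
    final BlockingQueue<BgEvent> queue = new LinkedBlockingQueue<>();
    final Deque<BgEvent> deferred = new ArrayDeque<>(); // waiting for a coordinator
    final List<String> executed = new ArrayList<>();
    volatile boolean running = true;
    volatile boolean coordinatorKnown = false;

    void runOnce(long pollTimeoutMs) throws InterruptedException {
        // 2. Poll for new events with a bounded wait, then drain whatever else is queued.
        BgEvent first = queue.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            deferred.addLast(first);
            queue.drainTo(deferred);
        }
        // 3. Execute each event the current state allows; keep the rest in order.
        int n = deferred.size();
        for (int i = 0; i < n; i++) {
            BgEvent e = deferred.pollFirst();
            if (e.needsCoordinator && !coordinatorKnown) {
                deferred.addLast(e);
            } else {
                executed.add(e.name);
            }
        }
        // 4-6. Auto-commit, coordinator poll (metadata, join, heartbeat, ...) and
        // networkClient.poll() would follow here.
    }

    @Override
    public void run() {
        while (running) {
            try {
                runOnce(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```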

Consumer Poll Changes

Currently, consumer.poll() does two things: it polls the coordinator (assignment updates, auto-commit, and possibly the rebalance process), and it fetches records.

Going forward, poll() will only be responsible for sending fetch requests and returning fetched data, and for polling the channel for events such as rebalancing and errors:

  1. Poll the ConsumerQueue for events

    1. Handle exceptions

    2. Handle callback execution

  2. Send out fetch requests

  3. Return fetch results
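A minimal sketch of the new poll() flow, assuming hypothetical names (PollingSide, ConsumerEvent, fetchRequests, and onPartitionsRevoked as a plain java.util.function.Consumer) and using strings in place of partitions and records. The background thread that publishes consumer events and completes the fetch futures is omitted, so in this sketch an unanswered fetch simply returns an empty list when the timeout expires.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical polling-thread side of the new poll().
class PollingSide {
    enum Type { ERROR, REVOKE_PARTITION }

    static final class ConsumerEvent {
        final Type type;
        final RuntimeException error;   // set for ERROR
        final List<String> partitions;  // set for REVOKE_PARTITION

        ConsumerEvent(Type type, RuntimeException error, List<String> partitions) {
            this.type = type;
            this.error = error;
            this.partitions = partitions;
        }
    }

    final BlockingQueue<ConsumerEvent> consumerEvents = new LinkedBlockingQueue<>();
    final BlockingQueue<CompletableFuture<List<String>>> fetchRequests = new LinkedBlockingQueue<>();
    Consumer<List<String>> onPartitionsRevoked = partitions -> { };

    List<String> poll(Duration timeout) throws Exception {
        // 1. Poll the consumer-side queue for events.
        ConsumerEvent event;
        while ((event = consumerEvents.poll()) != null) {
            if (event.type == Type.ERROR) {
                throw event.error;                         // 1.1 handle exceptions
            }
            onPartitionsRevoked.accept(event.partitions);  // 1.2 callback runs on this thread
        }
        // 2. Send out a fetch request to the background thread.
        CompletableFuture<List<String>> result = new CompletableFuture<>();
        fetchRequests.put(result);
        // 3. Return the fetch result, or nothing if it is not ready in time.
        try {
            return result.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return Collections.emptyList();
        }
    }
}
```

Running the revocation callback inside poll() is what keeps user callbacks on the polling thread, as the Proposed Design requires.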

ConsumerCoordinator and AbstractCoordinator

We will reuse the existing implementation, with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation. Instead of starting a separate heartbeat thread, heartbeat sending will piggyback on the background thread loop.

  • Currently, the coordinator state consists of UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, and STABLE. Because the rebalance callbacks run on the polling thread while the rebalance itself runs on the background thread, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • joinGroupIfNeeded will need to be refactored:

    • (See above.) The rebalance process should be split into finer stages, each represented by a state, to allow finer control over its progress.

    • For example, onJoinPrepare becomes onJoinPrepare, PartitionRevocation, and onJoinPrepareCompleted.

    • Because of the asynchronous nature of the new implementation, polling loops such as the rejoinNeededOrPending() loop will be removed. These “loop until done or timeout” mechanisms will instead piggyback on the background thread loop and use the state machine to advance their progress:

      • The background thread will automatically rediscover the coordinator after a disconnect, using the state machine.

      • We already call networkClient.poll() in the background thread loop.
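The state-driven rebalance can be sketched as a switch that advances at most one step per background-loop iteration instead of blocking in a loop. The placement of the two new states relative to the existing ones, and the boolean flags standing in for broker responses and completed callbacks, are assumptions for illustration; the enum here is a hypothetical local copy, not AbstractCoordinator's own state enum.

```java
// Hypothetical rebalance state machine, advanced once per background-loop iteration.
class RebalanceStateMachine {
    // Assumed ordering: revoke before JoinGroup, assign after SyncGroup.
    enum MemberState {
        UNJOINED, REVOKE_PARTITION, PREPARING_REBALANCE,
        COMPLETING_REBALANCE, ASSIGN_PARTITION, STABLE
    }

    MemberState state = MemberState.UNJOINED;

    // Hypothetical flags the background thread would set from responses/callbacks.
    boolean coordinatorKnown, revocationDone, joinResponded, syncResponded, assignmentDone;

    // Called on every loop iteration; moves at most one step and never blocks.
    void advance() {
        switch (state) {
            case UNJOINED:
                // Next: ask the polling thread to run onPartitionsRevoked.
                if (coordinatorKnown) state = MemberState.REVOKE_PARTITION;
                break;
            case REVOKE_PARTITION:
                // Next: send JoinGroup.
                if (revocationDone) state = MemberState.PREPARING_REBALANCE;
                break;
            case PREPARING_REBALANCE:
                // Next: send SyncGroup.
                if (joinResponded) state = MemberState.COMPLETING_REBALANCE;
                break;
            case COMPLETING_REBALANCE:
                // Next: ask the polling thread to run onPartitionsAssigned.
                if (syncResponded) state = MemberState.ASSIGN_PARTITION;
                break;
            case ASSIGN_PARTITION:
                if (assignmentDone) state = MemberState.STABLE;
                break;
            case STABLE:
                break; // heartbeats continue from the main loop
        }
    }
}
```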

SubscriptionState Ownership

In the preliminary design, the subscriptionState is shared between the polling and background threads to honor the current API guarantees. (The polling thread should not have to wait for the subscription to be updated in both threads.)
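One way to make a shared subscriptionState safe to use from both threads is to synchronize every accessor, so that a change made on the polling thread is visible to the background thread on its next read. SharedSubscription below is a hypothetical stand-in that uses strings for partitions; it is not Kafka's SubscriptionState class.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a subscriptionState shared by both threads.
// Every accessor is synchronized, so an update made on the polling thread
// is visible to the background thread on its next read.
class SharedSubscription {
    private final Set<String> assigned = new HashSet<>();

    // Polling thread: user API call such as assign().
    synchronized void assign(Set<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }

    // Background thread: take an immutable copy to build the next fetch from.
    synchronized Set<String> assignedPartitions() {
        return Set.copyOf(assigned);
    }
}
```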

Channel and Events

We use blocking queues for both channels: one carries API events from the polling thread to the background thread, and the other carries events back to the polling thread.

KafkaServerEventQueue
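import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Channel used to send events to the background thread
private final BlockingQueue<ServerEvent> queue = new LinkedBlockingQueue<>();

public abstract class ServerEvent {
   private final ServerEventType eventType;

   protected ServerEvent(ServerEventType eventType) {
      this.eventType = eventType;
   }
}

enum ServerEventType {
   FETCH,
   COMMIT,
   METADATA_UPDATE,
   CLOSE,
   // ...
}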

KafkaServerEventQueue
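import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Channel used to send events to the polling thread for client-side execution/notification
private final BlockingQueue<ConsumerEvent> queue = new LinkedBlockingQueue<>();

public abstract class ConsumerEvent {
   private final ConsumerEventType eventType;

   protected ConsumerEvent(ConsumerEventType eventType) {
      this.eventType = eventType;
   }
}

enum ConsumerEventType {
   ERROR,
   REVOKE_PARTITION,
   // ...
}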

Wakeup() and WakeupException

TBD. It is not yet clear how wakeup() and WakeupException will work in the new design.

Timeout Policy

Consumer.poll(): user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout from consumer.poll(). We could instead use request.timeout.ms and re-attempt in the next loop iteration if discovery fails.

commitOffsetsSync: user-provided timeout

Is there a better way to configure the session timeout and the heartbeat interval?
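The coordinator discovery policy suggested above (bound each attempt by request.timeout.ms, back off by retry.backoff.ms, and re-attempt in a later loop iteration) could look like the following sketch. CoordinatorDiscovery and its methods are hypothetical; times are plain millisecond values passed in by the caller so the logic is easy to test.

```java
// Hypothetical helper, consulted once per background-loop iteration, that decides
// whether to (re)send a coordinator lookup. requestTimeoutMs bounds one attempt and
// retryBackoffMs spaces out attempts; the poll() timeout is not involved.
class CoordinatorDiscovery {
    private final long requestTimeoutMs;
    private final long retryBackoffMs;
    private long inFlightDeadlineMs = -1; // -1: no lookup in flight
    private long lastFailureMs = -1;      // -1: no failure recorded
    private boolean coordinatorKnown = false;

    CoordinatorDiscovery(long requestTimeoutMs, long retryBackoffMs) {
        this.requestTimeoutMs = requestTimeoutMs;
        this.retryBackoffMs = retryBackoffMs;
    }

    boolean shouldSendLookup(long nowMs) {
        if (coordinatorKnown) return false;
        if (inFlightDeadlineMs >= 0) {
            if (nowMs < inFlightDeadlineMs) return false; // still waiting for a response
            onFailure(nowMs);                             // attempt timed out
        }
        if (lastFailureMs >= 0 && nowMs < lastFailureMs + retryBackoffMs) {
            return false;                                 // backing off; retry in a later loop
        }
        inFlightDeadlineMs = nowMs + requestTimeoutMs;
        return true;
    }

    void onFailure(long nowMs) {
        inFlightDeadlineMs = -1;
        lastFailureMs = nowMs;
    }

    void onSuccess() {
        inFlightDeadlineMs = -1;
        coordinatorKnown = true;
    }

    // Coordinator disconnected: allow rediscovery on a later loop iteration.
    void onDisconnect() {
        coordinatorKnown = false;
    }
}
```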

Test Plan

We will start with the existing unit and integration tests, and write new tests where needed.

We need to make sure that 1. coordinator discovery and 2. joinGroup operations happen at the correct time.

We will also ensure that the heartbeat interval and the poll interval are respected.

We also need to make sure that no events are lost and that events are processed in the correct order.
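The no-loss, in-order requirement could be checked with a small stress test along these lines. This is a hypothetical sketch: integers stand in for events and a -1 sentinel stands in for a CLOSE event. It exercises only the channel hand-off between two threads, not the full consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical ordering/loss check: the "polling thread" enqueues numbered events,
// the "background thread" consumes them, and the check verifies every event
// arrived exactly once, in order.
class EventOrderingCheck {
    static final int POISON = -1; // hypothetical CLOSE-style sentinel

    static List<Integer> run(int count) throws InterruptedException {
        BlockingQueue<Integer> channel = new LinkedBlockingQueue<>();
        List<Integer> received = new ArrayList<>();
        Thread background = new Thread(() -> {
            try {
                for (int e = channel.take(); e != POISON; e = channel.take()) {
                    received.add(e);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        background.start();
        for (int i = 0; i < count; i++) {
            channel.put(i);
        }
        channel.put(POISON);
        background.join(); // join() makes the background writes visible here
        return received;
    }

    static boolean inOrderNoLoss(List<Integer> received, int count) {
        if (received.size() != count) return false;
        for (int i = 0; i < count; i++) {
            if (received.get(i) != i) return false;
        }
        return true;
    }
}
```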

Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

Alternative Proposal

Fetcher

  • Should we consider a larger or configurable prefetch buffer? Ideally, this parameter could be tuned to improve performance.

SubscriptionState

  • Shared: for implementation simplicity, the subscriptionState is shared between the background and polling threads.

  • Non-shared: we can keep a copy of the subscriptionState in the background thread, then 1. update that copy and 2. check it for consistency during poll().
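The non-shared option could track consistency with a version number: the polling thread bumps the version on every change, the background thread records the version it last applied, and poll() compares the two. VersionedSubscription, NonSharedSubscription, the version scheme, and the way updates reach the background thread are all assumptions for illustration.

```java
import java.util.Set;

// Hypothetical immutable snapshot of a subscription plus a version number.
class VersionedSubscription {
    final Set<String> partitions;
    final long version;

    VersionedSubscription(Set<String> partitions, long version) {
        this.partitions = Set.copyOf(partitions);
        this.version = version;
    }
}

// Hypothetical non-shared variant: each thread holds its own copy.
class NonSharedSubscription {
    private VersionedSubscription pollingCopy = new VersionedSubscription(Set.of(), 0);
    private volatile VersionedSubscription backgroundCopy = pollingCopy;

    // Polling thread: update its own copy and bump the version.
    void assign(Set<String> partitions) {
        pollingCopy = new VersionedSubscription(partitions, pollingCopy.version + 1);
    }

    // Snapshot the polling thread would send to the background thread (e.g. in an event).
    VersionedSubscription pollingSnapshot() {
        return pollingCopy;
    }

    // Background thread, step 1: apply an update it received.
    void applyInBackground(VersionedSubscription update) {
        backgroundCopy = update;
    }

    // Polling thread, step 2 (during poll()): do both copies agree?
    boolean isConsistent() {
        return backgroundCopy.version == pollingCopy.version;
    }
}
```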

Two-Channel Implementation

  • We only use the second queue (ConsumerChannel) for two purposes, listed below, so for simplicity we could instead hold a reference to each and check them during poll():

    • Partition Revocation

    • Error
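Holding a reference instead of a second queue might look like this sketch, using AtomicReference for the two signals. PendingSignals and its fields are hypothetical. The trade-off versus a queue is that a newer value overwrites an unconsumed older one, so this only works if at most one pending revocation and one pending error ever need to be reported at a time.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical alternative to a second queue: the background thread publishes
// at most one pending revocation and one pending error; poll() checks and clears them.
class PendingSignals {
    final AtomicReference<List<String>> pendingRevocation = new AtomicReference<>();
    final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Polling thread, inside poll(): surface the error first, then the revocation.
    List<String> checkDuringPoll() {
        RuntimeException error = pendingError.getAndSet(null);
        if (error != null) {
            throw error;
        }
        return pendingRevocation.getAndSet(null); // null when nothing to revoke
    }
}
```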

Others

  • Should poll() internally use a lazy list/stream, ConsumerRecordStream, so that the API simply calls stream.take()? Each take() would perform an action similar to poll().
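A lazy ConsumerRecordStream could buffer the result of one poll and hand out records one take() at a time, polling again only when the buffer is empty. This is a hypothetical sketch: the constructor taking a Supplier, the null return on an empty poll, and the generic record type are assumptions, not a proposed API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical lazy record stream: take() returns one record at a time and only
// calls the underlying poll (here a Supplier) when its buffer is empty.
class ConsumerRecordStream<R> {
    private final Supplier<List<R>> poll;
    private final Deque<R> buffer = new ArrayDeque<>();

    ConsumerRecordStream(Supplier<List<R>> poll) {
        this.poll = poll;
    }

    // Returns the next record, or null if a poll returned nothing.
    R take() {
        if (buffer.isEmpty()) {
            buffer.addAll(poll.get());
        }
        return buffer.pollFirst();
    }
}
```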

User Adoption

The refactor should introduce (almost) no regressions or breaking changes upon merge, so users should be able to keep using the client as before.
