Summary

...

The interleaving of the polling thread and the heartbeat thread is the source of many of the race and locking issues we observe today, a result of the coupling between the two threads.  Here we propose a redesign of the consumer threading model, with the goal of simplifying how different tasks and events are handled.

We want to revisit the current design of the heartbeat thread, which decouples poll calls from sending heartbeats.  The main benefit of the current design is its ability to maintain the liveness of the consumer without manual polling; however, because network communication is handled by both threads, we have inevitably encountered a number of concurrency issues such as locking and races.  The major issues related to the current threading design are listed here: jiras.

The main goal of this initiative is to create a more linear and modular design by decoupling the responsibilities of the polling thread and the background thread.  In particular, we propose dedicating network communication, including network IO, consumer group management (rebalancing) and heartbeats, to the background thread, and making the polling thread responsible for user API calls, data serialization and deserialization, and callback execution.  The polling thread will then work completely asynchronously to the background thread, which should eliminate issues such as locking.

The second goal is modernization.  There are many existing wrappers, such as the Vert.x clients, that allow users to perform asynchronous fetches.  We believe we should use this opportunity as a stepping stone toward incorporating modern APIs such as CompletableFuture or Stream in a future design.

However, we would like to clarify that we will keep the API changes minimal, to keep the new client backward compatible.

Definitions

Polling Thread

The main user thread.  It is called the polling thread because it is the thread on which the user invokes the poll() operation.

Background Thread

The thread that is running in the background upon initialization of the KafkaConsumer.

Heartbeat Thread

The background thread in the current implementation (see KIP-62).

KafkaServerEvent

...

Event

The main communication medium for the polling thread to interact with the background thread.

Proposed Design

Here we highlight the focus of this refactor:

  1. Heartbeat, rebalance process, coordinator discovery, and fetching will be executed by the background thread.

  2. Client API calls and rebalance callbacks will be executed on the polling thread.

  3. The polling thread sends events to a channel, and the background thread consumes events from that channel.

  4. Each event will contain a CompletableFuture object, which is used to communicate the event result (e.g. fetched records) back to the polling thread.

  5. Background thread events, such as partition revocations and errors, will be communicated through another channel.  We will rely on the consumer executing poll() to listen to that channel.
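The event round trip described in the list above can be sketched roughly as follows. This is only an illustration under assumptions: the names EventChannelSketch, CommitEvent, and roundTrip are hypothetical and are not part of the proposal, and the "commit" performed by the background thread is a stand-in for real network work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class EventChannelSketch {
    // Hypothetical event: carries a request plus a future for its result.
    static final class CommitEvent {
        final long offset;
        final CompletableFuture<String> result = new CompletableFuture<>();

        CommitEvent(long offset) {
            this.offset = offset;
        }
    }

    // Sends one event through the channel and waits for the background thread's answer.
    static String roundTrip(long offset) {
        BlockingQueue<CommitEvent> channel = new LinkedBlockingQueue<>();

        // Background thread: takes one event off the channel and completes its future.
        Thread background = new Thread(() -> {
            try {
                CommitEvent event = channel.take();
                event.result.complete("committed:" + event.offset); // stand-in for real work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "background-thread");
        background.setDaemon(true);
        background.start();

        // Polling thread: enqueue the event, then wait on the future it carries.
        CommitEvent event = new CommitEvent(offset);
        channel.add(event);
        return event.result.join();
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L));
    }
}
```

In this sketch the polling thread blocks on join() for simplicity; an asynchronous caller could instead chain a continuation on the same future.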

Top Level Design

Top level schematics



Background thread state machine.


Important Components

Background thread loop

Upon initialization, the background thread enters a loop and performs the following tasks:

  1. Initialization

  2. Poll for the new events from the queue

  3. Check the background thread state (connected to a coordinator or not)

    1. Execute the event if possible (for example, commit will need a coordinator and assign doesn’t)

  4. Autocommits

  5. Poll ConsumerCoordinator

    1. Metadata update

    2. Join group

    3. Heartbeat

    4. Auto-commit

    5. Partition changes (revoke/assign)

      1. Send events to the channel

  6. Poll networkClient
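One iteration of the loop above might look roughly like the sketch below. It is a simplified model, not the proposed implementation: BackgroundLoopSketch, its EventType values, and the log list are hypothetical, the coordinator and network client polls are reduced to log entries, and deferring an event by putting it back at the head of the queue is just one possible policy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class BackgroundLoopSketch {
    enum EventType { ASSIGN, COMMIT } // hypothetical subset of event types

    private final Deque<EventType> events = new ArrayDeque<>();
    private boolean coordinatorKnown = false;
    final List<String> log = new ArrayList<>(); // records what each iteration did

    void enqueue(EventType event) {
        events.addLast(event);
    }

    void setCoordinatorKnown(boolean known) {
        this.coordinatorKnown = known;
    }

    // One pass of the loop; a real background thread would call this repeatedly.
    void runOnce() {
        EventType event = events.pollFirst();              // poll for a new event
        if (event != null) {
            boolean needsCoordinator = event == EventType.COMMIT;
            if (needsCoordinator && !coordinatorKnown) {   // check the thread state
                events.addFirst(event);                    // keep ordering, retry later
                log.add("deferred " + event);
            } else {
                log.add("executed " + event);
            }
        }
        log.add("poll coordinator");    // metadata, join group, heartbeat, auto-commit
        log.add("poll network client"); // drive network IO
    }

    public static void main(String[] args) {
        BackgroundLoopSketch loop = new BackgroundLoopSketch();
        loop.enqueue(EventType.COMMIT);
        loop.runOnce();                 // no coordinator yet, so the commit is deferred
        loop.setCoordinatorKnown(true);
        loop.runOnce();                 // the commit can now be executed
        loop.log.forEach(System.out::println);
    }
}
```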

Consumer Poll Changes

Currently, consumer.poll does two things: it polls the coordinator (for assignment updates, auto-commit, and possibly running the rebalance process), and it fetches records.

Moving forward, the consumer will only be responsible for sending fetch requests and returning the fetched data, and for polling the channel for events such as rebalances and errors:

  1. Poll the ConsumerQueue for events

    1. Handle exceptions

    2. Handle callback execution

  2. Send out fetch requests

  3. Return fetch results
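The steps above could be sketched as follows. This is a rough illustration under assumptions: PollSketch and its members are hypothetical, the fetch itself is replaced by a list passed in by the caller, and callback execution is reduced to recording a string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PollSketch {
    enum ConsumerEventType { ERROR, REVOKE_PARTITION } // hypothetical subset

    static final class ConsumerEvent {
        final ConsumerEventType type;
        final String detail;

        ConsumerEvent(ConsumerEventType type, String detail) {
            this.type = type;
            this.detail = detail;
        }
    }

    // Channel written by the background thread and drained by the polling thread.
    final BlockingQueue<ConsumerEvent> consumerEvents = new LinkedBlockingQueue<>();
    final List<String> callbacksRun = new ArrayList<>();

    // 'fetched' stands in for the records a real fetch request would return.
    List<String> poll(List<String> fetched) {
        ConsumerEvent event;
        // Step 1: drain pending events without blocking.
        while ((event = consumerEvents.poll()) != null) {
            switch (event.type) {
                case ERROR:            // surface the error to the caller
                    throw new IllegalStateException(event.detail);
                case REVOKE_PARTITION: // run the callback on the polling thread
                    callbacksRun.add("onPartitionsRevoked(" + event.detail + ")");
                    break;
            }
        }
        // Steps 2 and 3: send the fetch and return its results (simulated here).
        return new ArrayList<>(fetched);
    }

    public static void main(String[] args) {
        PollSketch consumer = new PollSketch();
        consumer.consumerEvents.add(
            new ConsumerEvent(ConsumerEventType.REVOKE_PARTITION, "topic-0"));
        System.out.println(consumer.poll(List.of("record-1")));
        System.out.println(consumer.callbacksRun);
    }
}
```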

ConsumerCoordinator and AbstractCoordinator

We will take advantage of the existing implementation, but with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation. We therefore won’t start a separate heartbeat thread; instead, the heartbeat-sending mechanism will piggyback on the background thread.

  • Right now, the coordinator state consists of UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, and STABLE. Because of the complexity that the rebalance callbacks and the background thread add to the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • joinGroupIfNeeded will need to be refactored

    • (See above) The rebalance process is split into finer stages, each represented by a state, to allow finer control of the rebalance process

    • For example: onJoinPrepare becomes onJoinPrepare, PartitionRevocation, onJoinPrepareCompleted

    • Because of the async nature of the new implementation, polling loops like rejoinNeededOrPending will be removed. These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to advance the progress:

      • The background thread will automatically rediscover the coordinator if disconnected, using the state machine

      • We are already calling networkClient.poll() in the background thread loop
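To illustrate how a state machine can replace a "loop until done or timeout" block, here is a minimal sketch. The class RebalanceStateSketch, the next function, and especially the transition order are assumptions made for illustration; the proposal does not specify where the new states sit relative to the existing ones.

```java
class RebalanceStateSketch {
    // Existing coordinator states plus the two proposed ones.
    // The ordering of transitions below is an assumption, not part of the proposal.
    enum State {
        UNJOINED,
        PREPARING_REBALANCE,
        REVOKE_PARTITION,
        COMPLETING_REBALANCE,
        ASSIGN_PARTITION,
        STABLE
    }

    // Called once per background loop iteration. If the current step has not
    // finished, the state is unchanged and the loop simply tries again later,
    // so no blocking wait is needed.
    static State next(State current, boolean stepDone) {
        if (!stepDone) {
            return current;
        }
        switch (current) {
            case UNJOINED:             return State.PREPARING_REBALANCE;
            case PREPARING_REBALANCE:  return State.REVOKE_PARTITION;
            case REVOKE_PARTITION:     return State.COMPLETING_REBALANCE;
            case COMPLETING_REBALANCE: return State.ASSIGN_PARTITION;
            case ASSIGN_PARTITION:     return State.STABLE;
            case STABLE:               return State.STABLE;
            default:                   throw new AssertionError(current);
        }
    }

    public static void main(String[] args) {
        State state = State.UNJOINED;
        while (state != State.STABLE) {
            state = next(state, true); // pretend every step completes immediately
            System.out.println(state);
        }
    }
}
```

In this sketch the REVOKE_PARTITION step would be marked done only after the polling thread has executed the revocation callback, which is how the callback stays on the polling thread while the background thread drives the rebalance.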

SubscriptionState Ownership

For the preliminary design, we make the subscriptionState shared between the polling and background thread to honor the current API guarantee.  (We should not wait for the subscription to be updated in both poll and background thread).

Channel and Events

We use a blocking queue to send API events from the polling thread to the background thread. 

KafkaServerEventQueue
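// Channel used to send events to the background thread

private BlockingQueue<ServerEvent> queue;

public abstract class ServerEvent {
   private final ServerEventType eventType;

   // Subclasses pass in the type of event they represent
   protected ServerEvent(ServerEventType eventType) {
      this.eventType = eventType;
   }
}

enum ServerEventType {
   FETCH,
   COMMIT,
   METADATA_UPDATE,
   CLOSE,
   // ...
}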

KafkaConsumerEventQueue
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<ConsumerEvent> queue;

public abstract class ConsumerEvent {
   private final ConsumerEventType eventType;

   // Subclasses pass in the type of event they represent
   protected ConsumerEvent(ConsumerEventType eventType) {
      this.eventType = eventType;
   }
}

enum ConsumerEventType {
   ERROR,
   REVOKE_PARTITION,
   // ...
}

Wakeup() and WakeupException

TBD. I’m not sure how WakeupException plays out in the new design.

Timeout Policy

Consumer.poll(): user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout from consumer.poll(). Maybe we should just use request.timeout.ms and re-attempt in the next loop iteration if discovery fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure the session interval and heartbeat interval?
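As one possible reading of the rediscovery backoff above, the background loop could ask a small helper on each iteration whether enough time has passed to retry coordinator discovery. This is a sketch under assumptions: DiscoveryBackoffSketch and shouldAttempt are hypothetical names, and retryBackoffMs stands in for the value of retry.backoff.ms.

```java
class DiscoveryBackoffSketch {
    private final long retryBackoffMs; // stand-in for the retry.backoff.ms setting
    private long lastAttemptMs;
    private boolean attempted = false;

    DiscoveryBackoffSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // Returns true if a discovery attempt may be made at time nowMs,
    // and records that attempt. The first call always returns true.
    boolean shouldAttempt(long nowMs) {
        if (attempted && nowMs - lastAttemptMs < retryBackoffMs) {
            return false; // still backing off; try again on a later loop iteration
        }
        attempted = true;
        lastAttemptMs = nowMs;
        return true;
    }

    public static void main(String[] args) {
        DiscoveryBackoffSketch backoff = new DiscoveryBackoffSketch(100);
        System.out.println(backoff.shouldAttempt(0));
        System.out.println(backoff.shouldAttempt(50));
        System.out.println(backoff.shouldAttempt(100));
    }
}
```

Passing the current time in as a parameter keeps the helper free of blocking waits, which fits the non-blocking background loop.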

Test Plan

We will first write new tests and reuse the existing unit tests and integration tests.

We need to make sure that 1. coordinator discovery and 2. joinGroup operations happen at the correct time.

We will also ensure that the heartbeat interval and poll interval are respected.

We also need to make sure that no event is lost and that events happen in the correct sequence.

Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

...

KafkaConsumerEvent

Events executed on the consumer.  KafkaConsumerEventQueue is the channel that transports consumer events. Example: notifyRebalance

Proposed Design

A few changes to highlight here:

  1. The polling thread and the background thread own distinct sets of objects, which means they are responsible for different sets of tasks
  2. The polling thread and the background thread communicate via channels in the form of events.
  3. Group membership and coordinator state will be maintained by the background thread
  4. Group membership is managed by a state machine
  5. The public API remains unchanged
  6. We will respect the current contract, in particular
    1. rebalance callback will still be executed by the polling thread
