
Public-Facing Changes

  • Update Java Docs

  • API signatures will remain exactly the same

  • API-returned exceptions may differ slightly. See the Wakeup() and WakeupException section.

Summary

This document describes our effort to refactor the existing KafkaConsumer. The purpose of this refactor is to address issues and shortcomings we have observed over the past years, such as increasing code complexity from hotfixes and patches, undesired concurrency behavior introduced by having both the heartbeat thread and the polling thread communicate with the coordinator, and the coupling between the client poll and rebalance progress.

  • Code complexity: the current rebalance protocol is quite heavy on the client side, and over the years the patches have grown increasingly complicated, hurting readability and clarity. For example, coordinator communication can happen in several different places, which makes the code path difficult to follow.

  • Complex Coordinator Communication: related to the previous point, because the polling thread and the heartbeat thread both talk to the coordinator, the code is more complex and has suffered concurrency issues in the past.

  • Asynchronous Background Thread: one of our goals here is to make the rebalance process happen asynchronously. This has two advantages: first, it simplifies the polling thread design, as all coordinator communication moves to the background; second, it prevents polls from blocking on network requests.

Scope

The goal of this refactor is to move network communication, including the heartbeat, to the background thread, which allows the consumer to operate in a more asynchronous fashion. In particular, the scope of this refactor is as follows:

  1. Deprecate HeartbeatThread

  2. Refactor the coordinator classes, i.e. AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.

  3. Refactor the KafkaConsumer API internals to adapt to this asynchronous design.

  4. Introduce events and communication channels, to facilitate the communication between the polling thread and the background thread.

  5. Address issues in these Jira tickets

Definitions

Polling Thread

The main user thread. It is called the polling thread because it is where the user invokes the poll() operation.

Background Thread

The thread that runs in the background upon initialization of the KafkaConsumer.

Heartbeat Thread

The background thread in the current implementation (see KIP-62).

Event

The main communication medium by which the polling thread interacts with the background thread.

Proposed Design

In the schematic below, the main components are:

  1. Polling thread, which handles all of the API requests and callback execution.

  2. Background thread, which handles network communication such as heartbeats and coordinator requests, as well as the rebalance flow.

  3. Communication channels, which are responsible for:

    1. sending events to the background thread

    2. sending events to the polling thread

A note on the SubscriptionState: it is the only exception in this design, in that its reference is shared between the polling thread and the background thread.

Important Components

Background thread and its lifecycle

In our implementation, it is important for the background thread to discover the coordinator automatically (when needed). Therefore, we use a state machine to represent the different states of the coordinator connection, and allow rejoining after disconnection as long as the background thread is not terminated. The background thread can be in four different states: down, initialized, discovering (coordinator discovery), and stable.

The thread's lifecycle can be represented this way:

  1. Upon initialization of the consumer, the coordinator is down

  2. Polling thread starts the background thread, background thread moves to initialized state

  3. The background thread loop:

    1. Poll for new events (for example, a commit event) from the queue, if any

    2. Check the background thread state (connected to a coordinator or not)

      1. If the event requires the coordinator, move to the discovering (coordinator discovery) state.

      2. If not, execute the event

      3. If the state is currently discovering, check the coordinator connection: move back to initialized if discovery failed, or to stable if the coordinator is discovered.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Go to 3

  7. If close() is received:

    1. Set close to true so that the loop exits

    2. poll coordinator and network client

    3. Commit
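The lifecycle above can be sketched as a small state machine. This is an illustrative model only; the class and method names below are hypothetical and not part of the actual implementation:

```java
/**
 * Illustrative model of the background thread's coordinator-connection
 * states (down, initialized, discovering, stable). All names are hypothetical.
 */
public class BackgroundStateMachine {
    public enum State { DOWN, INITIALIZED, DISCOVERING, STABLE }

    private State state = State.DOWN; // step 1: coordinator starts down

    public State state() { return state; }

    // Step 2: the polling thread starts the background thread.
    public void start() {
        if (state == State.DOWN) state = State.INITIALIZED;
    }

    // Step 3.2.1: an event that requires the coordinator triggers discovery.
    public void onCoordinatorRequired() {
        if (state == State.INITIALIZED) state = State.DISCOVERING;
    }

    // Step 3.2.3: discovery either succeeds (stable) or falls back to initialized.
    public void onDiscoveryResult(boolean found) {
        if (state == State.DISCOVERING) state = found ? State.STABLE : State.INITIALIZED;
    }

    // Rejoining after disconnection is allowed while the thread is alive.
    public void onDisconnect() {
        if (state == State.STABLE) state = State.DISCOVERING;
    }
}
```

Note that a failed discovery returns to initialized rather than down, so the loop can retry discovery on a later iteration.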

Consumer Poll Changes

Currently, consumer.poll does several things: polls the coordinator for assignment updates, auto-commits, possibly runs the rebalance process, and fetches data.

Moving forward, the consumer will only be responsible for sending fetch requests and returning fetch data, and for polling the channel for events such as rebalancing and errors:

  1. Poll the ConsumerQueue for events

    1. Handle exceptions

    2. Handle callback execution

  2. Send out fetch requests

  3. Return fetch results
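The slimmed-down poll loop described above could look roughly like this. This is a hedged sketch: the class and the string-based Event carrier are hypothetical stand-ins for the real ConsumerEvent types:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative shape of the refactored poll(): drain events, then fetch. */
public class PollingThreadSketch {
    // Hypothetical event carrier; the real design uses ConsumerEvent subclasses.
    public record Event(String type, String payload) {}

    private final BlockingQueue<Event> consumerQueue = new LinkedBlockingQueue<>();
    private final List<String> completedFetches = new ArrayList<>();

    public void deliver(Event e) { consumerQueue.add(e); }             // background thread side
    public void addCompletedFetch(String record) { completedFetches.add(record); }

    public List<String> poll() {
        // 1. Poll the ConsumerQueue for events.
        Event e;
        while ((e = consumerQueue.poll()) != null) {
            if ("ERROR".equals(e.type()))
                throw new RuntimeException(e.payload());               // 1.1 handle exceptions
            // 1.2 e.g. a REVOKE_PARTITION event would run the user's rebalance listener here
        }
        // 2. Send out fetch requests (omitted) and 3. return fetch results.
        List<String> results = new ArrayList<>(completedFetches);
        completedFetches.clear();
        return results;
    }
}
```

The key point is that poll() never talks to the coordinator directly; it only drains the channel and handles fetches.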

ConsumerCoordinator and AbstractCoordinator

We will reuse the existing implementation, with a few changes:

  • Refactor the HeartbeatThread out of the existing implementation. We will no longer start a heartbeat thread; instead, the heartbeat mechanism will piggyback on the background thread.

  • Right now, the coordinator state consists of: Unjoin, Prepare_Rebalancing, Complete_Rebalancing, and Stable. Because of the complexity that the rebalance callbacks and the background thread add to the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • These “loop until done or timeout” mechanisms will piggyback on the background thread loop, and take advantage of the state machine to advance progress:

    • The background thread will automatically rediscover the coordinator if disconnected, using the state machine

    • We are already calling networkClient.poll() in the background thread loop
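The heartbeat piggybacking can be sketched as an interval check performed once per background loop iteration, replacing the dedicated HeartbeatThread. The class name and API below are hypothetical:

```java
/**
 * Illustrative sketch of heartbeats piggybacking on the background loop:
 * instead of a dedicated HeartbeatThread, each loop iteration checks whether
 * the heartbeat interval has elapsed. Names here are hypothetical.
 */
public class HeartbeatScheduler {
    private final long heartbeatIntervalMs;
    private long lastHeartbeatMs;

    public HeartbeatScheduler(long heartbeatIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.lastHeartbeatMs = nowMs;
    }

    /** Called once per background loop iteration; true means "send a heartbeat now". */
    public boolean maybeHeartbeat(long nowMs) {
        if (nowMs - lastHeartbeatMs >= heartbeatIntervalMs) {
            lastHeartbeatMs = nowMs;
            return true; // the loop would enqueue a heartbeat request on the network client
        }
        return false;
    }
}
```

Because the loop already calls networkClient.poll(), the heartbeat request rides on the same iteration with no extra thread or synchronization.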

Rebalance States

The existing coordinator implementation already has a state machine in place. We will add four states to facilitate callback execution, and modify the existing states.

  1. After onJoinPrepare, transition to the REVOKING_PARTITIONS state, which will do a few things:

    1. Send an event to the polling thread to execute the revocation callback

    2. Wait for the partitions-revoked event, then advance the state to PARTITIONS_REVOKED

  2. In the onJoinComplete:

    1. We will first go through the existing COMPLETING_JOIN state

    2. Enter ASSIGNING_PARTITIONS, send a partition-assignment event, and return

    3. Wait for partition assignment completion from the polling thread, then advance to PARTITIONS_ASSIGNED
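The handoff above can be sketched as an enum with a transition table. This is illustrative only: the state names follow the list above, and the transition table covers only the callback handoff, not the full protocol:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative rebalance state machine including the four proposed
 * callback states; the transition table is a sketch, not the full protocol.
 */
public enum RebalanceState {
    UNJOINED, PREPARING_REBALANCE, REVOKING_PARTITIONS, PARTITIONS_REVOKED,
    COMPLETING_REBALANCE, ASSIGNING_PARTITIONS, PARTITIONS_ASSIGNED, STABLE;

    private static final Map<RebalanceState, Set<RebalanceState>> VALID =
            new EnumMap<>(RebalanceState.class);
    static {
        // onJoinPrepare hands the revocation callback to the polling thread.
        VALID.put(PREPARING_REBALANCE, EnumSet.of(REVOKING_PARTITIONS));
        VALID.put(REVOKING_PARTITIONS, EnumSet.of(PARTITIONS_REVOKED));
        // onJoinComplete hands the assignment callback to the polling thread.
        VALID.put(COMPLETING_REBALANCE, EnumSet.of(ASSIGNING_PARTITIONS));
        VALID.put(ASSIGNING_PARTITIONS, EnumSet.of(PARTITIONS_ASSIGNED));
        VALID.put(PARTITIONS_ASSIGNED, EnumSet.of(STABLE));
        // Remaining transitions omitted for brevity.
    }

    public boolean canTransitionTo(RebalanceState next) {
        return VALID.getOrDefault(this, EnumSet.noneOf(RebalanceState.class)).contains(next);
    }
}
```

The background thread parks in REVOKING_PARTITIONS or ASSIGNING_PARTITIONS until the polling thread reports callback completion, which is what makes the callback execution asynchronous.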

Channel and Events

We use a blocking queue to send API events from the polling thread to the background thread. 

KafkaServerEventQueue
// Channel used to send events to the background thread

private BlockingQueue<ServerEvent> queue;

public abstract class ServerEvent {
   private final ServerEventType eventType;
}

enum ServerEventType {
   FETCH,
   COMMIT,
   METADATA_UPDATE,
   CLOSE,
   ...
}

KafkaConsumerEventQueue
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<ConsumerEvent> queue;

public abstract class ConsumerEvent {
   private final ConsumerEventType eventType;
}

enum ConsumerEventType {
   ERROR,
   REVOKE_PARTITION,
   ...
}
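A minimal, runnable sketch of the two channels end to end, under the assumption that each is a BlockingQueue as above. The class name ChannelSketch and its helper methods are hypothetical:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Minimal end-to-end sketch of the two blocking-queue channels. */
public class ChannelSketch {
    enum ServerEventType { FETCH, COMMIT, METADATA_UPDATE, CLOSE }
    enum ConsumerEventType { ERROR, REVOKE_PARTITION }

    record ServerEvent(ServerEventType type) {}
    record ConsumerEvent(ConsumerEventType type) {}

    // Polling thread -> background thread.
    final BlockingQueue<ServerEvent> serverQueue = new LinkedBlockingQueue<>();
    // Background thread -> polling thread.
    final BlockingQueue<ConsumerEvent> consumerQueue = new LinkedBlockingQueue<>();

    /** Polling thread enqueues a commit for the background thread to execute. */
    void commitAsync() {
        serverQueue.add(new ServerEvent(ServerEventType.COMMIT));
    }

    /** Background thread notifies the polling thread that partitions were revoked. */
    void notifyRevocation() {
        consumerQueue.add(new ConsumerEvent(ConsumerEventType.REVOKE_PARTITION));
    }
}
```

Each queue is written by exactly one thread and drained by the other, so the BlockingQueue is the only synchronization point between them.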

Wakeup() and WakeupException

TBD. I’m not sure how WakeupException plays out in the new design.

Timeout Policy

Consumer.poll() - user-provided timeout

Coordinator rediscovery backoff: retry.backoff.ms

Coordinator discovery timeout: currently uses the user-provided timeout from consumer.poll(). Maybe we should just use request.timeout.ms, and re-attempt in the next loop iteration if it fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure the session interval and heartbeat interval?

Test Plan

We will first reuse the existing unit tests and integration tests.

We need to make sure that 1. coordinator discovery and 2. joinGroup operations happen at the correct times.

We will also ensure that the heartbeat interval and poll interval are respected.

We also need to make sure that no events are lost and that events happen in the correct sequence.

Please review https://en.wikipedia.org/wiki/Failure_mode_and_effects_analysis and try to assess potential failures.

Compatibility

The new consumer should be backward compatible.

Alternative Proposal

Fetcher

  • Should we consider a larger or configurable prefetch buffer? Ideally we could tune this parameter to improve performance.

SubscriptionState

  • Shared: for implementation simplicity, the SubscriptionState will be shared between the background thread and the polling thread

  • Non-shared: we could make a copy of the SubscriptionState in the background thread, and 1. update the state and 2. check for consistency during the poll

2 Channel Implementation

  • We really only use the second queue (ConsumerChannel) for two purposes (see below), so for simplicity we could instead just hold references to those and check them during the poll:

    • Partition Revocation

    • Error

Others

  • Should poll() internally return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()? Each take() would perform a similar action to poll().

User Adoption

The refactor should have (almost) no regressions or breaking changes upon merge, so users should be able to continue using the client as before.
