Public-Facing Changes

  • Update the Javadocs

  • API signatures will remain exactly the same

  • Exceptions thrown by the API may differ slightly. See the Wakeup() and WakeupException section.

...

  • Code complexity: the current rebalance protocol is quite heavy on the client side. Over the years, patches and hotfixes have also hurt the readability of the code. For example, coordinator communication can happen in several places, which makes the code path hard to follow. There are also many small patches that handle bugs and edge cases, and these may require further fixes.

  • Complex Coordinator Communication: because both the polling thread and the heartbeat thread talk to the coordinator, the code is more complex and prone to concurrency issues.

  • Asynchronous Background Thread: one of our goals is to make the rebalance process run asynchronously with respect to poll(). First, this simplifies the polling thread design, because all coordinator communication moves to the background thread. Second, it prevents consumer.poll() from blocking other consumer operations, such as network requests.

...

We will move network communication, including heartbeats, to the background thread, which allows the consumer to operate more asynchronously. In particular, the scope is as follows:

  1. Deprecate HeartbeatThread

  2. Refactor the Coordinator classes, i.e., AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.

  3. Refactor the KafkaConsumer API internals to adopt this asynchronous design.

  4. Introduce events and communication channels to facilitate communication between the polling and background threads.

  5. Address the issues in these Jira tickets
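Item 4 can be illustrated with a minimal sketch, assuming one FIFO queue per direction: the polling thread puts application events (such as a commit) on one queue, and the background thread puts background events (such as an error or a callback request) on the other. `EventChannels`, `ApplicationEvent`, and `BackgroundEvent` are hypothetical names used only for illustration; real events would also carry payloads such as offsets or partitions.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event kinds; real events would also carry payloads (offsets, partitions, errors).
enum ApplicationEvent { COMMIT, FETCH_COMMITTED_OFFSETS, CLOSE }
enum BackgroundEvent { REVOKE_PARTITIONS, ASSIGN_PARTITIONS, ERROR }

// Two one-directional FIFO channels between the polling thread and the background thread.
final class EventChannels {
    // Polling thread -> background thread.
    private final BlockingQueue<ApplicationEvent> toBackground = new LinkedBlockingQueue<>();
    // Background thread -> polling thread.
    private final BlockingQueue<BackgroundEvent> toPollingThread = new LinkedBlockingQueue<>();

    // Polling thread: enqueue without blocking (the queue is unbounded).
    void sendToBackground(ApplicationEvent event) {
        toBackground.add(event);
    }

    // Background thread: called once per loop iteration; empty when nothing is pending.
    Optional<ApplicationEvent> pollApplicationEvent() {
        return Optional.ofNullable(toBackground.poll());
    }

    // Background thread: report something the polling thread must handle.
    void sendToPollingThread(BackgroundEvent event) {
        toPollingThread.add(event);
    }

    // Polling thread: checked during consumer.poll().
    Optional<BackgroundEvent> pollBackgroundEvent() {
        return Optional.ofNullable(toPollingThread.poll());
    }
}
```

Each queue preserves insertion order, so one thread observes the other thread's events in the order they were sent.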

Definitions

Polling Thread

...

The active process in the background handles network communication, including coordinator communication and heartbeats. This article provides more implementation details.

...

A note on the subscriptionState: Its reference will be shared by both the polling and background threads.

...

In our implementation, the coordinator is discovered automatically when requested. We therefore use a state machine to represent the different states of the coordinator connection and to perform rediscovery after a disconnection, as long as the background thread has not terminated. The background thread can be in one of four states: down, initialized, discovering (coordinator discovery), and stable.
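The four states and the transitions described in the steps below can be captured in a small state machine. This is a sketch under stated assumptions: `CoordinatorConnectionState`, `FindCoordinatorResult`, and `CoordinatorStateMachine` are hypothetical names, and the transition from stable back to discovering on disconnection is an assumption based on the rediscovery behavior described above. The class is not thread-safe and is meant to be touched only by the background thread.

```java
// Hypothetical names for the four background-thread states.
enum CoordinatorConnectionState { DOWN, INITIALIZED, DISCOVERING, STABLE }

// Status of the in-flight FindCoordinator request as seen from the background loop.
enum FindCoordinatorResult { PENDING, FAILED, SUCCEEDED }

final class CoordinatorStateMachine {
    private CoordinatorConnectionState state = CoordinatorConnectionState.DOWN;

    CoordinatorConnectionState state() {
        return state;
    }

    // Step 2: the polling thread starts the background thread.
    void start() {
        if (state == CoordinatorConnectionState.DOWN) {
            state = CoordinatorConnectionState.INITIALIZED;
        }
    }

    // Step 3.2.1: an event needs the coordinator, but none is known yet.
    void onEventRequiringCoordinator() {
        if (state == CoordinatorConnectionState.INITIALIZED) {
            state = CoordinatorConnectionState.DISCOVERING;
        }
    }

    // Step 3.2.3: checked on each loop iteration while discovering.
    void onFindCoordinatorProgress(FindCoordinatorResult result) {
        if (state != CoordinatorConnectionState.DISCOVERING) {
            return;
        }
        switch (result) {
            case PENDING:
                break; // request not completed yet: stay in DISCOVERING
            case FAILED:
                state = CoordinatorConnectionState.INITIALIZED;
                break;
            case SUCCEEDED:
                state = CoordinatorConnectionState.STABLE;
                break;
        }
    }

    // Assumption: a lost connection triggers rediscovery while the thread is running.
    void onCoordinatorDisconnected() {
        if (state == CoordinatorConnectionState.STABLE) {
            state = CoordinatorConnectionState.DISCOVERING;
        }
    }
}
```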

...

  1. Upon initialization of the consumer, the coordinator is down

  2. The polling thread starts the background thread; the background thread moves to the initialized state

  3. The background thread loop:

    1. Poll for the new events (for example, a commit event) from the queue, if any

    2. Check the background thread state (connected to a coordinator or not)

      1. If the event requires a coordinator, move the coordinator to the discovery (coordinator discovery) state.

      2. If not, execute the event.

      3. Check the coordinator connection. If the FindCoordinator request has not completed yet, stay in the discovery state. If the request fails, transition to the initialized state. Otherwise, the coordinator has been found: transition to the stable state.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Go to 3

  7. If close() is received:

    1. Set the close flag to true so that the loop exits

    2. Poll the coordinator and network client

    3. Commit
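A minimal sketch of the loop above, assuming a `BooleanSupplier` stands in for the coordinator state machine and an empty method stands in for polling `ConsumerCoordinator` and `NetworkClient`. `LoopEvent` and `BackgroundLoop` are hypothetical. One further assumption: an event that needs the coordinator is parked until the coordinator is stable, and later events are parked behind it so that their order is preserved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical event: a name plus whether it needs the group coordinator (e.g. a commit).
final class LoopEvent {
    final String name;
    final boolean requiresCoordinator;

    LoopEvent(String name, boolean requiresCoordinator) {
        this.name = name;
        this.requiresCoordinator = requiresCoordinator;
    }
}

final class BackgroundLoop implements Runnable {
    private final BlockingQueue<LoopEvent> events = new LinkedBlockingQueue<>();
    // Events parked until the coordinator is stable; only touched by the background thread.
    private final Deque<LoopEvent> parked = new ArrayDeque<>();
    // Record of what the loop did, readable from other threads.
    private final List<String> actions = Collections.synchronizedList(new ArrayList<>());
    // Stub for the discovery state machine: true once the coordinator is stable.
    private final BooleanSupplier coordinatorStable;
    private volatile boolean closed = false;

    BackgroundLoop(BooleanSupplier coordinatorStable) {
        this.coordinatorStable = coordinatorStable;
    }

    // Polling thread API.
    void enqueue(LoopEvent event) { events.add(event); }
    void close() { closed = true; }
    List<String> actions() { return actions; }

    @Override
    public void run() {
        while (!closed && !Thread.currentThread().isInterrupted()) {
            runOnce(10);
        }
        // Step 7: the loop has exited; poll once more, then commit.
        pollCoordinatorAndNetwork();
        actions.add("commit");
    }

    // One iteration; the timed wait stands in for a blocking NetworkClient poll.
    void runOnce(long waitMs) {
        LoopEvent event;
        try {
            event = events.poll(waitMs, TimeUnit.MILLISECONDS); // step 3.1
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        if (event != null) {
            boolean mustWait = !parked.isEmpty()
                    || (event.requiresCoordinator && !coordinatorStable.getAsBoolean());
            if (mustWait) {
                parked.add(event); // step 3.2.1: wait for discovery (parking is an assumption)
            } else {
                actions.add("execute " + event.name); // step 3.2.2
            }
        }
        // Once the coordinator is stable, run parked events in arrival order.
        while (!parked.isEmpty() && coordinatorStable.getAsBoolean()) {
            actions.add("execute " + parked.poll().name);
        }
        pollCoordinatorAndNetwork(); // steps 4 and 5
    }

    private void pollCoordinatorAndNetwork() {
        // Placeholder for ConsumerCoordinator and NetworkClient polling.
    }
}
```

In a real client, the network poll is what bounds each iteration; here the timed `events.poll(...)` plays that role so the loop does not spin.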

Consumer Poll Changes

Currently, consumer.poll() does two things: it polls the coordinator (for assignment updates, auto-commit, and rebalances), and it fetches records.

...

  • Refactor the HeartbeatThread out of the existing implementation. We will therefore not start the heartbeat thread; instead, the heartbeat-sending mechanism will piggyback on the background thread.

  • Right now, the coordinator states are UNJOINED, PREPARING_REBALANCE, COMPLETING_REBALANCE, and STABLE. Because of the complexity of rebalance callback execution in the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add a “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add an “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • These “loop until done or timeout” mechanisms will piggyback on the background thread loop and use the state machine to advance their progress:

    • The background thread will automatically rediscover the coordinator if disconnected, using the state machine

    • We already call networkClient.poll() in the background thread loop

...

The existing coordinator implementation already has a state machine in place. We will add four states to facilitate callback execution.

  1. After onJoinPrepare, transition to the REVOKING_PARTITIONS state. This state does the following:

    1. Send an event to the polling thread to execute the partition revocation callback

    2. Wait for the partitions-revoked event, and then advance the state to PARTITIONS_REVOKED

  2. In onJoinComplete:

    1. We will first exit the COMPLETING_JOIN state

    2. Enter ASSIGNING_PARTITIONS and send a partition assignment event. Return.

    3. Wait for the polling thread to complete the partition assignment, then advance to PARTITIONS_ASSIGNED
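The callback handshake above can be sketched as follows, assuming the background thread hands callback requests to the polling thread through a queue, and the polling thread advances the state once the callback has run (for brevity, completion is a direct state update rather than a second event). `GroupState`, `CallbackEvent`, and `CallbackStateMachine` are hypothetical; partitions are plain strings, and the user's `onPartitionsRevoked` / `onPartitionsAssigned` callbacks are modeled as `Consumer<List<String>>`.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical subset of coordinator states; the four callback states come from the steps above.
enum GroupState { STABLE, REVOKING_PARTITIONS, PARTITIONS_REVOKED, ASSIGNING_PARTITIONS, PARTITIONS_ASSIGNED }

// Request for the polling thread to run one rebalance callback.
final class CallbackEvent {
    final boolean revoke;          // true: onPartitionsRevoked, false: onPartitionsAssigned
    final List<String> partitions; // e.g. "topic-0"

    CallbackEvent(boolean revoke, List<String> partitions) {
        this.revoke = revoke;
        this.partitions = partitions;
    }
}

final class CallbackStateMachine {
    private final ConcurrentLinkedQueue<CallbackEvent> toPollingThread = new ConcurrentLinkedQueue<>();
    private volatile GroupState state = GroupState.STABLE;

    GroupState state() {
        return state;
    }

    // Background thread, after onJoinPrepare: request revocation and keep looping.
    synchronized void afterJoinPrepare(List<String> ownedPartitions) {
        state = GroupState.REVOKING_PARTITIONS;
        toPollingThread.add(new CallbackEvent(true, ownedPartitions));
    }

    // Background thread, in onJoinComplete: request assignment, then return without waiting.
    synchronized void onJoinComplete(List<String> assignedPartitions) {
        state = GroupState.ASSIGNING_PARTITIONS;
        toPollingThread.add(new CallbackEvent(false, assignedPartitions));
    }

    // Polling thread, inside consumer.poll(): run pending callbacks, then report completion.
    void runPendingCallbacks(Consumer<List<String>> onRevoked, Consumer<List<String>> onAssigned) {
        CallbackEvent event;
        while ((event = toPollingThread.poll()) != null) {
            if (event.revoke) {
                onRevoked.accept(event.partitions);
                advance(GroupState.REVOKING_PARTITIONS, GroupState.PARTITIONS_REVOKED);
            } else {
                onAssigned.accept(event.partitions);
                advance(GroupState.ASSIGNING_PARTITIONS, GroupState.PARTITIONS_ASSIGNED);
            }
        }
    }

    // Only advance if nothing else changed the state in the meantime.
    private synchronized void advance(GroupState expected, GroupState next) {
        if (state == expected) {
            state = next;
        }
    }
}
```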

...

Wakeup() and WakeupException

TBD.  I’m not sure how WakeupException plays out in the new design.

...

Coordinator discovery timeout: this currently uses the user-provided timeout passed to consumer.poll(). We could instead use request.timeout.ms and re-attempt discovery in the next loop iteration if it fails.
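If discovery used `request.timeout.ms`, the background loop could track the in-flight FindCoordinator request with a deadline and start a fresh attempt on a later iteration once the old one expires. A minimal sketch; `CoordinatorDiscoveryTimer` and its methods are hypothetical, and the current time is passed in explicitly to keep it testable.

```java
// Hypothetical tracker for a single in-flight FindCoordinator request.
final class CoordinatorDiscoveryTimer {
    private final long requestTimeoutMs; // would come from request.timeout.ms
    private boolean inFlight = false;
    private long deadlineMs;

    CoordinatorDiscoveryTimer(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Returns true if a new FindCoordinator request should be sent now.
    boolean maybeStartAttempt(long nowMs) {
        if (inFlight) {
            return false;
        }
        inFlight = true;
        deadlineMs = nowMs + requestTimeoutMs;
        return true;
    }

    // Checked on every background loop iteration. An expired attempt is abandoned,
    // so the next iteration can start a fresh one instead of blocking poll().
    boolean expireIfTimedOut(long nowMs) {
        if (inFlight && nowMs >= deadlineMs) {
            inFlight = false;
            return true;
        }
        return false;
    }

    // A response (success or error) ends the current attempt.
    void onResponse() {
        inFlight = false;
    }
}
```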

...

We need to make sure that (1) coordinator discovery and (2) the joinGroup operation are performed at the correct time.

We will also ensure that the heartbeat interval and the poll interval are respected.

We also need to ensure that no events are lost and that events are processed in the correct sequence.
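One way the background loop could respect both intervals is to check two timers on each iteration. This sketch assumes the intervals correspond to the existing `heartbeat.interval.ms` and `max.poll.interval.ms` consumer configs; `IntervalTracker` is a hypothetical name, and what the loop does once the poll interval is exceeded is left out.

```java
// Hypothetical timers checked by the background loop.
final class IntervalTracker {
    private final long heartbeatIntervalMs; // heartbeat.interval.ms
    private final long maxPollIntervalMs;   // max.poll.interval.ms
    private long lastHeartbeatMs;           // background thread only
    private volatile long lastPollMs;       // written by the polling thread

    IntervalTracker(long heartbeatIntervalMs, long maxPollIntervalMs, long nowMs) {
        this.heartbeatIntervalMs = heartbeatIntervalMs;
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastHeartbeatMs = nowMs;
        this.lastPollMs = nowMs;
    }

    // Polling thread: record each consumer.poll() call.
    void recordPoll(long nowMs) { lastPollMs = nowMs; }

    // Background thread: record that a heartbeat request was sent.
    void recordHeartbeatSent(long nowMs) { lastHeartbeatMs = nowMs; }

    boolean heartbeatDue(long nowMs) {
        return nowMs - lastHeartbeatMs >= heartbeatIntervalMs;
    }

    // True once the application has gone longer than max.poll.interval.ms without polling.
    boolean pollIntervalExceeded(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    // Upper bound on how long the loop may block before the next heartbeat is due.
    long timeToNextHeartbeatMs(long nowMs) {
        return Math.max(0, heartbeatIntervalMs - (nowMs - lastHeartbeatMs));
    }
}
```

Using `timeToNextHeartbeatMs` as the upper bound for the network poll timeout would keep the loop from sleeping past a heartbeat deadline.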

...

Fetcher

  • Should we consider a larger or configurable prefetch buffer? Ideally, this parameter could be tuned to improve performance.
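A configurable prefetch buffer could be a bounded queue of fetched batches: the background thread only issues another fetch while there is room, and the polling thread drains it. A sketch only; `PrefetchBuffer` is hypothetical, and its capacity stands for the hypothetical tunable parameter discussed above.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical bounded buffer of fetched batches; the capacity is the tunable parameter.
final class PrefetchBuffer<T> {
    private final BlockingQueue<List<T>> batches;

    PrefetchBuffer(int maxBufferedBatches) {
        this.batches = new ArrayBlockingQueue<>(maxBufferedBatches);
    }

    // Background thread: only send another fetch request while there is room.
    boolean hasCapacity() {
        return batches.remainingCapacity() > 0;
    }

    // Background thread: store a completed fetch; returns false without storing it when full.
    boolean offer(List<T> batch) {
        return batches.offer(batch);
    }

    // Polling thread: take the oldest buffered batch, or an empty list if none is ready.
    List<T> nextBatch() {
        List<T> batch = batches.poll();
        return batch == null ? Collections.<T>emptyList() : batch;
    }
}
```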

SubscriptionState

  • Shared: for implementation simplicity, the subscriptionState will be shared between the background and polling threads

  • Non-shared: we can make a copy of the subscriptionState in the background thread, then (1) update that copy and (2) check it for consistency during the poll
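For the non-shared option, one possibility is for the background thread to own its copy and publish immutable, versioned snapshots that the polling thread compares during poll(). This sketch assumes the relevant state is just the set of assigned partitions (as strings); `AssignmentSnapshot` and `SubscriptionMirror` are hypothetical and are not the existing `SubscriptionState` API.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Immutable, versioned view of the assignment that the polling thread reads.
final class AssignmentSnapshot {
    final long version;
    final Set<String> partitions;

    AssignmentSnapshot(long version, Set<String> partitions) {
        this.version = version;
        this.partitions = Collections.unmodifiableSet(new HashSet<>(partitions));
    }
}

final class SubscriptionMirror {
    // Mutable copy owned by the background thread only.
    private final Set<String> backgroundCopy = new HashSet<>();
    private long version = 0;
    // Latest snapshot, safely published to the polling thread.
    private final AtomicReference<AssignmentSnapshot> published =
            new AtomicReference<>(new AssignmentSnapshot(0, Collections.emptySet()));

    // Background thread: (1) update the private copy, then publish a new snapshot.
    void updateAssignment(Set<String> partitions) {
        backgroundCopy.clear();
        backgroundCopy.addAll(partitions);
        version++;
        published.set(new AssignmentSnapshot(version, backgroundCopy));
    }

    // Polling thread: read the latest snapshot.
    AssignmentSnapshot latest() {
        return published.get();
    }

    // Polling thread: (2) during poll(), detect whether a cached snapshot is stale.
    boolean isStale(AssignmentSnapshot cached) {
        return cached.version != published.get().version;
    }
}
```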

2 Channel Implementation

  • We only use the second queue (ConsumerChannel) for two purposes, listed below, so for simplicity we could instead hold a reference to each and check them during the poll:

    • Partition revocation

    • Error
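The single-channel alternative might look like this: the background thread sets two atomic references, and the polling thread checks and clears them at the start of each poll(). `PendingSignals` is hypothetical; merging repeated revocation requests and surfacing an error before running the revocation callback are assumptions of the sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical replacement for the second queue: one slot per purpose.
final class PendingSignals {
    // Partitions the background thread wants revoked; null when nothing is pending.
    private final AtomicReference<Set<String>> pendingRevocation = new AtomicReference<>();
    // First error raised in the background thread; null when none.
    private final AtomicReference<RuntimeException> pendingError = new AtomicReference<>();

    // Background thread: merge with any revocation the polling thread has not handled yet.
    void requestRevocation(Set<String> partitions) {
        pendingRevocation.accumulateAndGet(partitions, (current, added) -> {
            Set<String> merged = new HashSet<>(added);
            if (current != null) {
                merged.addAll(current);
            }
            return merged;
        });
    }

    // Background thread: keep only the first error until the polling thread sees it.
    void reportError(RuntimeException error) {
        pendingError.compareAndSet(null, error);
    }

    // Polling thread: called at the start of each poll(); clears what it consumes.
    void checkDuringPoll(Consumer<Set<String>> onRevoke) {
        RuntimeException error = pendingError.getAndSet(null);
        if (error != null) {
            throw error;
        }
        Set<String> revoked = pendingRevocation.getAndSet(null);
        if (revoked != null) {
            onRevoke.accept(revoked);
        }
    }
}
```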

Others

  • Should poll() internally return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()? Each take() would perform an action similar to poll().
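A lazy stream could buffer the result of one poll-like fetch and hand records out in chunks, fetching again only when the buffer is empty. A sketch only: `RecordStream` is a hypothetical stand-in for the proposed `ConsumerRecordStream`, and the `Supplier` represents one round of the work `poll()` does today.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical lazy stream over records; nothing is fetched until take() is called.
final class RecordStream<T> {
    private final Supplier<List<T>> fetchOnce; // one round of poll()-like work
    private final Deque<T> buffered = new ArrayDeque<>();

    RecordStream(Supplier<List<T>> fetchOnce) {
        this.fetchOnce = fetchOnce;
    }

    // Returns up to maxRecords records, fetching only when the buffer is empty.
    List<T> take(int maxRecords) {
        if (buffered.isEmpty()) {
            buffered.addAll(fetchOnce.get());
        }
        List<T> out = new ArrayList<>();
        while (out.size() < maxRecords && !buffered.isEmpty()) {
            out.add(buffered.poll());
        }
        return out;
    }
}
```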

...