...

  • Update Java Docs

  • API signatures will remain exactly the same

  • Exceptions returned by the API may differ slightly. See the wakeupException() section.

...

This document highlights our effort to refactor the threading model of the KafkaConsumer. The purpose of this refactor is to address issues and shortcomings we've encountered over the past years, such as increasing code complexity from hotfixes and patches, lock and race conditions caused by both the heartbeat thread and the polling thread making coordinator requests, and the coupling between the client poll and the rebalance progress.

  • Code complexity: the current rebalance protocol is quite heavy on the client side. Over the years, patches and hotfixes have grown increasingly complicated and have impacted the readability of the code. For example, coordinator communication can happen at different places, which makes it challenging to understand the code path. There are also lots of small patches to handle bugs and edge cases that might require more fixes further down the line.

  • Complex Coordinator Communication: Because the polling thread and the heartbeat thread both talk to the coordinator, code complexity increases and concurrency issues arise.

  • Asynchronous Background Thread: One of our goals here is to make the rebalance process happen asynchronously to the poll. Firstly, it simplifies the polling thread design, as all of the coordinator communication will be moved to the background. Secondly, it prevents the consumer poll from blocking other consumer operations, such as network requests.

...

We will be moving network communication, including heartbeat, to the background thread, which allows the consumer to operate asynchronously. In particular, the scope of this refactor is as follows:

  1. Deprecate HeartbeatThread

  2. Refactor the Coordinator classes, i.e., AbstractCoordinator and ConsumerCoordinator, to adapt to this asynchronous design.

  3. Refactor the KafkaConsumer API internals to adapt to this asynchronous design.

  4. Introduce events and communication channels to facilitate the communication between the polling thread and the background thread.

  5. Address issues in these Jira tickets

Definitions

Polling Thread

The main client thread, also known as the polling thread, is where the user invokes the poll() operation.

Background Thread

The thread that runs in the background and handles network communication, including coordinator communication and heartbeat. This article provides more implementation details.

Heartbeat Thread

The background running thread in the current implementation (see KIP-62).

Event

The primary communication medium through which the polling thread interacts with the background thread.

...

  1. Polling thread, which handles all of the API requests and callback execution.

  2. Background thread, which handles network communication such as heartbeat and coordinator requests, and the rebalance flow.

  3. Communication channels, which are responsible for:

    1. sending events to the background thread

    2. sending events to the polling thread

A note on the subscriptionState: it is the only exception in this design. Its reference will be shared by both the polling and background threads.
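The two communication channels can be pictured as a pair of one-way, thread-safe queues. The following is a minimal sketch under that assumption, not the actual implementation; the class name EventChannels, the EventType values, and all method names are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: two one-way queues between the polling thread and the background thread.
final class EventChannels {
    // Illustrative event kinds only; the real design may define different events.
    enum EventType { COMMIT, ERROR, CALLBACK }

    private final BlockingQueue<EventType> toBackground = new LinkedBlockingQueue<>();
    private final BlockingQueue<EventType> toPolling = new LinkedBlockingQueue<>();

    // Polling thread -> background thread (for example, a commit request).
    void sendToBackground(EventType event) { toBackground.add(event); }

    // Background thread -> polling thread (for example, an error or a callback request).
    void sendToPolling(EventType event) { toPolling.add(event); }

    // Non-blocking reads; each returns null when its queue is empty.
    EventType nextForBackground() { return toBackground.poll(); }
    EventType nextForPolling() { return toPolling.poll(); }
}
```

Keeping each queue one-directional means neither thread needs a reference to the other's internal state, which fits the note that subscriptionState is the only shared reference.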

Important Components

Background thread and its lifecycle

The coordinator is automatically discovered (when requested) in our implementation. Therefore, we use a state machine to represent the different states of the coordinator connection and to perform rediscovery in case of disconnection, as long as the background thread isn't terminated. Here, the background thread will be in one of four states: down, initialized, discovering (coordinator discovery), and stable. The background thread's lifecycle can be represented this way.

  1. Upon initialization of the consumer, the coordinator is down

  2. The polling thread starts the background thread; the background thread moves to the initialized state

  3. The background thread loop:

    1. Poll for the new events (for example, a commit event) from the queue, if any

    2. Check the background thread state (connected to a coordinator or not)

      1. If the event requires a coordinator, move the coordinator to the discovery (coordinator discovery) state.

      2. If not, execute the event.

      3. Check the coordinator connection. If the FindCoordinator request hasn't been completed, stay in the discovery state. If the request fails, transition to the initialized state. Otherwise, the coordinator is found; transition to the stable state.

  4. Poll ConsumerCoordinator

  5. Poll networkClient

  6. Go to 3

  7. If close() is received:

    1. Set close to true so that the loop can exit

    2. Poll the coordinator and the network client

    3. Commit
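The state transitions in the lifecycle above can be sketched as a small state machine. This is only an illustration: the class CoordinatorConnection, its method names, and the Discovery enum are hypothetical, and the choice to fall back to the initialized state on disconnection (so that the next coordinator-requiring event triggers rediscovery) is an assumption rather than something the design states.

```java
// Hypothetical sketch of the coordinator-connection state machine driven by the background loop.
final class CoordinatorConnection {
    enum State { DOWN, INITIALIZED, DISCOVERING, STABLE }

    // Outcome of the in-flight FindCoordinator request as seen in one loop iteration.
    enum Discovery { PENDING, FAILED, FOUND }

    private State state = State.DOWN;

    State state() { return state; }

    // The polling thread starts the background thread: down -> initialized.
    void start() {
        if (state == State.DOWN) state = State.INITIALIZED;
    }

    // An event that requires a coordinator moves the connection into discovery.
    void onEvent(boolean requiresCoordinator) {
        if (requiresCoordinator && state == State.INITIALIZED) state = State.DISCOVERING;
    }

    // Check the coordinator connection while in the discovery state.
    void onDiscoveryResult(Discovery result) {
        if (state != State.DISCOVERING) return;
        switch (result) {
            case FAILED: state = State.INITIALIZED; break; // retry on a later iteration
            case FOUND:  state = State.STABLE;      break;
            default:     break;                            // PENDING: stay in discovery
        }
    }

    // Assumption: a disconnect drops back to initialized so discovery can run again.
    void onDisconnect() {
        if (state == State.STABLE) state = State.INITIALIZED;
    }
}
```

Each loop iteration would call onEvent for the event it dequeued and onDiscoveryResult after polling the network client, so the state only ever advances from inside the background thread.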

Consumer Poll Changes

Currently, consumer.poll() does two things: polling the coordinator (for assignment updates, auto-commit, and possibly rebalances) and fetching.

Moving forward, the consumer will only be responsible for sending fetch requests, returning the fetch data, and polling the channel for events such as rebalancing and errors:

  1. Poll the ConsumerQueue for events

    1. Handle exceptions

    2. Handle callback execution

  2. Send out fetch requests

  3. Return fetch results
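The new poll flow might look roughly like the sketch below. It does not use the real KafkaConsumer types: PollSketch, BackgroundEvent, consumerQueue, and completedFetches are hypothetical stand-ins, records are plain strings, and sending the fetch requests is left as a comment because it would go through the background thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the simplified poll(): drain events, then return fetched data.
final class PollSketch {
    // An event from the background thread: either an error to surface or a callback to run.
    static final class BackgroundEvent {
        final RuntimeException error;
        final Runnable callback;

        private BackgroundEvent(RuntimeException error, Runnable callback) {
            this.error = error;
            this.callback = callback;
        }

        static BackgroundEvent ofError(RuntimeException e) { return new BackgroundEvent(e, null); }
        static BackgroundEvent ofCallback(Runnable r) { return new BackgroundEvent(null, r); }
    }

    final Queue<BackgroundEvent> consumerQueue = new ConcurrentLinkedQueue<>();
    final Queue<String> completedFetches = new ConcurrentLinkedQueue<>(); // stand-in for fetched records

    List<String> poll() {
        // 1. Poll the queue for events.
        BackgroundEvent event;
        while ((event = consumerQueue.poll()) != null) {
            if (event.error != null) throw event.error; // 1.1 handle exceptions
            event.callback.run();                       // 1.2 handle callback execution
        }
        // 2. Send out fetch requests (omitted: would be handed to the background thread).
        // 3. Return whatever fetch results are already available.
        List<String> records = new ArrayList<>();
        String record;
        while ((record = completedFetches.poll()) != null) records.add(record);
        return records;
    }
}
```

Because callbacks run inside poll(), they still execute on the polling thread, which matches the division of responsibilities described earlier.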

...

  • Refactor the HeartbeatThread out of the existing implementation. Therefore, we won't be starting the heartbeat thread; instead, the heartbeat mechanism will piggyback onto the background thread.

  • Right now, the coordinator states are Unjoin, Prepare_Rebalancing, Complete_Rebalancing, and Stable. Because of the complexity of rebalance callback execution with the background thread in the new implementation, we will:

    • Add new states for partition assignment and revocation:

      • Add “REVOKE_PARTITION” state to handle onPartitionsRevoked

      • Add “ASSIGN_PARTITION” state to handle onPartitionsAssigned

  • These "loop until done or timeout" mechanisms will piggyback on the background thread loop and take advantage of the state machine to advance the progress:

    • The background thread will automatically rediscover the coordinator if disconnected, using the state machine

    • We are already doing networkClient.poll() in the background thread loop
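One way to piggyback the heartbeat on the background loop is a simple interval check made on every iteration. The sketch below assumes that approach; HeartbeatTimer and shouldSend are hypothetical names, and the interval would presumably come from heartbeat.interval.ms.

```java
// Hypothetical sketch: decide on each background loop iteration whether a heartbeat is due.
final class HeartbeatTimer {
    private final long intervalMs;
    private long lastSentMs;

    HeartbeatTimer(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.lastSentMs = nowMs;
    }

    // Returns true (and resets the timer) when at least intervalMs has elapsed since the last heartbeat.
    boolean shouldSend(long nowMs) {
        if (nowMs - lastSentMs < intervalMs) return false;
        lastSentMs = nowMs;
        return true;
    }
}
```

The loop would call shouldSend(System.currentTimeMillis()) once per iteration and enqueue a heartbeat request before polling the network client, so no dedicated heartbeat thread is needed.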

Rebalance States

The existing coordinator implementation already has a state machine in place. We will add four states to facilitate the callback execution, and modify the existing states.

  1. After onJoinPrepare, transition to the REVOKING_PARTITIONS state. It will do a few things:

    1. Send an event to the polling thread to execute the event

    2. Wait for the partition revoked event, and advance the state to PARTITIONS_REVOKED

  2. In the onJoinComplete:

    1. We will first exit the COMPLETING_JOIN state

    2. Enter ASSIGNING_PARTITIONS, and send a partition assignment event. Return.

    3. Wait for partition assignment completion from the polling thread. Advance to PARTITIONS_ASSIGNED
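The transitions described above can be written down as a table of allowed moves. This is a sketch, not the coordinator's actual state machine: the class RebalanceStates and the method canTransition are hypothetical, only the states named in this section are included, and the PARTITIONS_REVOKED to COMPLETING_JOIN edge is an assumption about what happens between the two steps.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: allowed transitions among the rebalance states named in this section.
final class RebalanceStates {
    enum State {
        REVOKING_PARTITIONS, PARTITIONS_REVOKED, COMPLETING_JOIN,
        ASSIGNING_PARTITIONS, PARTITIONS_ASSIGNED
    }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // Revocation callback finished on the polling thread.
        ALLOWED.put(State.REVOKING_PARTITIONS, EnumSet.of(State.PARTITIONS_REVOKED));
        // Assumption: the join proceeds once partitions are revoked.
        ALLOWED.put(State.PARTITIONS_REVOKED, EnumSet.of(State.COMPLETING_JOIN));
        // onJoinComplete: exit COMPLETING_JOIN and send the partition assignment event.
        ALLOWED.put(State.COMPLETING_JOIN, EnumSet.of(State.ASSIGNING_PARTITIONS));
        // Assignment callback finished on the polling thread.
        ALLOWED.put(State.ASSIGNING_PARTITIONS, EnumSet.of(State.PARTITIONS_ASSIGNED));
        ALLOWED.put(State.PARTITIONS_ASSIGNED, EnumSet.noneOf(State.class));
    }

    static boolean canTransition(State from, State to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

A table like this makes it easy to reject out-of-order events, for example an assignment-complete event arriving while the coordinator is still in REVOKING_PARTITIONS.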

...

TBD. I’m not sure how WakeupException plays out in the new design.

Timeout Policy

Consumer.poll(): user-provided timeout

...

Coordinator discovery timeout: currently uses the user-provided timeout in consumer.poll(). Maybe we should just use request.timeout.ms and re-attempt in the next loop if discovery fails.

CommitOffsetSync: user-provided timeout

Is there a better way to configure session interval and heartbeat interval?

...

We will also ensure the heartbeat interval and poll interval are respected.

We also need to make sure that no event is lost and that events happen in the correct sequence.

...

  • Should poll(), internally, return a lazy list/stream, ConsumerRecordStream, so that the API would just execute stream.take()? Each take() would perform a similar action to poll().

User Adoption

The refactor should have (almost) no regressions or breaking changes upon merge, so users should be able to continue using the new client without changes.