...

// Channel used to send events to the background thread

private BlockingQueue<RequestEvent> queue;

public abstract class RequestEvent {
   private final RequestEventType eventType;
}

enum RequestEventType {
   FETCH,
   COMMIT,
   METADATA_UPDATE,
   CLOSE,
   ...
}

ResponseEventQueue
// Channel used to send events to the polling thread for client side execution/notification

private BlockingQueue<ResponseEvent> queue;

public abstract class ResponseEvent {
   private final ResponseEventType eventType;
}

enum ResponseEventType {
   ERROR,
   REVOKE_PARTITIONS,
   ASSIGN_PARTITIONS,
   FETCH_RESPONSE,
}
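As a rough illustration of how the two queues could connect the polling thread and the background thread, here is a minimal sketch. It assumes the events are represented by their type alone and uses only the enum constants named above (the elided ones are left out). `EventChannelSketch`, `send`, `poll`, and `runBackground` are hypothetical names, not part of the proposal.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Only the constants named in the text; the elided ones are left out.
enum RequestEventType { FETCH, COMMIT, METADATA_UPDATE, CLOSE }
enum ResponseEventType { ERROR, REVOKE_PARTITIONS, ASSIGN_PARTITIONS, FETCH_RESPONSE }

// Hypothetical wrapper around the two queues; events are reduced to their type.
final class EventChannelSketch {
    private final BlockingQueue<RequestEventType> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<ResponseEventType> responseQueue = new LinkedBlockingQueue<>();

    // Polling thread -> background thread.
    void send(RequestEventType event) {
        requestQueue.add(event);
    }

    // Background thread -> polling thread; returns null if nothing arrives in time.
    ResponseEventType poll(long timeoutMs) throws InterruptedException {
        return responseQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Background loop: answers each FETCH with a FETCH_RESPONSE and stops on CLOSE.
    void runBackground() {
        try {
            while (true) {
                RequestEventType event = requestQueue.take();
                if (event == RequestEventType.CLOSE) {
                    return;
                }
                if (event == RequestEventType.FETCH) {
                    responseQueue.add(ResponseEventType.FETCH_RESPONSE);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Sends one FETCH, waits for the reply, then shuts the background thread down.
    static ResponseEventType demo() {
        EventChannelSketch channel = new EventChannelSketch();
        Thread background = new Thread(channel::runBackground, "background-sketch");
        background.start();
        try {
            channel.send(RequestEventType.FETCH);
            ResponseEventType response = channel.poll(5000);
            channel.send(RequestEventType.CLOSE);
            background.join();
            return response;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Unbounded `LinkedBlockingQueue` instances are used here only to keep the sketch short; whether the real queues should be bounded is a separate design question.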

...

Major Changes

Consumer Poll Changes

Going forward, consumer.poll() will mainly be responsible for the following tasks:

  1. Poll EventHandler, process the events
  2. Send fetches to the background thread
  3. Return fetched data

Currently, consumer.poll() does two things: it polls the coordinator for assignment updates, auto-commits, and rebalances, and it fetches data.

Moving forward, the consumer will only be responsible for sending fetch requests, returning the fetched data, and polling the channel for events such as rebalances and errors:

  1. Check the wakeup flag. If a wakeup call was invoked, throw a WakeupException and interrupt the loop.
  2. Poll the ConsumerQueue for events
    1. Handle exceptions
    2. Handle callback execution
  3. Send out fetch requests
  4. Return fetch results

consumer.poll() runs a loop polling for events from the event handler. Here are the important events:

  1. Fetch Data: Return immediately
  2. Callback Invocation: Invoke the callback and subsequently send an acknowledgment event to the background thread to advance the state machine (see background thread section).
  3. Error: Handle the error upon receiving it.
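The three event kinds above could be dispatched roughly as follows. This is a sketch under stated assumptions: `PollLoopSketch`, `Event`, `Kind`, and the `"CALLBACK_ACK"` string are hypothetical stand-ins, records are plain strings, and "handling" an error is taken to mean rethrowing it to the caller of poll, which the text does not specify.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollLoopSketch {
    enum Kind { FETCH_DATA, CALLBACK, ERROR }

    // Hypothetical event shape: exactly one of the three payload fields is set.
    static final class Event {
        final Kind kind;
        final List<String> records;
        final Runnable callback;
        final RuntimeException error;

        private Event(Kind kind, List<String> records, Runnable callback, RuntimeException error) {
            this.kind = kind;
            this.records = records;
            this.callback = callback;
            this.error = error;
        }

        static Event data(List<String> records) { return new Event(Kind.FETCH_DATA, records, null, null); }
        static Event callback(Runnable callback) { return new Event(Kind.CALLBACK, null, callback, null); }
        static Event error(RuntimeException error) { return new Event(Kind.ERROR, null, null, error); }
    }

    // Events arriving from the background thread.
    final BlockingQueue<Event> fromBackground = new LinkedBlockingQueue<>();
    // Stand-in for acknowledgment events sent back to the background thread.
    final List<String> acksToBackground = new ArrayList<>();

    // Loops until fetch data arrives, an error is raised, or the timeout expires.
    List<String> poll(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                Event event = remaining > 0
                        ? fromBackground.poll(remaining, TimeUnit.NANOSECONDS)
                        : null;
                if (event == null) {
                    return Collections.emptyList(); // timed out with no data
                }
                switch (event.kind) {
                    case FETCH_DATA:
                        return event.records;       // return immediately
                    case CALLBACK:
                        event.callback.run();       // invoke, then acknowledge
                        acksToBackground.add("CALLBACK_ACK");
                        break;
                    case ERROR:
                        throw event.error;          // assumption: surface to the caller
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while polling", e);
        }
    }

    public static void main(String[] args) {
        PollLoopSketch consumer = new PollLoopSketch();
        consumer.fromBackground.add(Event.callback(() -> System.out.println("callback invoked")));
        consumer.fromBackground.add(Event.data(Collections.singletonList("record-0")));
        System.out.println(consumer.poll(1000));
    }
}
```

Because the callback runs on the thread that calls poll, user code never executes on the background thread in this sketch.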

ConsumerCoordinator and AbstractCoordinator

...

  1. Poll the EventHandler, and execute the events accordingly
  2. Prepare a FETCH event, and send it to the background thread.
  3. Wait for the CompletableFuture to finish
  4. Execute interceptor.onConsume and return the data.
  5. The background thread executes the FETCH event:
    1. Autocommits // I think we can move the autocommit to the background thread
    2. Issue fetch requests
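The hand-off in steps 2 to 5 could look roughly like the sketch below. It assumes the FETCH event carries a `CompletableFuture` that the background thread completes. `FetchEventSketch`, `FetchEvent`, `serveOneFetch`, and `intercept` are hypothetical names, and the autocommit and network fetch are replaced by canned string records.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

final class FetchEventSketch {
    // Hypothetical FETCH event carrying the future the background thread completes.
    static final class FetchEvent {
        final CompletableFuture<List<String>> result = new CompletableFuture<>();
    }

    private final BlockingQueue<FetchEvent> toBackground = new LinkedBlockingQueue<>();

    // Background thread (step 5): take one FETCH event and complete its future.
    // A real implementation would autocommit and issue fetch requests here.
    void serveOneFetch() {
        try {
            FetchEvent event = toBackground.take();
            event.result.complete(Arrays.asList("record-0", "record-1"));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Polling thread (steps 2 to 4).
    List<String> poll() {
        FetchEvent event = new FetchEvent();
        toBackground.add(event);                     // step 2: send the FETCH event
        List<String> records = event.result.join();  // step 3: wait for the future
        return intercept(records);                   // step 4: interceptor, then return
    }

    // Stand-in for the interceptor call; returns the records unchanged.
    List<String> intercept(List<String> records) {
        return records;
    }

    // Runs one poll against a background thread that serves a single fetch.
    static List<String> demo() {
        FetchEventSketch consumer = new FetchEventSketch();
        Thread background = new Thread(consumer::serveOneFetch, "background-sketch");
        background.setDaemon(true);
        background.start();
        return consumer.poll();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

`join()` blocks without a timeout in this sketch; a real poll would presumably bound the wait by the timeout passed to consumer.poll().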

...