Connector.onStartup:

Input: the socket channel to the current coordinator

while (isRunning):

  1. Block on a read from the channel.

  2. Handle the request read from the coordinator.

  3. If no request is received within the session timeout, or if the connection is closed or has failed, trigger the consumer's lost-connection procedure.
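
The loop above could be sketched in Java roughly as follows. This is only an illustration under stated assumptions: `CoordinatorChannel`, `ConnectorLoop`, `handle`, and `onConnectionLost` are hypothetical names that do not come from the design, requests are modelled as plain strings, and the lost-connection procedure is reduced to setting a flag and stopping the loop.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical abstraction over the socket channel to the coordinator.
interface CoordinatorChannel {
    // Blocks until a request arrives. Assumed to throw SocketTimeoutException
    // (a subclass of IOException) when the session timeout elapses, and
    // IOException when the connection is closed or has failed.
    String readRequest(int sessionTimeoutMs) throws IOException;
}

class ConnectorLoop {
    private volatile boolean isRunning = true;
    final List<String> handled = new ArrayList<>();
    boolean connectionLost = false;

    void onStartup(CoordinatorChannel channel, int sessionTimeoutMs) {
        while (isRunning) {
            try {
                // Step 1: block on a read from the channel.
                String request = channel.readRequest(sessionTimeoutMs);
                // Step 2: handle the request.
                handle(request);
            } catch (IOException e) {
                // Step 3: timeout, closed connection, or failure.
                onConnectionLost();
            }
        }
    }

    // Placeholder handler: only records the request it was given.
    void handle(String request) {
        handled.add(request);
    }

    // Placeholder lost-connection procedure: flags the loss and ends the loop.
    void onConnectionLost() {
        connectionLost = true;
        isRunning = false;
    }
}
```

Catching `IOException` covers all three failure cases in one place, because a read timeout on a Java socket surfaces as `SocketTimeoutException`, which is itself an `IOException`.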


Connector.handlePingRequest:

1. Send the consumed offsets of the partitions currently being consumed back to the coordinator as the response.
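
As a rough illustration of the ping handler, the sketch below returns a snapshot of the consumed offsets. It assumes the consumer tracks offsets in a map keyed by a "topic-partition" string; the class `PingHandler`, the field `consumedOffsets`, and the use of a map as the response are all hypothetical, and the wire format is left out.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

class PingHandler {
    // Hypothetical registry: "topic-partition" -> last consumed offset.
    // Concurrent because fetcher threads are assumed to update it.
    final Map<String, Long> consumedOffsets = new ConcurrentHashMap<>();

    // Builds the ping response: a copy of the current offsets, sorted by
    // partition name, so later offset updates do not alter the response.
    Map<String, Long> handlePingRequest() {
        return new TreeMap<>(consumedOffsets);
    }
}
```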


Connector.handleStopFetcherRequest:

1. Stop the consumer's fetchers and clear the corresponding queues.

2. Send the consumed offsets of the consumed partitions back to the coordinator as the response.
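
The two steps above might look like the following sketch. The names `StoppableFetcher`, `StopFetcherHandler`, `fetchers`, `queues`, and `consumedOffsets` are assumptions made for illustration, as is the choice of one in-memory queue per partition; a real fetcher would also own a thread and a connection to a broker.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

// Hypothetical fetcher stand-in: only tracks whether it is running.
class StoppableFetcher {
    volatile boolean running = true;

    void shutdown() {
        running = false;
    }
}

class StopFetcherHandler {
    final List<StoppableFetcher> fetchers = new ArrayList<>();
    // Hypothetical per-partition queues of fetched but unconsumed messages.
    final Map<String, BlockingQueue<byte[]>> queues = new HashMap<>();
    final Map<String, Long> consumedOffsets = new HashMap<>();

    Map<String, Long> handleStopFetcherRequest() {
        // Step 1: stop every fetcher, then drop any buffered messages.
        for (StoppableFetcher f : fetchers) {
            f.shutdown();
        }
        fetchers.clear();
        for (BlockingQueue<byte[]> q : queues.values()) {
            q.clear();
        }
        // Step 2: respond with a copy of the consumed offsets.
        return new HashMap<>(consumedOffsets);
    }
}
```

Stopping the fetchers before clearing the queues matters in this sketch: a fetcher that is still running could otherwise refill a queue after it has been cleared.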


Connector.handleStartFetcherRequest:

1. Read the assigned partition info from the request, and refresh the consumed partition info.

2. Start fetchers with the new consumed partition info.

3. Send the response back to the coordinator.
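
A minimal sketch of the start-fetcher handler follows. It assumes the request carries a map from partition name to the offset at which consumption should start, and that the response is a simple acknowledgement string; neither detail is specified above, and `PartitionFetcher`, `StartFetcherHandler`, and `"ACK"` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical fetcher stand-in for a single partition.
class PartitionFetcher {
    final String partition;
    final long startOffset;
    boolean started = false;

    PartitionFetcher(String partition, long startOffset) {
        this.partition = partition;
        this.startOffset = startOffset;
    }

    void start() {
        started = true;
    }
}

class StartFetcherHandler {
    final Map<String, Long> consumedPartitions = new LinkedHashMap<>();
    final List<PartitionFetcher> fetchers = new ArrayList<>();

    String handleStartFetcherRequest(Map<String, Long> assignedPartitions) {
        // Step 1: replace the old partition info with the new assignment.
        consumedPartitions.clear();
        consumedPartitions.putAll(assignedPartitions);
        // Step 2: start one fetcher per newly assigned partition.
        fetchers.clear();
        for (Map.Entry<String, Long> e : consumedPartitions.entrySet()) {
            PartitionFetcher f = new PartitionFetcher(e.getKey(), e.getValue());
            f.start();
            fetchers.add(f);
        }
        // Step 3: acknowledge to the coordinator (placeholder response).
        return "ACK";
    }
}
```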