...

                             (record metadata)       -- (confirm) --> Consumer (registration succeeded; blocks reading for coordinator requests such as ping, stop-fetcher, start-fetcher)

Coordinator       (record metadata)       -- (confirm) --> Consumer (same as above)

...

                                     -- (ping) -->     Consumer (failed)

----------------------------------------------------------------------------------------------

...

                                     -- (stop-fetcher) -->     Consumer (alive)

----------------------------------------------------------------------------------------------

...

                                                                                       -- (start-fetcher) -->     Consumer (become failed)

...

Some utility functions are also added to Utils and ZkUtils.

11.6 kafka.

...

consumer

  • OffsetInfoSend: offset information format wrapper, match with for

...

  • HeartbeatResponse and StopFetcherResponse.

...

  • SimpleConsumerConnector:

...

CoordinatorRequestHandler: Phase I consumer's communication module with coordinator, which is based on ZkQueue. You do not need to review this class.

...

  • The simplified consumer client to replace the original ZookeeperConsumerConnector,

...

  • also inherited from the ConsumerConnector interface.

...

  • It differs from the ZookeeperConsumerConnector as follows:
    • Removes all functionality that depends on Zookeeper, including the ZkClient (hence the ZkClient is also removed from its Fetcher/FetcherRunnable classes)
    • Simplifies WildcardStreamsHandler: it is now only used to remember the template type of the message and no longer needs to connect to Zookeeper to keep track of topic changes.
    • Adds a ConsumerCoordinatorConnector thread, which communicates with the coordinator via RPC and instructs the consumer to stop/start fetchers.
  • WildcardStreamsHandler: for consumers subscribing to wildcard topics, it will construct a WildcardStreamsHandler, which is used

1. ConsumerCoordinatorConnector: this replaces the CoordinatorRequestHandler by communicating with the coordinator via TCP.

...

  • to re-initialize the mapping from chunk queues to streams on receiving StartFetcherRequest

...

  • ConsumerCoordinatorConnector: it is used to
    • Registers with the coordinator and maintains the channel with the coordinator

...

    • Handles lost-connection/broken-pipe events by reconnecting to the new coordinator

...

    • Receive and handle

...

    • HeartbeatRequest and /Start/

...

    • StopFetcherRequest through its CoordinatorRequestHandler.
    • In the future it needs to be able to proactively send CommitOffsetRequest to the coordinator

...

-- In handlePingRequest, it also checks whether the fetcher threads have stopped unexpectedly (probably due to an out-of-range error); if so, it closes the connection, stops the fetchers, and tries to reconnect to the coordinator.
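
The ping-handling check described above could look roughly like the following. This is a minimal sketch, not the actual implementation: the Fetcher and CoordinatorChannel interfaces, their method names, and the boolean return value are all hypothetical stand-ins introduced for illustration.

```java
import java.util.List;

public class PingHandlerSketch {
    /** Hypothetical stand-in for a fetcher thread; only liveness matters here. */
    interface Fetcher {
        boolean isRunning();
        void stop();
    }

    /** Hypothetical stand-in for the channel to the coordinator. */
    interface CoordinatorChannel {
        void close();
        void reconnect();
        void sendPingResponse();
    }

    /**
     * Returns true if the ping was answered normally, false if a stopped
     * fetcher was detected and the recovery path was taken instead.
     */
    static boolean handlePingRequest(List<Fetcher> fetchers, CoordinatorChannel channel) {
        // Check whether any fetcher has stopped unexpectedly
        // (for example after an out-of-range error).
        boolean anyStopped = false;
        for (Fetcher f : fetchers) {
            if (!f.isRunning()) {
                anyStopped = true;
                break;
            }
        }

        if (anyStopped) {
            // Recovery path in the order given in the text:
            // close the connection, stop the fetchers, then reconnect.
            channel.close();
            for (Fetcher f : fetchers) {
                f.stop();
            }
            channel.reconnect();
            return false;
        }

        // Healthy case: answer the ping.
        channel.sendPingResponse();
        return true;
    }
}
```

In this sketch the ping is deliberately left unanswered when a fetcher has died; whether the real handler replies before closing the connection is not stated here and is an assumption.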

...

    • in order to actively call commitoffset().
  • CoordinatorRequestHandler: it contains the handler functions that the ConsumerCoordinatorConnector uses to handle the different types of requests from the coordinator. Its logic is described in Section .
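
As an illustration of the per-request dispatch a handler like this performs, here is a minimal sketch. The RequestType enum, the string return values, and the fetchersRunning flag are hypothetical; in particular, "StartFetcherResponse" is an assumed name, since only HeartbeatResponse and StopFetcherResponse are named in this section.

```java
public class RequestDispatchSketch {
    /** Hypothetical tags for the three coordinator request types. */
    enum RequestType { HEARTBEAT, STOP_FETCHER, START_FETCHER }

    // Tracks whether the consumer's fetchers are currently running.
    private boolean fetchersRunning = false;

    /** Dispatches one coordinator request and returns the name of the reply. */
    String handle(RequestType type) {
        switch (type) {
            case HEARTBEAT:
                // A ping only needs an acknowledgement.
                return "HeartbeatResponse";
            case STOP_FETCHER:
                fetchersRunning = false;
                return "StopFetcherResponse";
            case START_FETCHER:
                // The real consumer would also re-initialize the mapping
                // from chunk queues to streams at this point.
                fetchersRunning = true;
                return "StartFetcherResponse"; // assumed name
            default:
                throw new IllegalArgumentException("Unknown request type: " + type);
        }
    }

    boolean fetchersRunning() {
        return fetchersRunning;
    }
}
```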

11.7 kafka.server

  • KafkaServer contains a new ConsumerCoordinator instance, which will be initialized upon server startup if ZookeeperEnabled is true.

...