...
(record metadata) - - (confirm) - -> Consumer (register succeed, block reading for coordinator requests such as ping, stop-fetcher, start-fetcher)
Coordinator (record metadata) -- (confirm) --> Consumer (same as above)
...
- - (ping) --> Consumer (failed)
----------------------------------------------------------------------------------------------
...
-- (stop-fetcher) - -> Consumer (alive)
----------------------------------------------------------------------------------------------
...
-- (start-fetcher) - -> Consumer (become failed)
...
Also some utility functions are added to Utils and ZkUtils.
11.6 kafka.
...
consumer
- OffsetInfoSend: offset information format wrapper, match with for
...
- HeartbeatResponse and StopFetcherResponse.
...
- SimpleConsumerConnector:
...
CoordinatorRequestHandler: Phase I consumer's communication module with coordinator, which is based on ZkQueue. You do not need to review this class.
...
- The simplified consumer client to replace the original ZookeeperConsumerConnector,
...
- also inherited from the ConsumerConnector interface.
...
- It differs with the ZookeeperConsumerConnector as the following:
- Remove all functionalities that depends on Zookeeper, remove the ZkClient (hence its Fetcher/FetcherRunnable classes also removed ZkClient)
- Simplified WildcardStreamsHandler, now it onl used to remember the template type of the message, but no need to connect to Zookeeper to keep track of topic changes.
- Add a ConsumerCoordinatorConnector thread which is used to communicate with coordinator via RPC and indicates the consumer to stop/start fetchers.
- WildcardStreamsHandler: for consumers subscribing to wildcard topics, it will construct a WildcardStreamsHandler, which is used
1. ConsumerCoordinatorConnector: this is to replace the CoordinatorRequestHandler by communicating with coordinator via TCP.
...
- to re-initialize the mapping from chunk queues to streams on receiving StartFetcherRequest
...
- ConsumerCoordinatorConnector: it is used to
- Register to the coordinator and maintains the channel with the coordinator
...
- .
- Handles lost-connection/broken pipe event by re-reconnecting to the new coordinator
...
- .
- Receive and handle
...
- HearbeatRequest and /Start/
...
- StopFetcherRequest through its CoordinatorRequestHandler.
- In the future it needs to be able to pro-actively sends CommitOffsetRequest to the coordinator
...
-- In hanldPingRequest, it will also check if the fetcher threads has stopped unexpectedly (probably due to out-of-range error); if yes, it will close connection, stop fetcher and try to re-connect to coordinator again.
...
- in order to actively call commitoffset().
- CoordinatorRequestHandler: it contains the handler function to handle different types of requests from the coordinator by the ConsumerCoordinatorConnector. Its logic is described in Section .
11.7 kafka.server
- KafkaServer contains a new ConsumerCoordinator instance, which will be initialized upon server startup if ZookeeperEnabled is true.
...