...

Today there is no separation between controller requests and regular data plane requests. Specifically: (1) a controller in a cluster uses the same advertised endpoints to connect to brokers as what clients and regular brokers use for exchanging data; (2) on the broker side, the same network (processor) thread can be multiplexed between handling a controller connection and many other data plane connections; (3) after a controller request is read from the socket by a network thread, it is enqueued into the single FIFO requestQueue, which is used for all types of requests; and (4) request handler threads poll requests from the requestQueue and handle controller requests with the same priority as regular data requests.
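
To make this shared path concrete, the following is a minimal, hypothetical Java sketch of the design described above: a single bounded FIFO queue shared by every request type, filled by the network (processor) threads and drained by the request handler threads. The class and member names are illustrative only and do not correspond to Kafka's actual implementation.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative sketch only: one FIFO queue shared by controller and data requests.
    public class SharedRequestQueueSketch {
        // All request types (Produce, Fetch, LeaderAndIsr, UpdateMetadata, ...) land here.
        // The capacity is an arbitrary example value.
        private final BlockingQueue<Object> requestQueue = new ArrayBlockingQueue<>(500);

        // Called by any network (processor) thread, regardless of who sent the request.
        public void enqueue(Object request) throws InterruptedException {
            requestQueue.put(request); // blocks the processor thread if the queue is full
        }

        // Called by the request handler threads; a controller request waits behind
        // every data request that was enqueued before it.
        public Object nextRequest() throws InterruptedException {
            return requestQueue.take();
        }
    }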

...

  1. The requestQueue is full, and therefore blocks a network (processor) thread that has a controller request fully read from the socket and ready to be enqueued.
  2. A controller request is enqueued into the requestQueue after a backlog of data requests, and experiences a long queuing time in the requestQueue.

Delaying a controller request for a prolonged period can have serious consequences, and we'll now examine the impact of delayed processing for a LeaderAndISR request and an UpdateMetadata request [1].

  1. Delayed processing of a LeaderAndISR request
    1. LeaderAndISR with partitions to be transitioned to followers: Consider the case that a LeaderAndISR request is enqueued after a backlog of Produce requests; the LeaderAndISR request contains a partition that needs to be transitioned from a leader to a follower, say partition0; the Produce requests ahead of it all have records for partition0, and let's assume each of the produce requests has a total of 20 partitions, partition0, partition1, ..., partition19. Further, we assume that the previous followers fetching from this broker for partition0 have stopped fetching.
      1. If the produce requests have required acks = -1 (all), they would be parked in the purgatory after their records are appended to the local log, waiting for followers to meet the required offsets for all of the 20 partitions. Unfortunately, because the previous followers have stopped fetching for partition0, the required offset for partition0 can never be satisfied. Even after the remaining 19 partitions satisfy their required offsets, the produce requests will still be pending in the purgatory until the LeaderAndISR request is processed to mark partition0 as no longer being the leader, or the produce request timeout is triggered. The impact for users is increased latency for the produce requests ahead of the LeaderAndISR.
      2. If the produce requests have acks = 0 or acks = 1, their records will be appended to the local log, and a response will be sent to the client immediately. In this case, the produce requests do not have prolonged latency. However since the appended records will not be replicated to other followers, after processing of the LeaderAndISR that makes the broker a follower, those records will be truncated. In contrast, if we can change the behavior and process the LeaderAndISR request immediately, an error code corresponding to NotLeaderForPartition will be returned to the clients, causing the clients to retry and avoid the data loss. Even though losing data for the acks = 0 and acks = 1 produce requests is allowed in Kafka, it'll be better if we can minimize the data loss.
    2. LeaderAndISR with partitions to be transitioned to leaders: Again, let's consider the case that a LeaderAndISR request is delayed because of a backlog of Produce requests ahead of it, and the LeaderAndISR contains a partition that needs to be transitioned from a follower to a leader, say partition0. Further, let's assume the previous leader for partition0 has resigned its leadership. In this case, before the LeaderAndISR is processed, the partition is effectively unavailable, both for producing and consuming. If the LeaderAndISR request is processed immediately, we can greatly shorten the unavailability interval.
  2. Delayed processing of an UpdateMetadata request. Delayed processing of an UpdateMetadata request means clients may receive stale metadata. For example, the stale metadata may have the wrong leadership info for certain partitions, causing the client to be unable to produce or consume until the correct metadata with up-to-date leadership is received. It will be much better if the UpdateMetadataRequest can be processed immediately after arriving at a broker.

In summary, we'd like to mitigate the effect of stale metadata by shortening the latency between a controller request's arrival and processing on a given broker.

...

  • We plan to add a new metric 
    kafka.network:name=ControlPlaneRequestQueueSize,type=RequestChannel
    to show the size of the new control plane request queue.
  • We will add two new metrics 
    kafka.network:name=ControlPlaneNetworkProcessorIdlePercent,type=SocketServer
    kafka.server:name=ControlPlaneRequestHandlerIdlePercent,type=KafkaRequestHandlerPool
    with the former monitoring the idle percentage of the pinned control plane network thread, and the latter monitoring the idle percentage of the pinned control plane request handler thread. (Pinned control plane threads are explained in detail below.)
  • The meaning of the existing metric
    kafka.network:name=RequestQueueSize,type=RequestChannel
    will be changed, and it will be used to show the size of the data request queue only, which does not include controller requests.
  • The meaning of the two existing metrics
    kafka.network:name=NetworkProcessorAvgIdlePercent,type=SocketServer
    kafka.server:name=RequestHandlerAvgIdlePercent,type=KafkaRequestHandlerPool
    will be changed slightly in that they now only cover the data plane threads, not including the pinned threads for controller requests.
  • We plan to add a new config for brokers to specify a dedicated listener for controller connections: the "control.plane.listener.name" config. A detailed explanation of the config is shown in the "proposed changes" section.
  • A new listener-to-endpoint entry dedicated to controller connections needs to be added to the "listeners" config and the "advertised.listeners" config.
    Also a new entry needs to be added to the "listener.security.protocol.map" config to specify the security protocol of the new endpoint, as shown in the sample configuration below.
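
For illustration, a broker configuration with these new entries might look like the following sketch. The listener name "CONTROLLER", the host names, the ports, and the security protocols are example values only; any values can be used as long as they are consistent across the three configs:

    control.plane.listener.name=CONTROLLER
    listeners=INTERNAL://192.1.1.8:9092,CONTROLLER://192.1.1.8:9091
    advertised.listeners=INTERNAL://broker1.example.com:9092,CONTROLLER://broker1.example.com:9091
    listener.security.protocol.map=INTERNAL:PLAINTEXT,CONTROLLER:SSL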

...

In order to eliminate queuing for controller requests, we plan to add a dedicated endpoint on brokers for controller connections, as well as two dedicated control plane threads for handling controller requests. To explain the proposed change, we first go through how brokers should get the dedicated endpoint through configs and expose the endpoint to Zookeeper. Then we discuss how a controller can learn about the dedicated endpoint exposed by brokers. Finally, we describe how controller requests are handled over the dedicated connections.

...

Upon startup, a broker needs to get two lists of endpoints: the listener endpoints that are used to bind the server socket and accept incoming connections, as well as the advertised listener endpoints that are published to Zookeeper for clients or other brokers to use when establishing connections. More details on the reason for separating these two lists can be found in KAFKA-1092 and KIP-103. In terms of how the values for the two lists are derived, we find it intuitive to understand the relationships between the different configs using the following chart:

...

Upon detecting a new broker through Zookeeper, the controller will figure out which endpoint it should use to connect to the new broker by first determining the inter-broker-listener-name. The inter-broker-listener-name is decided by using either the "inter.broker.listener.name" config or the "security.inter.broker.protocol" config. Then, by using the "endpoints" section of the broker info, the controller can determine which endpoint to use for the given inter-broker-listener-name. For instance, with the sample json payload listed above, if the controller first determines the inter-broker-listener-name to be "INTERNAL", then it knows to use the endpoint "INTERNAL://broker1.example.com:9092" and security protocol PLAINTEXT to connect to the given broker.
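
As a minimal illustration of this lookup, the hypothetical Java sketch below resolves an endpoint by listener name from an endpoints list given in the "<listenerName>://<host>:<port>" form; the class and method names are made up for this example and are not Kafka's actual controller code.

    import java.util.List;
    import java.util.Optional;

    // Hypothetical sketch: pick the endpoint whose listener name matches the
    // inter-broker-listener-name determined from the configs.
    public class EndpointLookupSketch {
        static Optional<String> endpointFor(List<String> endpoints, String listenerName) {
            String prefix = listenerName + "://";
            return endpoints.stream()
                            .filter(endpoint -> endpoint.startsWith(prefix))
                            .findFirst();
        }

        public static void main(String[] args) {
            List<String> endpoints = List.of(
                    "INTERNAL://broker1.example.com:9092",
                    "EXTERNAL://broker1.example.com:9093");
            // Prints: Optional[INTERNAL://broker1.example.com:9092]
            System.out.println(endpointFor(endpoints, "INTERNAL"));
        }
    }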

...

Instead of using the inter-broker-listener-name value, we propose to add a new config, "control.plane.listener.name", for determining the control plane endpoint. For instance, if the controller sees that the endpoints exposed by a broker are the following:

...

With the dedicated endpoint for controller connections, upon startup a broker will use the "control.plane.listener.name" config to look up the corresponding endpoint in the listeners list for binding. For instance, in the example given above, the broker will derive the dedicated endpoint to be "CONTROLLER://192.1.1.8:9091". Then it will have a new dedicated acceptor that binds to this endpoint and listens for controller connections. When a connection is received, the socket will be given to a dedicated control plane processor thread (network thread). The dedicated processor thread reads controller requests from the socket and enqueues them into a new dedicated control plane request queue, whose default capacity is 20 [2]. On the other side of the control plane request queue, a dedicated control plane request handler thread will take requests out and handle them in the same way as is done today. In summary, we are 1) adding a dedicated acceptor, 2) pinning one processor thread, 3) adding a new request queue, and 4) pinning one request handler thread for controller connections and requests. The two new threads are exclusively for requests from the controller and do not handle data plane requests.
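
To make the proposed path concrete, here is a simplified, hypothetical Java sketch of the control plane components described above: one pinned processor (network) thread feeding a small bounded queue that is drained by one pinned request handler thread. The class and method names are illustrative and omit almost everything the real socket server and request handlers do.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative sketch only: the dedicated control plane path.
    public class ControlPlaneSketch {
        // Dedicated queue for controller requests; capacity 20 mirrors the proposed default.
        private final BlockingQueue<Object> controlPlaneRequestQueue = new ArrayBlockingQueue<>(20);

        // Runs on the single pinned control plane processor (network) thread after it
        // reads a complete controller request from the dedicated CONTROLLER socket.
        public void enqueueControllerRequest(Object request) throws InterruptedException {
            controlPlaneRequestQueue.put(request);
        }

        // Runs on the single pinned control plane request handler thread; requests are
        // handled the same way as today, they just never queue behind data plane requests.
        public void handlerLoop() throws InterruptedException {
            while (true) {
                handle(controlPlaneRequestQueue.take());
            }
        }

        private void handle(Object request) {
            // Handle LeaderAndIsr, UpdateMetadata, StopReplica, etc. (omitted).
        }
    }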

The metric

kafka.network:name=ControlPlaneRequestQueueSize,type=RequestChannel

will be added to monitor the size of the new control plane request queue. Another two new metrics

kafka.network:name=ControlPlaneNetworkProcessorIdlePercent,type=SocketServer
kafka.server:name=ControlPlaneRequestHandlerIdlePercent,type=KafkaRequestHandlerPool

will be added to monitor the idle ratio of the new control plane network thread and the control plane request handler thread, respectively.

Finally, as a special case, if the "control.plane.listener.name" config is not set, there is no way to tell the dedicated endpoint for the controller. Hence there will be no dedicated acceptor, network processor, or request handler threads, and the behavior will be exactly the same as the current implementation.

...