Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-7719, KAFKA-7730

...

No new interfaces will be added. The size of the blocking queue used for new connections will be set to 20 for each Processor. The backlog queue size for incoming connections on the server socket is currently the Java default of 50, and this limit is not configurable. With the default num.network.threads=3, a per-Processor queue size of 20 enables this backlog to be processed without blocking. Like the polling interval in SocketServer, it is unlikely that users will require the queue size to be changed. During normal operation, a small limit allows progress to be made on new channels as well as existing channels. Since the Selector is woken up when new connections arrive or existing connections are ready to be processed, this limit does not introduce any unnecessary delays in connection processing.
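
To make the sizing concrete, here is a small self-contained sketch (illustrative only, not broker code) showing that three bounded queues of 20 slots absorb the default backlog of 50 connections without a single non-blocking offer failing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class QueueSizingSketch {
    public static void main(String[] args) {
        int numNetworkThreads = 3; // default num.network.threads
        int queueSize = 20;        // proposed per-Processor queue size
        int backlog = 50;          // Java default server socket backlog

        List<ArrayBlockingQueue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < numNetworkThreads; i++)
            queues.add(new ArrayBlockingQueue<>(queueSize));

        // Drain a full backlog of connections into the queues round-robin.
        // offer() is non-blocking and returns false if a queue is full.
        int accepted = 0;
        for (int conn = 0; conn < backlog; conn++) {
            if (queues.get(conn % numNetworkThreads).offer(conn))
                accepted++;
        }
        // 3 queues * 20 slots = 60 >= 50, so every offer succeeds.
        System.out.println("accepted without blocking: " + accepted + "/" + backlog);
    }
}
```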

...

  • kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}
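
For illustration, the metric can be read over JMX like other Kafka broker metrics. A minimal sketch, assuming a broker with remote JMX enabled on port 9999 and a listener named PLAINTEXT (both example values), and assuming the metric is a meter exposing rate attributes:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class AcceptorBlockedPercentReader {
    public static void main(String[] args) throws Exception {
        // Example endpoint: a broker started with remote JMX on port 9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName name = new ObjectName(
                    "kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener=PLAINTEXT");
            // Assumes a meter-style metric exposing rate attributes over JMX.
            Object meanRate = mbsc.getAttribute(name, "MeanRate");
            System.out.println("AcceptorBlockedPercent MeanRate: " + meanRate);
        }
    }
}
```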

A new broker configuration option will be added to limit the total number of connections that may be active on the broker at any time. This is in addition to the existing max.connections.per.ip config that will continue to limit the number of connections from each IP address. When the limit is reached, new connections will not be accepted until one or more existing connections are closed. This will be a dynamic broker-wide config that can be updated without restarting the broker.

Config option:

  • Name: max.connections
  • Type: Int
  • Default value: Int.MaxValue
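
Because the config is dynamic, it can be changed at runtime. A minimal sketch using the Java Admin client (broker id 0 and the limit of 1000 are example values):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UpdateMaxConnections {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Dynamic per-broker config update for broker id 0 (example values).
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            AlterConfigOp setLimit = new AlterConfigOp(
                    new ConfigEntry("max.connections", "1000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker, List.of(setLimit))).all().get();
        }
    }
}
```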

The config may be prefixed with the listener prefix to specify listener-specific limits, enabling inter-broker connections to be created even if there are a large number of client connections on a different listener. Listener-specific limits will be applied in addition to the broker-wide limit. If a listener-specific limit is not specified, each listener may use up to the broker-wide limit, as long as the total across all listeners stays within that limit. If a broker has multiple listeners, connections on the inter-broker listener will always succeed as long as they are within that listener's limit. In this case, the least-recently used connection on another listener will be closed to accommodate the inter-broker connection. A hypothetical sketch of how the two limits might compose follows below.
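
The following sketch illustrates one way the broker-wide and listener-specific limits could compose, including the inter-broker exemption. The ConnectionLimits class and its methods are invented for illustration and are not the broker's implementation:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of how broker-wide and listener-specific limits
 * could compose; not the broker's actual implementation.
 */
public class ConnectionLimits {
    private final int brokerWideLimit;                 // max.connections
    private final Map<String, Integer> listenerLimits; // listener.name.{listener}.max.connections
    private final Map<String, Integer> counts = new HashMap<>();
    private int total = 0;

    public ConnectionLimits(int brokerWideLimit, Map<String, Integer> listenerLimits) {
        this.brokerWideLimit = brokerWideLimit;
        this.listenerLimits = listenerLimits;
    }

    /** Returns true if a new connection on this listener may be accepted now. */
    public synchronized boolean tryAccept(String listener, boolean isInterBroker) {
        // A listener without its own limit may use up to the broker-wide limit.
        int listenerLimit = listenerLimits.getOrDefault(listener, brokerWideLimit);
        if (counts.getOrDefault(listener, 0) >= listenerLimit)
            return false; // the listener-specific limit always applies
        if (total >= brokerWideLimit && !isInterBroker)
            return false; // client connections wait until an existing one closes
        // An inter-broker connection within its listener limit is admitted even
        // at the broker-wide limit; the broker would then close the least-recently
        // used connection on another listener to compensate.
        counts.merge(listener, 1, Integer::sum);
        total++;
        return true;
    }

    /** Called when a connection on the given listener is closed. */
    public synchronized void release(String listener) {
        counts.merge(listener, -1, Integer::sum);
        total--;
    }
}
```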

Proposed Changes

Acceptor accepts new connections and allocates them to Processors using round-robin allocation. In the current implementation, Acceptor accepts connections as fast as possible and adds them to unbounded queues associated with each Processor.

The connection queue for Processors will be changed to ArrayBlockingQueue with a fixed size of 20. Acceptor will use round-robin allocation to allocate each new connection to the next available Processor to which the connection can be added without blocking. If a Processor's queue is full, the next Processor will be chosen. If the connection queues of all Processors are full, Acceptor blocks until the connection can be added to the selected Processor. No new connections will be accepted during this period. The amount of time Acceptor is blocked can be monitored using the new AcceptorBlockedPercent metric.
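
A minimal sketch of this allocation loop (the actual Acceptor lives in the broker's SocketServer and differs in detail; RoundRobinAssigner is an illustrative name):

```java
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

public class RoundRobinAssigner {
    private final List<ArrayBlockingQueue<SocketChannel>> processorQueues;
    private int next = 0;

    public RoundRobinAssigner(List<ArrayBlockingQueue<SocketChannel>> processorQueues) {
        this.processorQueues = processorQueues;
    }

    /** Hand a newly accepted channel to a Processor, preferring a non-blocking hand-off. */
    public void assign(SocketChannel channel) throws InterruptedException {
        int n = processorQueues.size();
        for (int attempt = 0; attempt < n; attempt++) {
            ArrayBlockingQueue<SocketChannel> queue = processorQueues.get(next);
            next = (next + 1) % n;
            if (queue.offer(channel)) // non-blocking; false if this queue is full
                return;
        }
        // All queues were full: block on the selected Processor's queue.
        // Time spent in put() would count towards AcceptorBlockedPercent.
        processorQueues.get(next).put(channel);
        next = (next + 1) % n;
    }
}
```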

Acceptor will stop accepting new connections when the broker's max.connections limit is reached. New connections will be accepted as soon as a connection is closed by any of the Processors. Acceptor will also stop accepting new connections when its listener's listener.name.{listener}.max.connections limit is reached. New connections will be accepted as soon as a connection is closed by any of the Processors of that listener. Inter-broker connections will be protected in multi-listener brokers by closing client connections to accommodate inter-broker connections. Any time spent by Acceptor waiting for connections to close will also be included in the new AcceptorBlockedPercent metric. The existing max.connections.per.ip config will continue to be applied without any changes. Connections dropped due to hitting the per-IP limit will not appear in the AcceptorBlockedPercent metric, since these connections are accepted and then dropped.
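
One possible shape of this blocked wait, as a hypothetical sketch (ConnectionLimiter is invented for illustration; the broker's actual accounting feeds the AcceptorBlockedPercent meter):

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of the Acceptor's blocked wait at the connection
 * limit; invented for illustration, not the broker's implementation.
 */
public class ConnectionLimiter {
    private final int maxConnections;
    private int active = 0;
    private long blockedNanos = 0;

    public ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Called by the accept loop before accepting a new connection. */
    public synchronized void waitForSlot() throws InterruptedException {
        long start = System.nanoTime();
        while (active >= maxConnections)
            wait(); // resumed by close(); time spent here is "blocked" time
        blockedNanos += System.nanoTime() - start;
        active++;
    }

    /** Called by a Processor when it closes a connection. */
    public synchronized void close() {
        active--;
        notifyAll(); // let the Acceptor resume accepting
    }

    /** Total time the accept loop spent blocked, feeding a percent-style metric. */
    public synchronized long blockedMillis() {
        return TimeUnit.NANOSECONDS.toMillis(blockedNanos);
    }
}
```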

...

In typical scenarios, Kafka uses long-lived connections, so a small queue size is sufficient to ensure that new connections are processed promptly and existing connections are not left behind. A queue size of 20 per Processor is proposed in this KIP to ensure that the server socket backlog, for which we use the Java default of 50, can be processed by 3 network threads without blocking. The goal of this KIP is to protect the broker in scenarios where a very large number of clients connect at the same time. This is likely to be true only for short bursts, and hence the small queue size of 20 should be sufficient to ensure fairness in channel processing while protecting the broker from the surge. It is not clear that the number will need to be tweaked for different deployments, since the queue size is per-Processor and the number of Processors can be configured using num.network.threads.