Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current state: Under Discussion

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-7719

...

This KIP proposes to improve fairness in channel processing in SocketServer by limiting the number of new connections processed in an iteration by each Processor before processing existing connections. To avoid a huge backlog of accepted connections in the broker, a fixed size blocking queue will be used to limit the number of accepted new connections that have not yet been processed, applying backpressure and reducing resource usage on the broker. The total number of active connections on the broker will also be limited using a new configuration option to protect the broker from DDOS using a large number of connections from different IPs.

Public Interfaces

No new interfaces or configuration options will be added. The size of the blocking queue used for new connections will set to 20 for each Processor. Like polling interval in SocketServer, it is unlikely that users will require this number to be changed. During normal operation, a small limit allows progress to be made on new channels as well as existing channels. Since Selector is woken up when new connections arrive or existing connections are ready to be processed, this limit does not introduce any unnecessary delays in connection processing.

...

  • kafka.network:type=Acceptor,name=AcceptorIdlePercent,listener={listenerName}

A new broker configuration option will be added to limit the total number of connections that may be active on the broker at any time. This is in addition to the existing max.connections.per.ip config that will continue to limit the number of connections from each host ip address. When the limit is reached, new connections will not be accepted until one or more existing connections are closed.

Config option: Name: max.connections Type: Int Default value: Int.MaxValue

Proposed Changes

Acceptor accepts new connections and allocates them to Processors using round-robin allocation. In the current implementation, Acceptor accepts as fast as possible and adds new connections to unbounded queues associated with each Processor.

The connection queue for Processors will be changed to ArrayBlockingQueue with a fixed size of 20. Acceptor will use round-robin allocation to allocate each new connection to the next available Processor to which the connection can be added without blocking. If a Processor's queue is full, the next Processor will be chosen. If the connection queue on all Processors are full, Acceptor blocks until the connection can be added to the selected Processor. No new connections will be accepted during this period. The amount of time Acceptor is blocked can be monitored using the new AcceptorIdlePercent metric.

Acceptor will stop accepting new connections when the broker's max.connections limit is reached. New connections will be accepted as soon as a connection is closed by any of the Processors. Any time spent waiting for connections to close will also be included in the new AcceptorIdlePercent metric. The existing max.connections.per.ip config will also be applied without any changes.

Compatibility, Deprecation, and Migration Plan

...