Table of Contents |
---|
Status
Current state: "Under Discussion"Accepted
Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]
JIRA
JIRA: Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-10023
: here [Change the link from KAFKA-1 to your own ticket] Jira server ASF JIRA serverId 5aa69414-a9e9-3523-82ec-879b028fb15b key KAFKA-10749
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Creating a new connection adds CPU overhead to the broker. KIP-306 mitigated the issue of connection storm due to un-authorized connections (e.g., misconfigured clients). However, connections storms may also come from (mostly) well-behaved clients. One example is when deploying a new application may cause temporary connection storm due to a large number of clients starting up and creating connections to the cluster at the same time. Another example is clients that create a new connection for each produce/consume request, causing high connection rate to the brokers. A very high connection creation rate may stop broker from doing other useful work, causing high request latencies or even URPs.
To address this issue, the KIP proposes to add the ability to set a limit on the rate with which the broker accepts new connections. To ensure that some clients do not take over most of the connection creation rate quota, the KIP also adds the ability to set a limit on connection creation rate per IP.
Public Interfaces
Broker configurations for broker-wide and per-listener connection rate limits
A new broker configuration option will be added to limit the total rate at which non-inter-broker connections will be accepted on the broker. Connections on the inter-broker listener will be permitted even if the configured broker-wide limit is reached. This will be a dynamic config that can be updated without restarting the broker.
...
The config may be prefixed with a listener prefix to specify a different listener-specific limit: listener.name.{listenerName}.max.connection.creation.rate
. Listener-specific limits will be applied in addition to the broker-wide limit. If a listener-specific limit is not specified, each listener can create connections with the rate up to the broker-wide limit as long as the total rate is also within the broker-wide limit. If a broker has multiple listeners, connections on the inter-broker listener will always succeed as long as connection creation rate is within that listener's rate limit. The behavior of the proposed broker-wide and per-listener configs is consistent with max.connections broker configuration (KIP-402).
Dynamic configurations for a new entity type IPs
Per-IP connection creation rate quota will be configured as a dynamic quota for entity type ips
.
- Default quota for <IPs> will be stored in Zookeeper at /config/ips/<default>
- Quota for a specific IP address for which quota override is defined will be stored in Zookeeper at /config/ips/<ip-address>
If per-IP or default IP quota is not configured, we will use Int.MaxValue as a default to ensure backward compatibility.
We will use the AlterClientQuota
and DescribeClientQuota
APIs to manage IP connection rate quotas. Adding support for IP quotas will not require any changes to the protocol, since entity type and name are represented flexibly as Strings. For ease of use, we will define a new recognized IP entity type in org.apache.kafka.common.quota.ClientQuotaEntity:
Code Block |
---|
/** * Describes a client quota entity, which is a mapping of entity types to their names. */ public class ClientQuotaEntity { /** * The type of an entity entry. */ public static final String USER = "user"; public static final String CLIENT_ID = "client-id"; public static final String IP = "ip"; } |
Since IP quotas are orthogonal with user/client ID quotas the following is to disambiguate how the new IP
entity will interact with the existing client-id
and user
entities.
A DescribeClientQuotasRequest
constructed with a filter for both ip entities and either of user or client-id represents a logical AND
of IP quotas and user/client quotas. However, since there are no quotas that satisfy both IP and either of user/client-id, such a request is likely due to user error. In such a case we will return an INVALID_REQUEST
error.
Likewise, an AlterClientQuotasRequest
with both an IP entity and either of user or client-id entity will return an INVALID_REQUEST
error, as there are no shared quota properties between the two entity types.
Tools
kafka-configs.sh
will be extended to support per-IP connection rate quotas. A new entity type IPs
will be added with the following key (and a value of type Int):
connection_creation_rate
: The total connection rate limit for the connections from a specific IP address.
For example:
bin/kafka-configs --bootstrap-server localhost:9091 --alter --add-config 'connection_creation_rate=100' --entity-name 93.284.53.13 --entity-type ips
bin/kafka-configs --bootstrap-server localhost:9091 --alter --add-config 'connection_creation_rate=100' --ip 93.284.53.13
Default connection rate quotas for an IP address can be configured via the following:
bin/kafka-configs --bootstrap-server localhost:9091 --alter --add-config 'connection_creation_rate=100' --entity-type ips --entity-default
bin/kafka-configs --bootstrap-server localhost:9091 --alter --add-config 'connection_creation_rate=100' --ip-defaults
Connection rate throttling behavior
The connection creation rate limits will be applied to the same quota window configuration (quota.window.size.seconds
with 1 second default) as existing produce/fetch quotas and request rate quota (KIP-124). Since limit on connection creation rate on the broker is also a type of quota, this approach will keep it consistent with the existing quota implementations on the broker. If connection creation rate on the broker exceeds the broker-wide limit, the broker will delay accepting a new connection by an amount of time that brings the rate within the limit. If a listener-specific limit is specified, and the connection rate on that listener exceeds the limit, the broker will delay accepting new connection on that listener by an amount of time that brokers the rate of the listener within the listener limit. The maximum delay applied will be the quota window size (which also means that the minimum connection rate limit is effectively 1 connection creation / second).
Since the broker has to accept the connection in order to know the IP address, limiting connection creation rate per IP means that the enforcement of the limit has to happen after connection is accepted. Dropping the connection right away as broker does for per-IP connection count limits (KIP-402) does not work as well for connection rate limiting, because the offending client is likely to immediately reconnect. At the same time, connection rate limiting by only delaying processing of connections would not work for cases where the connection creation rate is continuously higher than the set limits, which would create a large backlog. This case could be more common for clients that come through a proxy, and where there could be potentially a large number of incoming connections with the same IP. To address these two issues, our approach is as follows. If connection creation rate is reached for a specific IP address, the broker will delay processing the connection by an amount of time that brings the rate within the limit or 1 second, whichever is earliest. After the delay, if the per-IP quota is still violated, the connection will be cleaned up; otherwise, the connection will be accepted.
Metrics
New metrics that track broker-wide or per-listener rate of accepting connections, and are used by SocketServer to detect connection acceptance rate quota violation:
kafka.network:type=socket-server-metrics,name=connection-accept-rate,listener={listenerName}
- Type: Rate
- Description: Rate of connections accepted per second on a given listener
kafka.network:type=socket-server-metrics,name=broker-connection-accept-rate
- Type: Rate
- Description: Broker-wide rate of connections accepted per second
kafka.network:type=socket-server-metrics,name=connection-accept-rate,ip={ipAddress}
- Type: Rate
- Description: Rate of connections accepted per second for a given IP
New metrics that track average throttle time of accepting a new connection due to reaching connection acceptance rate limit:
kafka.network:type=socket-server-metrics,name=connection-accept-throttle-time,listener={listenerName}
- Type: SampledStat.Avg
- Description: Average throttle time due to violating per-listener or broker-wide connection acceptance rate quota on a given listener.
kafka.network:type=socket-server-metrics,name=ip-connection-accept-throttle-time,listener={listenerName}
- Type: SampledStat.Avg
- Description: Average throttle time due to violating ip connection rate quota on a given listener. IP throttling is reported at a listener granularity rather than at an IP granularity to keep a smaller metrics overhead.
No new metrics will be added. The existing metric (kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}) that tracks the amount of time Acceptor
is blocked from accepting connections will now additionally include the amount of time Acceptor
is blocked due to hitting connection create limit (in addition to the time blocked due to hitting the maximum limit on currently active connections).
...
The broker will track connection acceptance rates, broker-wide and , per every listener and per IP, via sensors that wrap the Rate
metric with the MetricConfig
. MetricConfig#quota
will be set to the corresponding configured connection creation rate limit. When Acceptor
accepts a new connection, the broker-wide and the corresponding listener's metric will be incremented. When the actual connection creation rate exceeds either broker-wide or listener-specific quota, quota violation exception will be thrown. On quota violation, the broker will calculate the delay needed to bring the metric within quota by using the same formula implemented in ClientQuotaManager.throttleTime
. The Acceptor
thread will wait for the delay duration before accepting new connections. The maximum delay applied will be the quota window size (1 second by default).
...
- If the number of active connections is below the limit, but broker hits the connection rate limit, the
Acceptor
will wait for the calculated delay that brings the connection creation rate metric within quota. - If there are no available active connection slots, the broker waits for the new slot independent of whether connection rate exceeds quota or not.
When quota violation happens due to reaching the limit for a IP address, we will calculate the delay = min(delay required to bring the rate within the quota, 1 second). We will re-use the same mechanism implemented with KIP-306 where the broker delays the response for failed client authentication. When the delay passes, we will check for quota violation again. The connection will be cleaned up (dropped) if the quota is still violated, or the connection will be added to one of the Processor queues for processing (accepted).
Compatibility, Deprecation, and Migration Plan
...
There will be no impact on existing users.
Rejected Alternatives
Broker configurations to set limits on per IP connection attempt rates
Add broker configurations for setting the default per-IP connection attempt limit and per-IP limit overrides:
Config option: Name: max.connection.creation.rate.per.ip
Type: Int
Default value: Int.MaxValue
Config option: Name: max.connection.creation.rate.per.ip.overrides
Type: String
Default value: ""
We chose the option of making per-IP quota similar to other quotas, configured as a dynamic quota for entity type "ip" as well as defaults.
Use incrementalAlterConfigs/describeConfigs for IP quotas
DescribeConfigsRequest
requires the user to specify a list of ConfigResource
which consists of a Type
and Name
for which to retrieve the desired configs. An empty name indicates that we want to describe the configs for the entity type default. This works well for describing broker and topic configs, because the full list of brokers and topics is retrievable via other requests.
However, for IPs there is no mechanism for IP discovery, and DescribeConfigsRequest
has no way to indicate that a request should list all configs for an entity type without also giving a list of entity names. This would make discovery of which IPs have a connection rate quota unfeasible, so this was rejectedNone.