Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

SocketServer currently prioritizes processing of new connections over processing of existing connections. During a connection storm, all new connections are accepted and processed by their associated Processors before any existing connections are processed. This can delay the closing of existing connections, resulting in excessive memory usage. It also prevents brokers from making progress if delays in processing a connection storm result in request timeouts in clients, which then create even more connections.

This KIP proposes to improve fairness in channel processing in the SocketServer by limiting the number of new connections each Processor handles in an iteration before it processes existing connections. To avoid a huge backlog of accepted connections in the broker, a fixed-size blocking queue will be used to limit the number of accepted new connections that have not yet been processed, applying backpressure and reducing resource usage on the broker.

Public Interfaces

No new interfaces or configuration options will be added. The size of the blocking queue used for new connections will be set to 20 for each Processor. As with the polling interval in the SocketServer, it is unlikely that users will need to change this number. During normal operation, a small limit allows progress to be made on new channels as well as existing channels. Since the Selector is woken up when new connections arrive, this limit does not introduce any unnecessary delays in connection processing.
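The effect of the bounded queue on a single Processor can be illustrated with a minimal Java sketch. This is not the SocketServer implementation: the class ProcessorSketch, its method names, and the use of String to stand in for a connection are all hypothetical. It also assumes that the per-iteration cap on new connections equals the queue size of 20, which this page does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of one Processor with a bounded new-connection queue. */
class ProcessorSketch {
    // Queue size from this KIP; reusing it as the per-iteration cap is an assumption.
    static final int CONNECTION_QUEUE_SIZE = 20;

    private final BlockingQueue<String> newConnections =
            new ArrayBlockingQueue<>(CONNECTION_QUEUE_SIZE);
    private final List<String> registered = new ArrayList<>();

    /** Non-blocking hand-off; returns false when the queue is already full. */
    boolean offerConnection(String connection) {
        return newConnections.offer(connection);
    }

    /**
     * One iteration of the processing loop: register at most
     * CONNECTION_QUEUE_SIZE new connections, then move on to existing channels.
     * Returns the number of new connections registered in this iteration.
     */
    int runOnce() {
        List<String> batch = new ArrayList<>();
        newConnections.drainTo(batch, CONNECTION_QUEUE_SIZE);
        registered.addAll(batch);
        // Existing channels would be polled and processed here.
        return batch.size();
    }

    int registeredCount() {
        return registered.size();
    }
}
```

Because the queue never holds more than 20 entries, the work done on new connections in one iteration is bounded, so existing channels get a turn even while connections keep arriving.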

A new metric will be added to track the amount of time SocketServer is blocked from accepting connections due to backpressure. This will be a yammer Meter, consistent with other SocketServer metrics.

  • kafka.network:type=Acceptor,name=AcceptorIdlePercent,listener={listenerName}

Proposed Changes

Acceptor accepts new connections and allocates them to Processors using round-robin allocation. In the current implementation, Acceptor accepts as fast as possible and adds new connections to unbounded queues associated with each Processor.

The connection queue for each Processor will be changed to an ArrayBlockingQueue with a fixed size of 20. Acceptor will use round-robin allocation to assign each new connection to the next available Processor whose queue can take the connection without blocking. If a Processor's queue is full, the next Processor will be chosen. If the connection queues on all Processors are full, Acceptor blocks until the connection can be added to the selected Processor. No new connections will be accepted during this period. The amount of time the Acceptor is blocked can be monitored using the new AcceptorIdlePercent metric.
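The assignment logic described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual Acceptor code: the class AcceptorSketch, the assign method, the blockedNanos counter, and the use of String to represent a connection are all hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of round-robin assignment with bounded per-Processor queues. */
class AcceptorSketch {
    static final int CONNECTION_QUEUE_SIZE = 20; // fixed size proposed in this KIP

    private final List<BlockingQueue<String>> processorQueues = new ArrayList<>();
    private int nextProcessor = 0;
    private long blockedNanos = 0; // illustrative stand-in for the blocked-time metric

    AcceptorSketch(int numProcessors) {
        for (int i = 0; i < numProcessors; i++) {
            processorQueues.add(new ArrayBlockingQueue<>(CONNECTION_QUEUE_SIZE));
        }
    }

    /**
     * Assigns a connection and returns the index of the Processor that took it,
     * or -1 if the thread was interrupted while blocked.
     */
    int assign(String connection) {
        int n = processorQueues.size();
        // First pass: try each Processor once, in round-robin order, without blocking.
        for (int attempt = 0; attempt < n; attempt++) {
            int index = nextProcessor;
            nextProcessor = (nextProcessor + 1) % n;
            if (processorQueues.get(index).offer(connection)) {
                return index;
            }
        }
        // Every queue is full: block on the selected Processor until it has room.
        int index = nextProcessor;
        nextProcessor = (nextProcessor + 1) % n;
        long start = System.nanoTime();
        try {
            processorQueues.get(index).put(connection);
            return index;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            blockedNanos += System.nanoTime() - start;
        }
    }

    BlockingQueue<String> queue(int index) {
        return processorQueues.get(index);
    }

    long blockedNanos() {
        return blockedNanos;
    }
}
```

While assign is blocked in put, the calling thread cannot accept further connections, which is how the backpressure reaches clients in this sketch.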

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?

No externally visible interface changes are proposed in this KIP. During normal operation, this change is unlikely to have any impact. When a large number of connections are made to the broker at the same time, connections may be established more slowly than before and existing connections may be processed faster. As with the current implementation, this could result in request timeouts if the broker is overloaded. However, resource usage on the broker will be reduced as a result of these changes.

Rejected Alternatives

