Status
Current state: Under discussion
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP addresses a minor addition to the set of configuration properties of Kafka producers in order to make the client validate the size of a message received from brokers (as denoted by the first four bytes of the response). The motivation is to prevent out-of-memory (OOM) errors when the size to be allocated is abnormally high.
A typical scenario exhibiting this vulnerability to OOM is a client sending a plaintext request to an SSL endpoint on a broker. When this happens, the broker expects a Client Hello as part of a new TLS handshake. It detects that the request is invalid [*] and sends back an Alert to the client. The client reads the first four bytes of the response to infer the size of the reply. With TLS v1.2, this results in an attempt to allocate 352,518,912 bytes.
As reported in KAFKA-4090, this can happen when the producer property security.protocol
is not defined, because the client then defaults to the plaintext transport layer implementation.
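As an illustration only, the following minimal sketch reproduces that misconfiguration; the class name, broker address, port and topic are arbitrary placeholders, and the only relevant point is that the bootstrap address targets an SSL listener while security.protocol is left unset:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlaintextToSslListenerExample {
    public static void main(String[] args) {
        Properties kafkaProducerConfigs = new Properties();
        // Bootstrap address points at the broker's SSL listener (host and port are example values).
        kafkaProducerConfigs.setProperty("bootstrap.servers", "broker1:9093");
        // "security.protocol" is not set, so the client falls back to the PLAINTEXT transport.
        try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(kafkaProducerConfigs, new StringSerializer(), new StringSerializer())) {
            producer.partitionsFor("some-topic");   // any metadata request now reaches the SSL listener in plaintext
        }
    }
}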
Public Interfaces
As part of the client-side configuration for producers, the following property could be added: socket.response.max.bytes
It could be set as any other producer property:
Properties kafkaProducerConfigs = new Properties();
kafkaProducerConfigs.setProperty("socket.response.max.bytes", "104857600");
...
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProducerConfigs);
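For illustration, below is a minimal sketch of how the property might be registered client-side, assuming it would be declared in ProducerConfig alongside the existing producer settings; the class name, constant name, documentation string and default value are placeholders, not part of the final proposal:

import org.apache.kafka.common.config.ConfigDef;

public class SocketResponseMaxBytesConfigSketch {
    // Hypothetical constant; in practice this would live in ProducerConfig.
    public static final String SOCKET_RESPONSE_MAX_BYTES_CONFIG = "socket.response.max.bytes";

    // Registers the property on a ConfigDef, mirroring the broker-side socket.request.max.bytes.
    public static ConfigDef withSocketResponseMaxBytes(ConfigDef configDef) {
        return configDef.define(SOCKET_RESPONSE_MAX_BYTES_CONFIG,
                ConfigDef.Type.INT,
                100 * 1024 * 1024,              // 104,857,600 bytes, same default as the broker property
                ConfigDef.Range.atLeast(0),
                ConfigDef.Importance.MEDIUM,
                "The maximum number of bytes accepted in a single response from a broker. "
                        + "Responses announcing a larger size are rejected before any buffer is allocated.");
    }
}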
Proposed Changes
This new element of configuration was originally suggested on the discussion thread and mirrors what already exists server-side, i.e. the property socket.request.max.bytes,
which was added to brokers for the very same reason.
The class org.apache.kafka.common.network.NetworkReceive
which is used both client- and server-side already takes a maximum receive size and throws a org.apache.kafka.common.network.InvalidReceiveException
when a message indicates a higher value. Therefore the change will mostly consist of propagating the value assigned to the property socket.response.max.bytes
(or a sensible default) to the NetworkReceive
(and the classes in-between). More precisely, in the method org.apache.kafka.clients.producer.KafkaProducer#newSender
, the value can be provided to the constructor of the org.apache.kafka.common.network.Selector
used later on by the client.
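To make the intended data flow concrete, here is an illustrative, condensed sketch of the guard NetworkReceive already applies to the 4-byte length header. It is a standalone paraphrase, not the actual code: names are simplified and a generic exception stands in for InvalidReceiveException.

import java.nio.ByteBuffer;

public class ResponseSizeGuardSketch {
    // Mirrors NetworkReceive.UNLIMITED (-1), i.e. no limit enforced.
    public static final int UNLIMITED = -1;

    // Paraphrase of the check performed in NetworkReceive#readFrom: the announced size is
    // validated before any buffer is allocated; the real code throws InvalidReceiveException.
    static ByteBuffer allocateResponseBuffer(ByteBuffer sizeHeader, int maxSize) {
        int receiveSize = sizeHeader.getInt();
        if (receiveSize < 0)
            throw new IllegalStateException("Invalid receive (size = " + receiveSize + ")");
        if (maxSize != UNLIMITED && receiveSize > maxSize)
            throw new IllegalStateException("Invalid receive (size = " + receiveSize
                    + " larger than " + maxSize + ")");
        return ByteBuffer.allocate(receiveSize);   // only reached once the size has been accepted
    }
}

With the proposed change, KafkaProducer#newSender would read socket.response.max.bytes from the producer configuration and pass it to a Selector constructor that accepts a maximum receive size, so that this limit becomes the maxSize used in the check above.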
This proposed change was influenced by the following considerations:
- It does not modify the message decoding logic on the client. That is, the client does not try to identify a failed SSL handshake by forward-reading a message. Doing so would affect the code path for all messages and would likely introduce overhead that is better avoided;
- It introduces minimal change in the codebase. Because this functionality is already implemented server-side and the client uses the same network-level classes as the server, propagating the value of this new property is a minimal-impact, low-risk change. It is also fully transparent from a server perspective.
- It is intended to exhibit no change in behaviour for existing clients, as long as the default value for the maximum allowed size is above that of any valid message (note: need to check this).
However, it cannot provide a definite guarantee against OOM in all scenarios - for instance, an OOM error can still occur if the maximum size is set to 100 MB while the JVM is under memory pressure and has less than 100 MB of allocatable memory.
Compatibility, Deprecation, and Migration Plan
This feature preserves backward compatibility. A default maximum size should be chosen carefully in order to avoid any change in behaviour when consuming large messages (i.e. legitimate responses should not be rejected). The same default value as socket.request.max.bytes, i.e. 104,857,600 bytes, could be used.
Rejected Alternatives
None.
[*] Notes
The server-side SSL engine detects the message is invalid as confirmed in server logs:
kafka-network-thread-0-ListenerName(SSL)-SSL-4, fatal error: 80: problem unwrapping net record
javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
kafka-network-thread-0-ListenerName(SSL)-SSL-4, SEND TLSv1.2 ALERT: fatal, description = internal_error
kafka-network-thread-0-ListenerName(SSL)-SSL-4, WRITE: TLSv1.2 Alert, length = 2
It then sends back to the client the following sequence of bytes:

| Message bytes | 0x15        | 0x03 0x03      | 0x00 0x02 | 0x02        | 0x50                |
| Field         | Record type | Version        | Length    | Alert level | Alert description   |
| Field value   | ALERT       | 3.3 (TLS v1.2) | 2 bytes   | fatal (2)   | internal_error (80) |
The client then tries to allocate 0x15030300 = 352,518,912 bytes, which can result in OOM depending on the available heap size.
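For completeness, the arithmetic can be reproduced with a few lines of Java (the class name is arbitrary): reading those first four bytes as a big-endian length prefix, as a plaintext Kafka client does, yields exactly that value.

import java.nio.ByteBuffer;

public class AlertBytesReadAsSize {
    public static void main(String[] args) {
        // First four bytes of the broker's reply: 0x15 (ALERT), 0x03 0x03 (TLS 1.2), 0x00 (high byte of the alert length).
        ByteBuffer sizeHeader = ByteBuffer.wrap(new byte[] {0x15, 0x03, 0x03, 0x00});
        int receiveSize = sizeHeader.getInt();   // ByteBuffer is big-endian by default
        System.out.println(receiveSize);         // prints 352518912, i.e. 0x15030300
    }
}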