Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder discussion

Discussion thread: Originally: msg-55658. Follow-up: here.

JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-4090

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP addresses a minor addition to the set of configuration properties of Kafka producers in order to make the client validates the size of a message received from brokers (as denoted by the first four bytes of the response). The motivation is to prevent out-of-memory (OOM) errors when the size to be allocated is abnormally high.

...

As part of the client-side configuration for producers and consumers, the following property could property max.response.size will be added.

Note that this property has a different purpose from fetchsocket.response.max.bytes

It could be set as any other producer property:

...

languagejava

...

defined for consumers. The latter is used to control the size of batches to be received by a consumer, while the former applies to any message read from the socket. Also note fetch.max.bytes will not put a limit the size of the first batch received by a consumer.

Proposed Changes

This new element of configuration was originally suggested on the discussion thread and mirrors what already exists server-side, i.e. the property socket.request.max.bytes - which was actually added in brokers for the very same reason. 

...

  • It does not modify the message decoding logic on the client. That is, the client is not trying to identify a failed SSL handshake by forward-reading a message. Indeed, doing so would affect the code path for all messages, thus would be likely introducing a better-avoided overhead;
  • It introduces minimal change in the codebase. Because this functionality is already implemented server-side and the client uses the same network-level classes as the server, propagating the value of this new property is a minimal-impact, low-risk change. It is also fully transparent from a server perspective.
  • It is intended to exhibit no change in behaviour for the existing clients, as long as the default value for the maximum allowed size is above that of any valid message (note: need to check this)..

This proposal appears to come with multiple drawbacks, some of which may be considered as red flags.

Incorrect failure mode

As was experimented and as can be seen as part of the integration tests, when an invalid size is detected and the exception InvalidReceiveException is thrown, consumers and producers keeps retrying until the poll timeout expires or the maximum number of retries is reached. This is incorrect if we consider the use case of protocol mismatch which motivated this change. Indeed, producers and consumers would need to fail fast as retries will only prolong the time to remediation from users/administrators.

Limited scope

The proposed change However, it cannot provide an definite guarantee against OOM in all scenarios - for instance, it will still manifest if the maximum size is set to 100 MB and the JVM is under memory pressure and have less than 100 MB of allocatable memory.

Illegitimate message rejection

Even worse: what if the property is incorrectly configured and legitimate messages not reaching the client?

Potential confusing

  • The name max.response.size intends to mirror the existing max.request.size from the producer's configuration properties. However, max.request.size intends to check the size of producer records as provided by a client; while max.response.size is to check the size directly decoded from the network according to Kafka's binary protocol.
  • On the broker, the property socket.request.max.bytes is used to validate the size of messages received by the server. The new property serves the same purpose, which introduces duplicated semantic, even if one property is characterised with the keyword "request" and the other with "response", in both cases reflecting the perspective adopted from either a client or a server.

Compatibility, Deprecation, and Migration Plan

This feature preserves backward compatibility. A default maximal size should be chosen tactically in order to avoid any change in behaviour when consuming large messages (i.e. legitimate responses should not be rejected). The same default value as socket.request.max.bytes, or 104,857,600 bytes, could be used (question).

Rejected Alternatives

None.

[*] Notes

The server-side SSL engine detects the message is invalid as confirmed in server logs:

...

And sends back to the client the following sequence of bytes 15 03 03 00 02 02 50:

 ________________________________________________________________________________________________
| Message bytes |        0x15           0x0303      0x0002             0x02                 0x50 |
|_______________|________________________________________________________________________________|
|         Field | Record type |        Version |    Length | Handshake Type |        Alert desc. |
|_______________|_____________|________________|___________|________________|____________________|
|   Field value |       ALERT | 3.3 (TLS v1.2) |   2 bytes |   Server Hello | internal_error(80) |
|_______________|_____________|________________|___________|________________|____________________|

The client tries to allocate 0x15030300 = 352,518,912 bytes which can result in OOM depending on the available heap size.

Reference

RFC 5246 - TLS v1.2