Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted

Discussion threadThread

...

ApiVersionsRequest is bumped to version 3 with two new fields. ApiVersionsRequest  ApiVersionsRequest version is a flexible version (KIP-482: The Kafka Protocol should Support Optional Tagged Fields).

...

ApiVersionsResponse is bumped to version 3 but does not have any changes in the schema. Note that ApiVersionsResponse is flexible version but the response header is not a flexible version. This is necessary because the client must look at a fixed offset to find the error code, regardless of the response version, to remain backward compatible.

Code Block
languagejs
{
  "apiKey": 18,
  "type": "response",
  "name": "ApiVersionsResponse",
  // Version 1 adds throttle time to the response.
  // Starting in version 2, on quota violation, brokers send out responses before throttling.
  //
  // Version 3 is the samefirst as version 2flexible version. Tagged fields are only supported in the body but
  // not in the header. The length of the header must not change in order to guarantee the
  // backward compatibility.
  //
  // Starting from Apache Kafka 2.4, ApiKeys field is populated with the supported versions of
  // the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
  "validVersions": "0-3",
  "flexibleVersions": "none3+",
  "fields": [
    { "name": "ErrorCode", "type": "int16", "versions": "0+",
      "about": "The top-level error code." },
    { "name": "ApiKeys", "type": "[]ApiVersionsResponseKey", "versions": "0+",
      "about": "The APIs supported by the broker.", "fields": [
      { "name": "Index", "type": "int16", "versions": "0+", "mapKey": true,
        "about": "The API index." },
      { "name": "MinVersion", "type": "int16", "versions": "0+",
        "about": "The minimum supported version, inclusive." },
      { "name": "MaxVersion", "type": "int16", "versions": "0+",
        "about": "The maximum supported version, inclusive." }
    ]},
    { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }
  ]
}

Errors.INVALID_REQUEST is added.

code
public enum Errors {
    ...
    INVALID_REQUEST(XX, "The validation of the request has failed.", InvalidRequestException::new);
    ...
}
 
public class InvalidRequestException extends ApiException {
    public InvalidRequestException(String message) {
        super(message);
    }
}

Metrics

We will add

...

a new metric in the

...

Selector to surface information about the connected clients.

...

 The mbean will be:

Code Block

...

kafka.server:

...

The clients connected to the broker where each Map represents a connection with the following metadata:

  • ClientId
  • ClientSoftwareName
  • ClientSoftwareVersion
  • ClientAddress
  • Principal
  • Listener
  • SecurityProtocol

...

clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version),listener=(listener),networkProcessor=(processor-index),type=(type)

It will contain a single value name "connections". This will contain the number of currently open connections using the given client software name and version. If the number of connections drops to 0, the mbean will be removed.

A typical example of how this will look:

Code Block
kafka.server:clientSoftwareName=apache-kafka-java,clientSoftwareVersion=2.4.0,listener=PLAINTEXT,networkProcessor=1,type=socket-server-metrics

Request Log

While the Request Log is not a public interface, it is worth mentioning that we will enrich it with the ClientSoftwareName and the ClientSoftwareVersionclient information.

Code Block
languagetext
[2019-07-02 14:11:16,137] DEBUG Completed request:RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=consumer-1, correlationId=11) -- {coordinator_key=console-consumer-17661,coordinator_type=0},response:{throttle_time_ms=0,error_code=15,error_message=null,coordinator={node_id=-1,host=,port=-1}} from connection 192.168.12.241:9092-192.168.12.241:52149-3;totalTime:3.187,requestQueueTime:0.137,localTime:2.899,remoteTime:0.0,throttleTime:0.098,responseQueueTime:0.048,sendTime:0.124,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientSoftwareName:java,clientSoftwareVersion:2.2.0 clientInformation:ClientInformation(softwareName=apache-kafka-java, softwareVersion=2.4.0)(kafka.request.logger)

Proposed Changes

...

To circumvent this, we propose to enhance the fail back mechanism as follow:

  1. When an unknown unsupported version of the ApiVersionsRequest is received by the broker, it fails back to ApiVersionsRequest v0 and sends back an ApiVersionsResponse v0 with the UNSUPPORTED_VERSION error (as today) but the broker also populates the api_versions field with the supported version of the ApiVersionsRequest.
  2. When the client receives a response, it tries to deserialise it with the version it expects. If the parsing failsan unsupported version fo the ApiVersionResponse, it fails back to version 0 (as today). As version 0 of contains both the ApiVersionsResponse ErrorCode and checks the ErrorCode. If an UNSUPPORTED_VERSION error is received, the client gets the supported version provided in the ApiKeys field or default to version 0 if the ApiKeys field is emptyApiKeys fields, the client checks the error and, in case of an UNSUPPORTED_VERSION error, it checks the ApiKeys to get the supported versions or default to versions 0 if not present.

At the moment, the ApiVersionsRequest is handled in two different places in the broker: 1) in the SaslServerAuthenticator (when used); and 2) in the KafkaApis. Both places will be updated to ensure that all clients work. We have decided to not refactor the handling of the ApiVersionsRequest for now and to leave it for further improvements.  

...

ApiVersions Request/Response Handling

As mentioned earlier, when the client receives a response, it tries to deserialise it with the version it expects. If the parsing failsan unsupported version fo the ApiVersionResponse, it fails back to version 0 of the ApiVersionsResponse and checks the ErrorCode. If an UNSUPPORTED_VERSION error is received, the client gets the supported version provided in the ApiKeys field or default to version 0 if the ApiKeys field is empty. Then(as today). As version 0 contains both the ErrorCode and ApiKeys fields, the client checks the error and, in case of an UNSUPPORTED_VERSION error, it checks the ApiKeys to get the supported versions or default to versions 0 if not present. Then, it restarts the process with this version.I have 

When the client receives an INVALID_REQUEST error, it will error out and close the connection.

...