Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Updated after KIP call -> ProtocolVersionRequest

...

 

Additional key points

  • The BrokerMetadata ProtocolVersionRequest may be used in broker-broker communication to decide on a common protocol version between brokers with different versions, i.e., during upgrades.
  • Exposing certain broker configuration properties (such as "message.max.bytes") automatically aligns broker and client configuration, avoiding hard to debug issues.
  • Providing an interface to expose user-configurable key-value tags to the clients opens up for future use-cases, for example location based consumer group balancing could use a user-specified "rack" tag.

 

Public Interfaces

Add protocol BrokerMetadataRequest ProtocolVersionRequest & Response message

Code Block
BrokerMetadataRequestProtocolVersionRequest => [NodeId]
   NodeId => int32   // Request Metadata for these brokers only.
                     // Empty array: retrieve for all brokers.
                     // Use -1 for current broker only.


BrokerMetadataResponse (empty)
 
ProtocolVersionResponse => [BrokerIdApiKey Host Port ProtocolVersionMin ProtocolVersionMax [Key ValueApiName [ApiVersion]]
  NodeIdApiKey => int32      int16        // BrokerAPI protocol NodeIdkey
  HostApiName =>> string string    // Human readable form of API,      // Broker Hoste.g. "ProduceRequest"
  PortApiVersion => int32             int16   // BrokerSupported Port
versions in ProtocolVersionMinorder =>of int32preferredness, most // Broker's minimum supported protocol version
  ProtocolVersionMax => int32  // Broker's maximum supported protocol version
  Key => string                // Tag name
  Value => stirng              // Tag value
 preferred version first (thus typically descending order)
 
 
Example in pseudo format:
 
ProtocolVersionResponse =>
[
 { ApiKey: 0, ApiName: "ProduceRequest", ApiVersion: [2,1,0] },
 { ApiKey: 1, ApiName: "FetchRequest", ApiVersion: [1,0] },
 { ApiKey: 2, ApiName: "OffsetRequest", ApiVersion: [2,0,1] },
  ...
]

Proposed Changes

1. New protocol API:

...

ProtocolVersionRequest and

...

ProtocolVersionResponse

This KIP proposes a new protocol request and response to be added to the Kafka protocol, BrokerMetadataRequest ProtocolVersionRequest and BrokerMetadataResponse. While adding only the ProtocolVersion field to the response would solve the issues at hand it is probably a good idea to conceive a more generic interface that allows the broker to communicate any sort of information about itself in a generic fashion to cater for future needs, such as communicating supported features, protocols, endpoints, locality, etc, to the client.

To this end the proposal suggests a string-based key-value array response message populated by the broker.

Key-Value tags

The broker implements a certain set of builtin tags that are documented in great detail (format, how to parse, expected values, etc).

Builtin tags (suggested)

KeyValue (example)Description
broker.id3Broker's NodeId
broker.version0.9.0.0-SNAPSHOT-d12ca4fBroker protocol string (human readable format)
broker.version.int0x00090000

Broker version integer for comparison use

compression.codecs1=gzip,2=snappy,3=lz4Support codecs ("AttributesBit=Name")
message.max.bytes1000000Config property
message.formatsv1,v2Supported message formats (KIP-31)
endpointsplaintext://host:9092,ssl://host:9192Available endpoints for this broker. Allows upgrading a plaintext connection to SSL

 

These are all documented, including their value format and how to parse it.
The "broker.id" has multiple purposes:
  •  allows upgrading the bootstrap broker connection to a proper one since the broker_id is initially not known, but would be with this.
  • verifying that the broker connected to is actually the broker id that was learnt through TopicMetadata.. Useful for NAT, proxy hosts, etc.   

 

Example user-provided tags

User-provided tags are added to the broker server.properties configuration file.

KeyValue
aws.zoneeu-central-1
rackr8a9
clusterkafka3

The configuration format could look something like:

Code Block
   tag.aws.zone = eu-central-1
   tag.rack = r8a9
   tag.cluster = kafka3

Misc

Another benefit of this API is further client de-integration from ZooKeeper since broker and cluster metadata may now be queried directly from the broker.

ProtocolVersionResponse. A client may use this new API to query the broker for its supported protocol request types and versions and use the information to select a suitable set of conforming API requests and versions for further communication with the broker.

The ProtocolVersionResponse is only valid for the current connection to the broker; the client will need to reissue the ProtocolVersionRequest upon reconnect or when connecting to other brokers.

This API is optional for a client to implementWhile this new API has little impact to begin with it will be of great value as new protocol additions are made and it allows a thriving eco-system with generic clients and tools that can operate on multiple broker versions (which is important for tools/libs/apps packages provided by Linux distributions, or are otherwise not versioned with the official Kafka project, i.e., the non-java clients).

 

2. Improved handling of unsupported requests on broker

...

If a request is received for an unknown protocol request type, or for an unsupported version of a known type, the broker should respond with an empty response only including the common Length+CorrId header with Length=0, instead of closing the connection. The effect of this empty message received by the client is that the client will fail to parse the response (since it will expect a proper non-empty reply) and throw an error to the application with the correct context - that of a protocol parsing failure for request type XYZ. All existing clients should be able to handle this gracefully without any alterations to the code, while updated clients supporting the proposals in this KIP can handle the empty response accordingly by returning a "Request not supported by server" error the application. The level of detail in the returned error message to the application naturally varies between client implementations but is still by far better than a closed connection which says nothing of the underlying error.

 

ProtocolVersion

The ProtocolVersion is a global version incremented for each addition to the Kafka protocol, be it a new request/response type or a new request/response type version.
A broker only needs to keep track of the minimum and maximum supported versions, while clients may support a broader span of versions. It is up to the client, based on the broker's returned ProtocolVersion range, to select which request types and request versions to use when communicating with the broker. The selected protocol should only have relevance for a specific broker, meaning that for each broker it connects to it should retrieve the supported protocol versions.

Managing the ProtocolVersion

  •  The Kafka protocol specification will be updated to indicate which ProtocolVersion a certain request type and version was introduced in, as well as which ProtocolVersion it was removed (it could also state a deprecate version): E.g.:
    •          ProduceRequest v3:
    •              Added in ProtocolVersion: 19
    •              Deprecated in ProtocolVersion: 25
    •              Removed in ProtocolVersion: 27

Notes on managing the protocol specification

  • The Kafka protocol specification will be moved to the kafka.git repository so it

...

  • 's versioned along with the code.
  • The Kafka protocol specification of the latest Kafka release will be available on the homepage
  • No request/response definitions are ever removed from the protocol specification. When removing support for a request type/version only the "Removed in

...

  • :

...

  • <brokerVersion>" information is added.
  • Patches with additions to the Kafka protocol must also include

...

  • corresponding updates to the protocol specification

...

  • .

Compatibility, Deprecation, and Migration Plan

  • Existing clients are not affected by 1. (BrokerMetadataRequestProtocolVersionRequest), but will get the error propagation benefits of 2. (graceful errors).
  • Updated clients that add support for 1. will be able to adapt their functionality based on broker functionality and provide meaningful error propagation to applications when proper broker support is lacking.
  • Release with 0.9.0.0: Since 0.9.0.0 will incorporate a large number of new protocol request types that the various client implementations are likely to pick up and implement it makes sense to add this new functionality along with it.

Rejected Alternatives

  • Use an aggregated global ProtocolVersion
  • Include meta key-value tags in response ("broker.id=...", "message.max.bytes=...", ...)
  • Include user-defined tags in response ("aws.zone=...", "rack=...", ..)