...

This makes it hard for clients and applications to support multiple versions of Kafka, which in turn limits the Kafka ecosystem, since applications and clients need to be manually built or configured for a specific broker version.

 

Additional key points

  • The BrokerMetadata may be used in broker-to-broker communication to agree on a common protocol version between brokers running different versions, e.g., during upgrades.
  • Exposing certain broker configuration properties (such as "message.max.bytes") automatically aligns broker and client configuration, avoiding hard-to-debug issues.
  • Providing an interface that exposes user-configurable key-value tags to clients enables future use cases; for example, location-based consumer group balancing could use a user-specified "rack" tag.

 

Public Interfaces

Add protocol messages BrokerMetadataRequest and BrokerMetadataResponse

    BrokerMetadataRequest => [NodeId]
      NodeId => int32              // Request metadata for these brokers only.
                                   // Empty array: retrieve for all brokers.
                                   // Use -1 for the current broker only.

    BrokerMetadataResponse => [NodeId Host Port ProtocolVersionMin ProtocolVersionMax [Key Value]]
      NodeId => int32              // Broker NodeId
      Host => string               // Broker host
      Port => int32                // Broker port
      ProtocolVersionMin => int32  // Broker's minimum supported protocol version
      ProtocolVersionMax => int32  // Broker's maximum supported protocol version
      Key => string                // Tag name
      Value => string              // Tag value

Note: the ProtocolVersionMin and ProtocolVersionMax fields could be key-value tags too, but having them as dedicated int32 fields might make them easier for clients to use.

Java client

    TBD: some interface to query BrokerMetadata information
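The message layouts can be encoded with the ordinary Kafka wire primitives: big-endian int16/int32, strings as an int16 length followed by UTF-8 bytes, and arrays as an int32 count followed by the elements. The minimal Python sketch below encodes a BrokerMetadataRequest body and decodes a BrokerMetadataResponse body. It assumes the response field order NodeId, Host, Port, ProtocolVersionMin, ProtocolVersionMax, [Key Value] and ignores the common request/response headers. All function and class names are hypothetical, and null strings are not handled.

```python
import struct
from dataclasses import dataclass, field

# Assumed wire primitives: big-endian int16/int32,
# STRING = int16 length + UTF-8 bytes, ARRAY = int32 count + elements.


def encode_broker_metadata_request(node_ids):
    """Encode the request body [NodeId] as an int32 array.

    An empty list asks for all brokers; [-1] asks for the current broker only.
    """
    parts = [struct.pack(">i", len(node_ids))]
    parts.extend(struct.pack(">i", node_id) for node_id in node_ids)
    return b"".join(parts)


@dataclass
class BrokerMetadata:
    node_id: int
    host: str
    port: int
    protocol_version_min: int
    protocol_version_max: int
    tags: dict = field(default_factory=dict)


class _Reader:
    """Sequential big-endian reader over a response body."""

    def __init__(self, buf):
        self.buf = buf
        self.pos = 0

    def int16(self):
        (value,) = struct.unpack_from(">h", self.buf, self.pos)
        self.pos += 2
        return value

    def int32(self):
        (value,) = struct.unpack_from(">i", self.buf, self.pos)
        self.pos += 4
        return value

    def string(self):
        length = self.int16()  # null strings (length -1) not handled here
        value = self.buf[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return value


def decode_broker_metadata_response(buf):
    """Decode [NodeId Host Port ProtocolVersionMin ProtocolVersionMax [Key Value]]."""
    reader = _Reader(buf)
    brokers = []
    for _ in range(reader.int32()):
        node_id = reader.int32()
        host = reader.string()
        port = reader.int32()
        version_min = reader.int32()
        version_max = reader.int32()
        tags = {}
        for _ in range(reader.int32()):
            key = reader.string()  # read key before value: order matters
            tags[key] = reader.string()
        brokers.append(
            BrokerMetadata(node_id, host, port, version_min, version_max, tags)
        )
    return brokers
```

For example, `encode_broker_metadata_request([-1])` produces the request body that asks only for the broker the client is connected to.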
 

Proposed Changes

1. New protocol API: BrokerMetadataRequest and BrokerMetadataResponse

This KIP proposes adding a new request and response to the Kafka protocol: BrokerMetadataRequest and BrokerMetadataResponse. Adding only the ProtocolVersion field to the response would solve the issues at hand. However, it is probably a good idea to conceive a more generic interface that lets the broker communicate any sort of information about itself. This caters for future needs, such as communicating supported features, protocols, endpoints, locality, etc., to the client.

To this end, the proposal suggests a string-based key-value array in the response message, populated by the broker. Suggestions and examples of (future) uses of Key and Value that may be relevant to clients:

Key-Value tags

The broker implements a set of built-in tags that are documented in detail (format, how to parse, expected values, etc.).

Built-in tags (suggested)

Key                 | Value (example)                        | Description
broker.id           | 3                                      | Broker's NodeId
broker.version      | 0.9.0.0-SNAPSHOT-d12ca4f               | Broker version string (human-readable format). Useful for monitoring applications, admin tools, etc.
broker.version.int  | 0x00090000                             | Broker version integer for comparison use
compression.codecs  | 1=gzip,2=snappy,3=lz4                  | Supported codecs ("AttributesBit=Name"). Lets a newer producer avoid using unsupported codecs.
message.max.bytes   | 1000000                                | Broker configuration property
message.formats     | v1,v2                                  | Supported message formats (KIP-31)
endpoints           | plaintext://host:9092,ssl://host:9192  | Available endpoints for this broker. Allows clients to "upgrade" a plaintext connection to SSL.
<...>               | <..>                                   | Arbitrary key-value configured by the user in server.properties, e.g., "rack"="a1b2", for consumer balancing.

 

These tags are all documented, including their value format and how to parse them.
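The example values in the built-in tags table suggest simple, comma-separated formats. The Python sketch below shows how a client might parse a few of them. The exact grammars are assumptions inferred from the examples, since the documentation is yet to be written, and the function names are hypothetical. "AttributesBit=Name" is read as an integer codec code mapped to a codec name.

```python
def parse_compression_codecs(value):
    """Parse "AttributesBit=Name" pairs, e.g. "1=gzip,2=snappy,3=lz4"."""
    codecs = {}
    for item in filter(None, value.split(",")):
        bit, name = item.split("=", 1)
        codecs[int(bit)] = name.strip()
    return codecs


def parse_endpoints(value):
    """Parse "scheme://host:port" entries into {scheme: (host, port)}."""
    endpoints = {}
    for url in filter(None, value.split(",")):
        scheme, rest = url.split("://", 1)
        host, port = rest.rsplit(":", 1)  # split on the last colon only
        endpoints[scheme.lower()] = (host, int(port))
    return endpoints


def parse_version_int(value):
    """Parse broker.version.int, e.g. "0x00090000", for numeric comparison."""
    return int(value, 16)  # int() accepts the "0x" prefix when base is 16


def parse_list(value):
    """Parse a plain comma-separated list such as message.formats ("v1,v2")."""
    return [v.strip() for v in value.split(",") if v.strip()]
```

A producer could then check, for example, `3 in parse_compression_codecs(tags["compression.codecs"])` before choosing LZ4.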
The "broker.id" tag has multiple purposes:
  • It allows upgrading the bootstrap broker connection to a proper broker connection. The broker id of a bootstrap broker is initially not known, but becomes known through this tag.
  • It allows verifying that the connected broker actually has the broker id learnt through the TopicMetadata response, which is useful with NAT, proxy hosts, etc.
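Both uses of "broker.id" amount to one check on the client side. The Python sketch below covers them. If the expected id is unknown (a bootstrap connection), it simply adopts the reported id. Otherwise it raises when the reported id differs from the one learnt through TopicMetadata. The function and exception names are hypothetical.

```python
class BrokerIdMismatch(Exception):
    """Raised when the connected broker is not the broker we expected."""


def resolve_broker_id(tags, expected_node_id=None):
    """Return the broker's NodeId from its "broker.id" tag.

    expected_node_id is None for a bootstrap connection (id not yet known);
    otherwise it is the id learnt from TopicMetadata and must match.
    """
    actual = int(tags["broker.id"])
    if expected_node_id is not None and actual != expected_node_id:
        raise BrokerIdMismatch(
            f"connected to broker {actual}, expected broker {expected_node_id}"
        )
    return actual
```

A mismatch typically points at a NAT or proxy mapping that routes the connection to a different broker than the metadata advertised.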

 

Example user-provided tags

User-provided tags are added to the broker server.properties configuration file.

Key       | Value
aws.zone  | eu-central-1
rack      | r8a9
cluster   | kafka3

The configuration format could look something like:

   tag.aws.zone = eu-central-1
   tag.rack = r8a9
   tag.cluster = kafka3
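Assuming the "tag." prefix convention shown in the configuration example, the broker would collect every property whose key starts with "tag." and strip the prefix to form the tag name. The Python sketch below does this for simple "key = value" lines and skips comments. It is a simplification: the full Java properties format also allows ":" separators and line continuations, which are not handled here. The function name is hypothetical.

```python
def parse_user_tags(properties_text):
    """Collect "tag.<name> = <value>" entries into {name: value}."""
    prefix = "tag."
    tags = {}
    for raw in properties_text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):  # blank or comment line
            continue
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a key=value line in this simplified format
        key = key.strip()
        if key.startswith(prefix):
            tags[key[len(prefix):]] = value.strip()
    return tags
```

Non-tag properties such as `broker.id=3` are ignored, so user tags stay separate from ordinary broker configuration.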

Misc

Another benefit of this API is that it further decouples clients from ZooKeeper, since broker and cluster metadata can now be queried directly from the broker.

While this new API has little impact to begin with, it will be of great value as new protocol additions are made. It enables a thriving ecosystem of generic clients and tools that can operate on multiple broker versions. This is important for tool, library, and application packages provided by Linux distributions, or that are otherwise not versioned with the official Kafka project, e.g., the non-Java clients.

...

If a request is received for an unknown protocol request type, or for an unsupported version of a known type, the broker should not close the connection. Instead, it should send an empty response that only includes the common Length+CorrId header with Length=0. A client receiving this empty message will fail to parse the response, since it expects a proper non-empty reply. It will then return an error to the application with the correct context: a protocol parsing failure for request type XYZ. All existing clients should handle this gracefully without any code changes. Updated clients that support the proposals in this KIP can handle the empty response explicitly by returning a "Request not supported by server" error to the application. The level of detail in the error returned to the application naturally varies between client implementations, but it is still far better than a closed connection, which says nothing about the underlying error.
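An updated client can recognize the empty response before handing the payload to a request-specific parser. The paragraph states Length=0. However, in the existing framing the Length field counts the bytes that follow it, so a header that still carries the CorrId would have Length=4. The Python sketch below therefore treats both cases as "not supported", on the assumption that no valid response has an empty body. Function and exception names are hypothetical.

```python
import struct


class UnsupportedRequestError(Exception):
    """The broker answered with an empty response: request not supported."""


def parse_response_frame(frame, request_name):
    """Split a response frame into (correlation_id, payload).

    frame starts with the int32 Length field, which counts the bytes after it.
    """
    (length,) = struct.unpack_from(">i", frame, 0)
    if len(frame) < 4 + length:
        raise ValueError("truncated response frame")
    if length <= 4:
        # Nothing after the (optional) CorrId: the empty "not supported" reply.
        raise UnsupportedRequestError(
            f"{request_name}: request not supported by server"
        )
    (correlation_id,) = struct.unpack_from(">i", frame, 4)
    return correlation_id, frame[8:4 + length]
```

Raising a dedicated error type lets the application tell "the broker is too old for this request" apart from a genuinely malformed response.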

 

ProtocolVersion

The ProtocolVersion is a global version incremented for each addition to the Kafka protocol, be it a new request/response type or a new request/response type version.
A broker only needs to keep track of the minimum and maximum supported versions, while clients may support a broader span of versions. It is up to the client to select which request types and request versions to use when communicating with the broker, based on the ProtocolVersion range the broker returns. The selected protocol version applies only to that specific broker, so the client should retrieve the supported protocol versions from each broker it connects to.
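One possible selection policy is to intersect the client's supported range with each broker's [ProtocolVersionMin, ProtocolVersionMax] and use the highest common version. The KIP leaves the policy to the client, so the Python sketch below is only one option. The function names are hypothetical.

```python
def select_protocol_version(client_min, client_max, broker_min, broker_max):
    """Return the highest ProtocolVersion supported by both sides, or None."""
    low = max(client_min, broker_min)
    high = min(client_max, broker_max)
    return high if low <= high else None


def negotiate_per_broker(client_range, broker_ranges):
    """Select a version independently for each broker, keyed by NodeId.

    client_range: (min, max) supported by this client.
    broker_ranges: {node_id: (ProtocolVersionMin, ProtocolVersionMax)}.
    """
    client_min, client_max = client_range
    return {
        node_id: select_protocol_version(client_min, client_max, bmin, bmax)
        for node_id, (bmin, bmax) in broker_ranges.items()
    }
```

A `None` entry means the client and that broker share no ProtocolVersion, so the client should report an error for that broker rather than guess.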

Managing the ProtocolVersion

  • The Kafka protocol specification will be updated to state in which ProtocolVersion a given request type and version was introduced, and in which ProtocolVersion it was removed (it could also state a deprecation version). E.g.:
        ProduceRequest v3:
            Added in ProtocolVersion: 19
            Deprecated in ProtocolVersion: 25
            Removed in ProtocolVersion: 27
  • The Kafka protocol specification will be moved to the kafka.git repository so that it is versioned along with the code.
  • The Kafka protocol specification of the latest Kafka release will be available on the homepage.
  • No request/response definitions are ever removed from the protocol specification. When support for a request type/version is removed, only the "Removed in ProtocolVersion: .." information is added.
  • Patches with additions to the Kafka protocol must also include additions to the protocol specification and an increment of the global ProtocolVersion.
  • The global ProtocolVersion should be stored in only one place, preferably in the broker code base.
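The Added/Deprecated/Removed annotations in the specification map directly onto a small lookup a client could generate from the spec. The Python sketch below assumes that a request version is usable from its "Added in" ProtocolVersion up to, but not including, its "Removed in" ProtocolVersion. It encodes the ProduceRequest v3 example from the list. The class and function names are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class RequestVersionSpec:
    api: str
    version: int
    added_in: int
    deprecated_in: Optional[int] = None
    removed_in: Optional[int] = None

    def is_supported(self, protocol_version):
        """Usable from added_in up to, but not including, removed_in."""
        if protocol_version < self.added_in:
            return False
        return self.removed_in is None or protocol_version < self.removed_in

    def is_deprecated(self, protocol_version):
        return (
            self.deprecated_in is not None
            and protocol_version >= self.deprecated_in
            and self.is_supported(protocol_version)
        )


def best_version(specs, api, protocol_version):
    """Highest version of `api` usable at `protocol_version`, or None."""
    candidates = [
        s.version for s in specs
        if s.api == api and s.is_supported(protocol_version)
    ]
    return max(candidates, default=None)


# The example entry from the specification bullet above.
PRODUCE_V3 = RequestVersionSpec(
    "ProduceRequest", 3, added_in=19, deprecated_in=25, removed_in=27
)
```

Because definitions are never removed from the specification, such a table only grows, and a client can always look up why a version is unavailable at a given ProtocolVersion.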

Compatibility, Deprecation, and Migration Plan

...