You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 7 Next »

 

Status

Current stateUnder Discussion

Discussion threadhere

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP aims to solve two problems:

1. There is currently no way for a Kafka client to know which API version the broker supports. This means a client might not be able to perform its desired functionality, nor report any meaningful errors back to the application. Which leads in to problem number two:..

2. Unsupported requests are not handled gracefully by the broker. A broker receiving a protocol request it does not support will simply close the TCP connection to the client. From the client's point of view it has no way of knowing that the broker did not support the request, it could just as well be a network or software problem causing the connection reset.

This makes it hard for clients and applications to support multiple versions of Kafka, which in turn limits the Kafka eco-system since applications and clients will need to be manually built or configured for a specific broker version.

 

Additional key points

  • The BrokerMetadata may be used in broker-broker communication to decide on a common protocol version between brokers with different versions, i.e., during upgrades.
  • Exposing certain broker configuration properties (such as "message.max.bytes") automatically aligns broker and client configuration, avoiding hard to debug issues.
  • Providing an interface to expose user-configurable key-value tags to the clients opens up for future use-cases, for example location based consumer group balancing could use a user-specified "rack" tag.

 

Public Interfaces

Add protocol BrokerMetadataRequest & Response message

BrokerMetadataRequest => [NodeId]
   NodeId => int32   // Request Metadata for these brokers only.
                     // Empty array: retrieve for all brokers.
                     // Use -1 for current broker only.


BrokerMetadataResponse => [BrokerId Host Port ProtocolVersionMin ProtocolVersionMax [Key Value]]
  NodeId => int32              // Broker NodeId
  Host => string               // Broker Host
  Port => int32                // Broker Port
  ProtocolVersionMin => int32  // Broker's minimum supported protocol version
  ProtocolVersionMax => int32  // Broker's maximum supported protocol version
  Key => string                // Tag name
  Value => stirng              // Tag value
 

Proposed Changes

1. New protocol API: BrokerMetadataRequest and BrokerMetadataResponse

This KIP proposes a new protocol request and response to be added to the Kafka protocol, BrokerMetadataRequest and BrokerMetadataResponse. While adding only the ProtocolVersion field to the response would solve the issues at hand it is probably a good idea to conceive a more generic interface that allows the broker to communicate any sort of information about itself in a generic fashion to cater for future needs, such as communicating supported features, protocols, endpoints, locality, etc, to the client.

To this end the proposal suggests a string-based key-value array response message populated by the broker.

Key-Value tags

The broker implements a certain set of builtin tags that are documented in great detail (format, how to parse, expected values, etc).

Builtin tags (suggested)

KeyValue (example)Description
broker.id3Broker's NodeId
broker.version0.9.0.0-SNAPSHOT-d12ca4fBroker protocol string (human readable format)
broker.version.int0x00090000

Broker version integer for comparison use

compression.codecs1=gzip,2=snappy,3=lz4Support codecs ("AttributesBit=Name")
message.max.bytes1000000Config property
message.formatsv1,v2Supported message formats (KIP-31)
endpointsplaintext://host:9092,ssl://host:9192Available endpoints for this broker. Allows upgrading a plaintext connection to SSL

 

These are all documented, including their value format and how to parse it.
The "broker.id" has multiple purposes:
  •  allows upgrading the bootstrap broker connection to a proper one since the broker_id is initially not known, but would be with this.
  • verifying that the broker connected to is actually the broker id that was learnt through TopicMetadata.. Useful for NAT, proxy hosts, etc.   

 

Example user-provided tags

User-provided tags are added to the broker server.properties configuration file.

KeyValue
aws.zoneeu-central-1
rackr8a9
clusterkafka3

The configuration format could look something like:

   tag.aws.zone = eu-central-1
   tag.rack = r8a9
   tag.cluster = kafka3

Misc

Another benefit of this API is further client de-integration from ZooKeeper since broker and cluster metadata may now be queried directly from the broker.

While this new API has little impact to begin with it will be of great value as new protocol additions are made and it allows a thriving eco-system with generic clients and tools that can operate on multiple broker versions (which is important for tools/libs/apps packages provided by Linux distributions, or are otherwise not versioned with the official Kafka project, i.e., the non-java clients).

 

2. Improved handling of unsupported requests on broker

Additionally, to solve issue nr 2. - ungraceful handling of unknown requests - it is also suggested to make the following change in the broker request handling code:

If a request is received for an unknown protocol request type, or for an unsupported version of a known type, the broker should respond with an empty response only including the common Length+CorrId header with Length=0, instead of closing the connection. The effect of this empty message received by the client is that the client will fail to parse the response (since it will expect a proper non-empty reply) and throw an error to the application with the correct context - that of a protocol parsing failure for request type XYZ. All existing clients should be able to handle this gracefully without any alterations to the code, while updated clients supporting the proposals in this KIP can handle the empty response accordingly by returning a "Request not supported by server" error the application. The level of detail in the returned error message to the application naturally varies between client implementations but is still by far better than a closed connection which says nothing of the underlying error.

 

ProtocolVersion

The ProtocolVersion is a global version incremented for each addition to the Kafka protocol, be it a new request/response type or a new request/response type version.
A broker only needs to keep track of the minimum and maximum supported versions, while clients may support a broader span of versions. It is up to the client, based on the broker's returned ProtocolVersion range, to select which request types and request versions to use when communicating with the broker. The selected protocol should only have relevance for a specific broker, meaning that for each broker it connects to it should retrieve the supported protocol versions.

Managing the ProtocolVersion

  •  The Kafka protocol specification will be updated to indicate which ProtocolVersion a certain request type and version was introduced in, as well as which ProtocolVersion it was removed (it could also state a deprecate version). E.g.:       ProduceRequest v3:
             Added in ProtocolVersion: 19
             Deprecated in ProtocolVersion: 25
             Removed in ProtocolVersion: 27
  • The Kafka protocol specification will be moved to the kafka.git repository so it is version along with the code.
  • The Kafka protocol specification of the latest Kafka release will be available on the homepage
  • No request/response definitions are ever removed from the protocol specification. When removing support for a request type/version only the "Removed in ProtocolVersion: .." information is added.
  • Patches with additions to the Kafka protocol must also include additions to the protocol specification and an increment in the global ProtocolVersion
  • The global ProtocolVersion should only be stored in one place, preferably in the broker code base.

Compatibility, Deprecation, and Migration Plan

  • Existing clients are not affected by 1. (BrokerMetadataRequest), but will get the error propagation benefits of 2. (graceful errors).
  • Updated clients that add support for 1. will be able to adapt their functionality based on broker functionality and provide meaningful error propagation to applications when proper broker support is lacking.
  • Release with 0.9.0.0: Since 0.9.0.0 will incorporate a large number of new protocol request types that the various client implementations are likely to pick up and implement it makes sense to add this new functionality along with it.

Rejected Alternatives

 

  • No labels