Introduction

This document covers the protocol implemented in Kafka 0.8. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them in a client. It is aimed at making it easier to implement a client, or just as documentation for the curious. This document assumes you understand the basic design and terminology described here.

Preliminaries

Kafka uses a binary protocol over TCP. This protocol defines all APIs as request-response message pairs. All messages are size delimited and are made up of the following primitive types:

...

Fixed Width Primitives

...

int8, int16, int32, int64, uint8, uint16, uint32, uint64 - Integers with the given precision (in bits) stored in big endian order. uintXX variants are unsigned and have double the range.

string - A variable-length, size-delimited string. The first two bytes contain a length N stored as an int16. The succeeding N bytes contain the UTF8 encoding of the string.
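
To make the encoding concrete, here is a minimal sketch of writing and reading these primitives in Python; the helper names are illustrative and not part of any Kafka client library:

```python
import struct

def write_int32(value: int) -> bytes:
    # Fixed-width integers are stored big endian (">" in struct notation).
    return struct.pack(">i", value)

def write_string(value: str) -> bytes:
    # A string is an int16 length N followed by N bytes of UTF-8.
    encoded = value.encode("utf-8")
    return struct.pack(">h", len(encoded)) + encoded

def read_string(buf: bytes, offset: int = 0):
    # Returns (decoded string, offset of the next field).
    (n,) = struct.unpack_from(">h", buf, offset)
    start = offset + 2
    return buf[start:start + n].decode("utf-8"), start + n
```

For example, write_string("test") produces the two length bytes 0x00 0x04 followed by the four UTF-8 bytes of "test".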


Variable Length Primitives

...

bytes16, bytes32, string16 - These types consist of a signed integer giving a length N followed by N bytes of content. -1 indicates null. bytes16 and string16 use a two byte int16 size and bytes32 uses a four byte int32 size. string16 is identical in format to bytes16 but the bytes should be interpreted as UTF8 encoded characters.
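
The null convention (-1 length) for these variable-length types might be handled as in this sketch (hypothetical helper names, assuming a Python client):

```python
import struct

def write_bytes32(value):
    # bytes32: an int32 length N followed by N bytes of content.
    # A length of -1 encodes null.
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value

def read_bytes32(buf, offset=0):
    # Returns (content or None, offset of the next field).
    (n,) = struct.unpack_from(">i", buf, offset)
    offset += 4
    if n == -1:
        return None, offset
    return buf[offset:offset + n], offset + n
```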

...

Arrays

...

It is often useful to repeat some structure multiple times. This will always be encoded as an int32 size containing the length N followed by N repetitions of the structure. In the BNF grammars below we will show an array of a structure foo as [foo].
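
Under this convention, serializing an array is just the count followed by each encoded element. A minimal sketch (assuming Python and a caller-supplied element encoder):

```python
import struct

def write_array(items, write_item):
    # An array is a count N (int32, big endian) followed by
    # N repetitions of the encoded structure.
    out = struct.pack(">i", len(items))
    for item in items:
        out += write_item(item)
    return out
```

For example, write_array(topics, write_string) would encode a [TopicName] array using the string encoder.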

Client Requests

The Kafka protocol is extremely simple. There are only four client request APIs:

  1. Metadata - Describes the currently available brokers, their host and port information, and gives information about which broker hosts which partitions.
  2. Send - Send messages to a broker.
  3. Fetch - Fetch messages from a broker.
  4. Offsets - Get information about the available offsets for a given topic partition.

Each of these will be described in great detail below.

...

Request Response Structure

...

There are a number of common fields shared by many or all requests. I will repeat these in each BNF, but only describe their usage here:

Code Block

RequestOrResponse => Size (Request | Response)


Request => RequestId VersionId ClientId RequestMessage
Response => VersionId ResponseMessage



Size => uint32
RequestId => uint16
VersionId => uint16
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse

The above BNF gives the grammar for all requests and responses.

Size: This is the total size of the remainder of the message. A client can always read a complete response from a socket by first reading this four byte size, which will contain a non-negative integer N, and then reading the N remaining bytes of the response.

RequestId: This request id indicates which API is being invoked. Each API has a numeric code given in the table below.

VersionId: This is the version of the request or response format. Each request and response is versioned independently to allow evolution and to let clients check compatibility.

ClientId: This is a string specified by the client that identifies the calling application; it is intended for server-side logging and debugging.

RequestMessage, ResponseMessage: The individual RequestMessage and ResponseMessage formats are given with their documentation below.
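
Because every response is size delimited, a client's read loop can be sketched as follows (assuming Python and a blocking socket; recv_exact is a hypothetical helper, not a standard API):

```python
import struct

def recv_exact(sock, n):
    # recv() may return fewer bytes than asked for, so loop until
    # exactly n bytes have been read.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("socket closed mid-response")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)

def read_response(sock):
    # First read the four byte Size, then the Size remaining bytes.
    (size,) = struct.unpack(">I", recv_exact(sock, 4))
    return recv_exact(sock, size)
```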

...

Notes on reading the BNF

...

The BNFs below give an exact context free grammar for the request and response binary format. For each API I will give the request and response together, followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example I define a production for ErrorCode even though it is just an int16, in order to give it a symbolic name). As always in a BNF a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping.

Message sets

One structure common to both the produce and fetch requests is the message set format. A message in kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for the on-disk storage on the broker and the on-the-wire format.

A message set is also the unit of compression in Kafka, and we allow messages to recursively contain compressed message sets to allow batch compression.

Code Block

...


MessageSet => [Offset MessageSize Message]


Offset => int64
MessageSize => int32


Message => Crc MagicByte Attributes Key Value


Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes32
Value => bytes32

Offset: This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Crc: The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte: This is a version id used to allow backwards compatible evolution of the message binary format.

Attributes: This byte holds metadata attributes about the message. In particular the last 3 bits contain the compression codec used for the message.

Key: The key is an optional message key that was used for partition assignment. The key can be null.

Value: The value is the actual message contents as an opaque byte array. Kafka supports recursive messages, in which case this may itself contain a message set.
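
Putting the message grammar together, a single uncompressed message might be built as in this sketch; zlib.crc32 stands in for the CRC32 used here, and the helper names are illustrative:

```python
import struct
import zlib

def write_bytes32(b):
    # bytes32: int32 length N then N bytes; -1 encodes null.
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b

def encode_message(key, value, magic=0, attributes=0):
    # The Crc covers the remainder of the message:
    # MagicByte, Attributes, Key, Value.
    body = struct.pack(">bb", magic, attributes) + write_bytes32(key) + write_bytes32(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body

def encode_message_set(entries):
    # A message set is a sequence of [Offset MessageSize Message];
    # entries is a list of (offset, encoded message) pairs.
    out = b""
    for offset, msg in entries:
        out += struct.pack(">qi", offset, len(msg)) + msg
    return out
```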

Metadata API

This API answers the question "who has what data and where are they?". Specifically this request will tell for each topic how many partitions it has, which brokers currently host each of these partitions, and which of these is the leader. Since Kafka is a partitioned system, requests need to be directed to the appropriate server--the one currently acting as the leader for the partition you want to interact with. Since cluster membership in Kafka is dynamic, you can't just give all the clients a config file with all the brokers (some of them may be down, or partitions may have moved); instead you need to ask the cluster about its current state at run time. Hence the first thing a client needs to do when it connects is ask, "where is everyone?" using this metadata API.

...

  1. Cycle through a list of bootstrap Kafka URLs until we find one we can connect to. Fetch cluster metadata.
  2. Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
  3. If we get an appropriate error, refresh the metadata and try again.
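
The bootstrap step above can be sketched as a small loop; everything here is illustrative (fetch_metadata stands in for sending an actual MetadataRequest and parsing its response, not a real API):

```python
import socket

def bootstrap(broker_addresses, fetch_metadata):
    # Cycle through the bootstrap (host, port) pairs until one accepts
    # a connection, then ask that broker for current cluster metadata.
    last_error = None
    for host, port in broker_addresses:
        try:
            with socket.create_connection((host, port), timeout=5) as sock:
                return fetch_metadata(sock)
        except OSError as err:
            last_error = err  # this broker is down; try the next one
    raise ConnectionError(f"no bootstrap broker reachable: {last_error}")
```
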

...

Request And Response Format

The client sends a request for metadata. Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.

The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.

Code Block

MetadataRequest => [TopicName]
MetadataResponse => [Metadata]

TopicName => string
Metadata => TopicName [PartitionMetadata] TopicErrorCode
PartitionMetadata => PartitionId Leader [Replicas] [Isr] PartitionErrorCode
Replica => Broker
Isr => Broker
Broker => NodeId CreatorId Host Port
NodeId => int32
CreatorId => string
Host => string
Port => int32
VersionId => int16
ClientId => string

Notes: Replicas should contain the broker information for the relevant brokers. The leader and isr should just be ids.

Produce API

The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

The produce API uses the generic message set format, but since no offset has been assigned to the messages at the time of the send the producer is free to fill in this field in any way it likes.

Code Block

ProduceRequest =>

Interaction With The Server

Some Common Philosophical Questions

Some people have asked why we don't use HTTP. There are a number of reasons, the best being that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.

...