
This document covers the protocol implemented in Kafka 0.8 and beyond. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them to implement a client. This document assumes you understand the basic design and terminology described here.

The protocol used in 0.7 and earlier is similar to this, but we chose to make a one time (we hope) break in compatibility to be able to clean up cruft and generalize things.

Overview

The Kafka protocol is fairly simple; there are only four client request APIs.

...

Each of these will be described in detail below.

Preliminaries

Network

Kafka uses a binary protocol over TCP. The protocol defines all APIs as request-response message pairs. All messages are size delimited and are made up of the primitive types described below.

The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response messages. No handshake is required on connection or disconnection. TCP is happier if you maintain persistent connections used for many requests to amortize the cost of the TCP handshake, but beyond this penalty connecting is pretty cheap.
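
To make the request/response exchange concrete, the following is a minimal sketch (not taken from any particular Kafka client) of one blocking round trip over a persistent connection. The request bytes are assumed to already be encoded according to the grammars given later in this document.

Code Block

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

public class BlockingExchange {
    // Writes one size-delimited request and reads back one size-delimited response.
    public static byte[] roundTrip(Socket socket, byte[] requestBody) throws IOException {
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        out.writeInt(requestBody.length);   // 4-byte, big-endian size prefix
        out.write(requestBody);             // the request message itself
        out.flush();

        DataInputStream in = new DataInputStream(socket.getInputStream());
        int size = in.readInt();            // size prefix of the response
        byte[] responseBody = new byte[size];
        in.readFully(responseBody);         // read exactly N bytes
        return responseBody;
    }
}

A client would normally keep the Socket open and reuse it for many such exchanges rather than reconnecting for each request.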

The client will likely need to maintain a connection to multiple brokers, as data is partitioned and the clients will need to talk to the server that has their data. However, it should not generally be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).

The assignment of messages to partitions is a client concern. When it receives a produce or fetch request the broker will check that it is the leader for the partitions in the request. However the assignment of messages to partitions is purely in the control of the client. Most clients will want to partition by the message key, but the exact nature of the partition mapping is not specified. Generally it will be some kind of simple hash of the key over the partitions. If two clients want to use the same partitioning scheme they must use the same partition assignment algorithm. In the Scala client we have made it pluggable via a partitioner interface that controls the mapping of key to partition.

As indicated above the client must address most requests to the broker responsible for the appropriate data. The only exception is the metadata request which can be issued against any server. The result of this request gives the partition ownership information that can be used to direct subsequent requests. If the client gets an error or an exception (e.g., socket exception), it should refresh its metadata.

Metadata API

This API answers the question "who has what data and where are they?". Specifically this request will tell for each topic how many partitions it has, which brokers currently host each of these partitions, and which of these is the leader. Since Kafka is a partitioned system, requests need to be directed to the appropriate server--the one currently acting as the leader for the partition you want to interact with. Since cluster membership in Kafka is dynamic, you can't just give all the clients a config file with all the brokers (some of them may be down, or partitions may have moved); instead you need to ask the cluster about its current state at run time. Hence the first thing a client needs to do when it connects is ask, "where is everyone?" using this metadata API.

This is the only request that can be made to any server without regard to partition ownership and all servers will give the same answer (disregarding timing differences). Fetch and produce requests always interact with particular partitions, and sending these to the wrong broker will result in an invalid metadata error. The client is expected to cache the cluster metadata locally, using it to direct requests to the correct hosts, until it gets an invalid metadata error or can't reach a particular broker, at which point it should fetch the metadata again and update its cache.

This presents a bit of a catch-22, since the only way to find out which Kafka servers exist is to ask a Kafka server, so how can a client ever connect the first time? To do this a client should take a "bootstrap urls" configuration from which it can find out the list of currently available servers. Importantly this need not contain all the servers in the cluster, maybe just two or three for redundancy. The client should try each of these until it finds one it can connect to. This will ensure that even if one of the bootstrap servers is down the client can still fetch the cluster metadata.

For deployment you may not want to hardcode such a list and may prefer to rely on DNS or a VIP or something like that to find a bootstrap server.

So the lifecycle of most clients looks something like this:

...

The server guarantees that on a single TCP connection, requests will be processed in the order they are sent and responses will return in that order as well.

The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.

Partitioning and bootstrapping

Kafka is a partitioned system so not all servers have the complete data set. Instead recall that topics are split into a pre-defined number of partitions, P, and each partition is replicated with some replication factor, N. Topic partitions themselves are just ordered "commit logs" numbered 0, 1, ..., P-1.

All systems of this nature have the question of how a particular piece of data is assigned to a particular partition. Kafka clients directly control this assignment; the brokers themselves enforce no particular semantics of which messages should be published to a particular partition. Rather, to publish messages the client directly addresses messages to a particular partition, and when fetching messages, fetches from a particular partition. If two clients want to use the same partitioning scheme they must use the same method to compute the mapping of key to partition.

These requests to publish or fetch data must be sent to the broker that is currently acting as the leader for a given partition. This condition is enforced by the broker, so a request for a particular partition sent to the wrong broker will result in the NotLeaderForPartition error code (described below).

How can the client find out which topics exist, what partitions they have, and which brokers currently host those partitions so that it can direct its requests to the right hosts? This information is dynamic, so you can't just configure each client with some static mapping file. Instead all Kafka brokers can answer a metadata request that describes the current state of the cluster: what topics there are, which partitions those topics have, which broker is the leader for those partitions, and the host and port information for these brokers.

In other words, the client needs to somehow find one broker and that broker will tell the client about all the other brokers that exist and what partitions they host. This first broker may itself go down, so the best practice for a client implementation is to take a list of two or three URLs to bootstrap from. The user can then choose to use a load balancer or just statically configure two or three of their Kafka hosts in the clients.

The client does not need to keep polling to see if the cluster has changed; it can fetch metadata once when it is instantiated and cache that metadata until it receives an error indicating that the metadata is out of date. This error can come in two forms: (1) a socket error indicating the client cannot communicate with a particular broker, (2) an error code in the response to a request indicating that this broker no longer hosts the partition for which data was requested.

  1. Cycle through a list of "bootstrap" Kafka URLs until we find one we can connect to. Fetch cluster metadata.
  2. Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
  3. If we get an appropriate error, refresh the metadata and try again.

Partitioning Strategies

As mentioned above the assignment of messages to partitions is something the producing client controls. That said, how should this functionality be exposed to the end-user?

Partitioning really serves two purposes in Kafka:

  1. It balances data and request load over brokers
  2. It serves as a way to divvy up processing among consumer processes while allowing local state and preserving order within the partition. We call this semantic partitioning.

For a given use case you may care about only one of these or both.

To accomplish load balancing a simple approach would be for the client to just round robin requests over all brokers. Another alternative, in an environment where there are many more producers than brokers, would be to have each client choose a single partition at random and publish to that. This latter strategy will result in far fewer TCP connections.

Semantic partitioning means using some key in the message to assign messages to partitions. For example if you were processing a click message stream you might want to partition the stream by the user id so that all data for a particular user would go to a single consumer. To accomplish this the client can take a key associated with the message and use some hash of this key to choose the partition to which to deliver the message.
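
As an illustration only (the exact hash is left to the client, as noted above), a semantic partitioner might look like the following sketch:

Code Block

import java.util.Arrays;

public class KeyPartitioner {
    // Maps a message key to one of numPartitions partitions. Any deterministic
    // hash works, but producers that want co-partitioned data must agree on it.
    public static int partitionFor(byte[] key, int numPartitions) {
        if (key == null) {
            return 0;   // no key: any partition will do (a real client might round robin)
        }
        int hash = Arrays.hashCode(key);
        return (hash & 0x7fffffff) % numPartitions;   // mask the sign bit to stay non-negative
    }
}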

Batching

Our APIs encourage batching small things together for efficiency. We have found this is a very significant performance win. Both our API to send messages and our API to fetch messages always work with a sequence of messages, not a single message, to encourage this. A clever client can make use of this and support an "asynchronous" mode in which it batches together messages sent individually and sends them in larger clumps. We go even further with this and allow the batching across multiple topics and partitions, so a produce request may contain data to append to many partitions and a fetch request may pull data from many partitions all at once.

The client implementor can choose to ignore this and send everything one at a time if they like.

Versioning and Compatibility

The protocol is designed to enable incremental evolution in a backward compatible fashion. Our versioning is on a per-api basis, each version consisting of a request and response pair. Each request contains an API key that identifies the API being invoked and a version number that indicates the format of the request and the expected format of the response.

The intention is that clients would implement a particular version of the protocol, and indicate this version in their requests. Our goal is primarily to allow API evolution in an environment where downtime is not allowed and clients and servers cannot all be changed at once.

The server will reject requests with a version it does not support, and will always respond to the client with exactly the protocol format it expects based on the version it included in its request. The intended upgrade path is that new features would first be rolled out on the server (with the older clients not making use of them) and then as newer clients are deployed these new features would gradually be taken advantage of.

Currently all versions are baselined at 0; as we evolve these APIs we will indicate the format for each version individually.

Protocol Primitive Types

The protocol is built out of the following primitive types.

Fixed Width Primitives

int8, int16, int32, int64 - Signed integers with the given precision (in bits) stored in big endian order.

Variable Length Primitives

bytes, string - These types consist of a signed integer giving a length N followed by N bytes of content. A length of -1 indicates null. string uses an int16 for its size, and bytes uses an int32.

Arrays

This is a notation for handling repeated structures. These will always be encoded as an int32 size containing the length N followed by N repetitions of the structure, which can itself be made up of other primitive types. In the BNF grammars below we will show an array of a structure foo as [foo].
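
To make these encodings concrete, the following sketch writes a nullable string, a nullable byte array, and an array of int32 values into a java.nio.ByteBuffer (which is big-endian by default, matching the protocol). UTF-8 is assumed for string content, which the description above does not pin down.

Code Block

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class Primitives {
    // string: int16 length N (or -1 for null) followed by N bytes of content.
    public static void writeString(ByteBuffer buf, String s) {
        if (s == null) {
            buf.putShort((short) -1);
        } else {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
    }

    // bytes: int32 length N (or -1 for null) followed by N bytes of content.
    public static void writeBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }

    // array: int32 element count N followed by N encoded elements.
    public static void writeInt32Array(ByteBuffer buf, int[] values) {
        buf.putInt(values.length);
        for (int v : values) {
            buf.putInt(v);
        }
    }
}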

Notes on reading the request format grammars

The BNFs below give an exact context free grammar for the request and response binary format. For each API I will give the request and response together followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example I define a production for ErrorCode even though it is just an int16 in order to give it a symbolic name). As always in a BNF a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping. The top-level definition is always given first and subsequent sub-parts are indented.

Common Request and Response Structure

All requests and responses originate from the following grammar which will be incrementally described through the rest of this document:

Code Block

RequestOrResponse => MessageSize (RequestMessage | ResponseMessage)
  MessageSize => int32

Field

Description

MessageSize

The MessageSize field gives the size of the subsequent request or response message in bytes. The reader can consume a message by first reading this 4-byte size as an integer N, and then reading and parsing the subsequent N bytes of the message.

A request looks like this:

Code Block

RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest

Field

Description

ApiKey

This is a numeric id for the API being invoked (i.e. is it a metadata request, a produce request, a fetch request, etc).

ApiVersion

This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves.

CorrelationId

This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server.

ClientId

This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc.

The various request and response messages will be described below.
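
As a sketch of the request grammar above (not an official client API), the common header fields could be laid out as follows. The API-specific body is assumed to be encoded separately, and UTF-8 is assumed for the ClientId string.

Code Block

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RequestHeader {
    // Encodes ApiKey, ApiVersion, CorrelationId and ClientId, appends the
    // API-specific body, and prefixes the whole message with MessageSize.
    public static byte[] encode(short apiKey, short apiVersion, int correlationId,
                                String clientId, byte[] apiSpecificBody) {
        byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
        int messageSize = 2 + 2 + 4 + 2 + clientIdBytes.length + apiSpecificBody.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + messageSize);
        buf.putInt(messageSize);                     // MessageSize
        buf.putShort(apiKey);                        // ApiKey => int16
        buf.putShort(apiVersion);                    // ApiVersion => int16
        buf.putInt(correlationId);                   // CorrelationId => int32
        buf.putShort((short) clientIdBytes.length);  // ClientId => string (int16 length)
        buf.put(clientIdBytes);
        buf.put(apiSpecificBody);                    // MetadataRequest, ProduceRequest, ...
        return buf.array();
    }
}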

And the response:

Code Block

Response => CorrelationId ResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse

Field

Description

CorrelationId

The server passes back whatever integer the client supplied as the correlation id in the request.

...

Message sets

...

One structure common to both the produce and fetch requests is the message set format. A message in Kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for the on-disk storage on the broker and for the on-the-wire format.

A message set is also the unit of compression in Kafka, and we allow messages to recursively contain compressed message sets to allow batch compression.

Code Block

MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

Message format

Code Block

Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes

Field

Description

Offset

This is the offset used in Kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format.

Attributes

This byte holds metadata attributes about the message. In particular the last 3 bits contain the compression codec used for the message.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The value can be null.
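
Tying these fields together, a single message could be serialized as in the sketch below: the CRC is computed over everything after the Crc field, the magic byte is assumed to be 0 for the current format, and the codec occupies the low 3 bits of Attributes.

Code Block

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageCodec {
    public static final byte CODEC_NONE = 0, CODEC_GZIP = 1, CODEC_SNAPPY = 2;

    // Serializes Crc MagicByte Attributes Key Value for a single message.
    public static byte[] encodeMessage(byte[] key, byte[] value, byte codec) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 1 + 4 + keyLen + 4 + valueLen);
        buf.position(4);                    // leave room for the CRC
        buf.put((byte) 0);                  // MagicByte (assumed 0 here)
        buf.put((byte) (codec & 0x7));      // Attributes: codec in the low 3 bits
        putBytes(buf, key);                 // Key => bytes (int32 length, -1 for null)
        putBytes(buf, value);               // Value => bytes (int32 length, -1 for null)

        CRC32 crc = new CRC32();            // CRC32 of everything after the Crc field
        crc.update(buf.array(), 4, buf.position() - 4);
        buf.putInt(0, (int) crc.getValue());
        return buf.array();
    }

    private static void putBytes(ByteBuffer buf, byte[] b) {
        if (b == null) {
            buf.putInt(-1);
        } else {
            buf.putInt(b.length);
            buf.put(b);
        }
    }
}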

...

Compression

...

Kafka supports batch compression of a message set. Batch compression is important as individual messages may not have sufficient redundancy to enable good compression ratios. Batch compression is supported by allowing the value field of a message to hold a compressed message set in the format defined above, with the Attributes field of the wrapping message indicating the compression codec used.

Kafka currently supports two compression codecs for message sets with the following codec numbers:

Compression

Codec

None

0

GZIP

1

Snappy

2
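
For example, a GZIP batch (codec 1 in the table above) is just an ordinary message whose Value holds the compressed inner message set and whose Attributes carry the GZIP codec. The sketch below reuses the hypothetical MessageCodec.encodeMessage helper from the message format section.

Code Block

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class CompressedBatch {
    // Builds [Offset MessageSize Message] entries, gzips them, and wraps the
    // result in a single outer message flagged with the GZIP codec.
    public static byte[] gzipWrap(List<byte[]> encodedMessages) throws IOException {
        // 1. Concatenate the inner message set (the producer may use any offset).
        int setSize = 0;
        for (byte[] m : encodedMessages) setSize += 8 + 4 + m.length;
        ByteBuffer set = ByteBuffer.allocate(setSize);
        for (byte[] m : encodedMessages) {
            set.putLong(0L);          // Offset (ignored on produce)
            set.putInt(m.length);     // MessageSize
            set.put(m);               // Message
        }

        // 2. Compress the serialized inner message set.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(compressed)) {
            gzip.write(set.array());
        }

        // 3. The compressed bytes become the Value of one outer message with codec GZIP.
        return MessageCodec.encodeMessage(null, compressed.toByteArray(), MessageCodec.CODEC_GZIP);
    }
}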

The APIs

This section gives details on each of the individual APIs, their usage, their binary format, and the meaning of their fields.

Metadata API

This API answers the following questions:

  • What topics exist?
  • How many partitions does each topic have?
  • Which broker is currently the leader for each partition?
  • What is the host and port for each of these brokers?

This is the only request that can be addressed to any broker in the cluster.

...

Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.

...

Code Block

MetadataResponse => [TopicMetadata] [Broker]
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32

...

Error

Code

Description

NoError

0

No error--it worked!

Unknown

-1

An unexpected server error

OffsetOutOfRange

1

The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

InvalidMessage

2

This indicates that the message contents do not match the message's CRC.

UnknownTopicOrPartition

3

This request is for a topic or partition that does not exist on this broker.

InvalidMessageSize

4

The message has a negative size

LeaderNotAvailable

5

This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.

NotLeaderForPartition

6

This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date.

RequestTimedOut

7

This error is thrown if the request exceeds the user-specified time limit in the request.

BrokerNotAvailable

8

This is not a client facing error and is used only internally by intra-cluster broker communication.

ReplicaNotAvailable

9

What is the difference between this and LeaderNotAvailable?

MessageSizeTooLarge

10

The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempts to produce a message larger than this maximum.

...