Introduction
This document covers the protocol implemented in Kafka 0.8. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them to implement a client. This document assumes you understand the basic design and terminology described here.
Overview
The Kafka protocol is fairly simple: there are only four client request APIs.
- Metadata - Describes the currently available brokers, their host and port information, and gives information about which broker hosts which partitions.
- Produce - Send messages to a broker.
- Fetch - Fetch messages from a broker.
- Offsets - Get information about the available offsets for a given topic partition.
Each of these will be described in detail below.
Preliminaries
Kafka uses a binary protocol over TCP. The protocol defines all APIs as request-response message pairs. All messages are size delimited and are made up of the primitive types described below.
The client initiates a socket connection and then writes a sequence of request messages and reads back the corresponding response message. No handshake is required on connection or disconnection.
The client will likely need to maintain a connection to multiple brokers; however, it should not be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).
The assignment of messages to partitions is a client concern. When it receives a produce or fetch request, the broker will check that it is the leader for the partitions in the request. However, the assignment of messages to partitions is purely in the control of the client. Most clients will want to partition by the message key, but the exact nature of the partition mapping is not specified. Generally it will be some kind of simple hash over the partitions. If two clients want to use the same partitioning scheme they must use the same partition assignment algorithm. In the Scala client we have made it pluggable via a partitioner interface that controls the mapping of key to partition.
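As a rough illustration of key-based partitioning, here is a minimal sketch. The function name `partition_for_key` and the choice of CRC32 as the hash are assumptions made for this example; they are not the scheme used by the Scala client, and the protocol does not prescribe any particular hash.

```python
import zlib


def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition index in [0, num_partitions).

    Hypothetical helper: CRC32 is an arbitrary choice for this sketch.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # zlib.crc32 gives the same result in every process, unlike Python's
    # built-in hash(), which is randomized per process for bytes and str.
    return zlib.crc32(key) % num_partitions
```

Two clients that both use this function with the same partition count will send a given key to the same partition, which is the property the paragraph above asks for.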
As indicated above the client must address most requests to the broker responsible for the appropriate data. The only exception is the metadata request which can be issued against any server. The result of this request gives the partition ownership information that can be used to direct subsequent requests. If the client gets an error it should refresh its metadata.
The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.
Protocol Primitives
The protocol is built out of the following primitive structures.
Fixed Width Primitives
int8, int16, int32, int64, uint8, uint16, uint32, uint64 - Integers with the given precision (in bits) stored in big endian order. The uintXX variants are unsigned, so their maximum value is roughly double that of the signed variant of the same width.
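The fixed width types map directly onto Python's standard `struct` module. The sketch below is one possible way to encode and decode them; the `FORMATS` table and the `encode`/`decode` helper names are made up for this example.

```python
import struct

# struct format codes for each primitive; ">" selects big endian byte order.
FORMATS = {
    "int8": ">b", "int16": ">h", "int32": ">i", "int64": ">q",
    "uint8": ">B", "uint16": ">H", "uint32": ">I", "uint64": ">Q",
}


def encode(type_name: str, value: int) -> bytes:
    """Encode one integer as the named fixed width primitive."""
    return struct.pack(FORMATS[type_name], value)


def decode(type_name: str, data: bytes, pos: int = 0):
    """Decode one primitive at pos; return (value, position after it)."""
    fmt = FORMATS[type_name]
    (value,) = struct.unpack_from(fmt, data, pos)
    return value, pos + struct.calcsize(fmt)
```

For example, `encode("int32", 1)` yields the four bytes `00 00 00 01`, and `encode("int16", -1)` yields `ff ff`.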
Variable Length Primitives
bytes16, bytes32, string16 - These types consist of a signed integer giving a length N followed by N bytes of content. -1 indicates null. bytes16 and string16 use a two byte int16 size and bytes32 uses a four byte int32 size. string16 is identical in format to bytes16 but the bytes should be interpreted as UTF8 encoded characters.
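A minimal sketch of the length-prefixed types, assuming Python's `None` stands in for the protocol's null value. The helper names are hypothetical.

```python
import struct


def encode_bytes16(value):
    """int16 length followed by the content; None becomes length -1."""
    if value is None:
        return struct.pack(">h", -1)
    return struct.pack(">h", len(value)) + value


def encode_bytes32(value):
    """int32 length followed by the content; None becomes length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def encode_string16(value):
    """Same layout as bytes16, with the content encoded as UTF-8."""
    return encode_bytes16(None if value is None else value.encode("utf-8"))


def decode_bytes16(data, pos=0):
    """Return (content or None, position after the field)."""
    (length,) = struct.unpack_from(">h", data, pos)
    pos += 2
    if length < 0:
        return None, pos
    return data[pos:pos + length], pos + length
```

Note that the length counts bytes, not characters, so a string16 holding non-ASCII text has a length larger than its character count.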
Arrays
This is a notation for handling repeated structures. These will always be encoded as a uint32 size containing the length N followed by N repetitions of the structure, which can itself be made up of other primitive types. In the BNF grammars below we will show an array of a structure foo as [foo].
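The array notation can be sketched as a pair of generic helpers that take the element codec as a parameter. The names `encode_array` and `decode_array` are assumptions for this example.

```python
import struct


def encode_array(items, encode_item):
    """uint32 element count followed by each encoded element."""
    return struct.pack(">I", len(items)) + b"".join(encode_item(i) for i in items)


def decode_array(data, pos, decode_item):
    """decode_item(data, pos) must return (item, next_pos)."""
    (count,) = struct.unpack_from(">I", data, pos)
    pos += 4
    items = []
    for _ in range(count):
        item, pos = decode_item(data, pos)
        items.append(item)
    return items, pos


def encode_int32(value):
    return struct.pack(">i", value)


def decode_int32(data, pos):
    (value,) = struct.unpack_from(">i", data, pos)
    return value, pos + 4
```

With these, an array of int32 values round-trips as `decode_array(encode_array([1, 2], encode_int32), 0, decode_int32)`.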
Notes on reading the request format grammars
The BNFs below give an exact context free grammar for the request and response binary format. For each API I will give the request and response together followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example I define a production for ErrorCode even though it is just an int16 in order to give it a symbolic name). As always in a BNF a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping. The top-level definition is always given first and subsequent sub-parts are indented.
Common Request and Response Structure
All requests and responses originate from the following grammar, which will be incrementally described through the rest of this document:
RequestOrResponse => MessageSize (RequestMessage | ResponseMessage)
  MessageSize => uint32
Field | Description |
---|---|
MessageSize | The MessageSize field gives the size of the subsequent request or response message in bytes. The client can read requests by first reading this 4 byte size as an integer N, and then reading and parsing the subsequent N bytes of the request. |
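Size-delimited framing can be sketched as follows. The helper names are hypothetical; the loop in `read_exactly` is there because `recv()` may return fewer bytes than requested.

```python
import socket
import struct


def frame(payload: bytes) -> bytes:
    """Prefix a message body with its 4 byte big endian MessageSize."""
    return struct.pack(">I", len(payload)) + payload


def read_exactly(sock: socket.socket, n: int) -> bytes:
    """Read exactly n bytes from the socket, or raise if it closes early."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("socket closed before the full message arrived")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_message(sock: socket.socket) -> bytes:
    """Read the size N, then return the N bytes that follow it."""
    (size,) = struct.unpack(">I", read_exactly(sock, 4))
    return read_exactly(sock, size)
```

A real client would probably also reject sizes above some sanity limit before allocating a buffer; that check is left out here for brevity.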
A request looks like this:
RequestMessage => ApiKey ApiVersion ClientId RequestMessage
  ApiKey => uint16
  ApiVersion => uint16
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest
Field | Description |
---|---|
ApiKey | This is a numeric id for the API being invoked (i.e. is it a metadata request, a produce request, a fetch request, etc). |
ApiVersion | This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves. |
ClientId | This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc. |
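Putting the header fields together with the size prefix, a request could be assembled roughly like this. The sketch assumes ClientId uses the string16 encoding, and the function name `encode_request` is made up. This document does not list the numeric ApiKey values, so any value passed in is the caller's responsibility.

```python
import struct


def encode_request(api_key: int, api_version: int, client_id: str, body: bytes) -> bytes:
    """Build MessageSize + ApiKey + ApiVersion + ClientId + request body."""
    client_id_bytes = client_id.encode("utf-8")
    header = struct.pack(">HH", api_key, api_version)
    # Assumption: ClientId is a string16 (int16 length followed by UTF-8 bytes).
    header += struct.pack(">h", len(client_id_bytes)) + client_id_bytes
    message = header + body
    # MessageSize counts everything after the size field itself.
    return struct.pack(">I", len(message)) + message
```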
The various request and response messages will be described below.
And the response:
Response => VersionId ResponseMessage
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse
Message sets
One structure common to both the produce and fetch requests is the message set format. A message in Kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for the on-disk storage on the broker and the on-the-wire format.
A message set is also the unit of compression in Kafka, and we allow messages to recursively contain compressed message sets to allow batch compression.
MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
The individual messages in the set are defined as follows:
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes32
  Value => bytes32
Field | Description |
---|---|
Offset | This is the offset used in Kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes. |
Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
MagicByte | This is a version id used to allow backwards compatible evolution of the message binary format. |
Attributes | This byte holds metadata attributes about the message. In particular the last 3 bits contain the compression codec used for the message. |
Key | The key is an optional message key that was used for partition assignment. The key can be null. |
Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. |
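To make the message layout concrete, here is a sketch that encodes one message and checks its CRC. It assumes a MagicByte of 0 and an Attributes byte of 0; neither value is specified in this document, and the helper names are hypothetical. It uses `zlib.crc32` from the Python standard library for the CRC32.

```python
import struct
import zlib


def encode_bytes32(value):
    """int32 length followed by the content; None becomes length -1."""
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def encode_message(key, value, magic=0, attributes=0):
    """Crc + MagicByte + Attributes + Key + Value (magic/attributes assumed 0)."""
    rest = struct.pack(">bb", magic, attributes) + encode_bytes32(key) + encode_bytes32(value)
    # The CRC covers every byte of the message after the Crc field itself.
    crc = zlib.crc32(rest) & 0xFFFFFFFF
    return struct.pack(">I", crc) + rest


def crc_is_valid(message: bytes) -> bool:
    """Recompute the CRC over the remainder and compare with the stored one."""
    (stored,) = struct.unpack_from(">I", message, 0)
    return stored == (zlib.crc32(message[4:]) & 0xFFFFFFFF)


def encode_message_set_entry(message: bytes, offset: int = 0) -> bytes:
    """Offset + MessageSize + Message; producers may use any offset."""
    return struct.pack(">qi", offset, len(message)) + message
```

The Crc field is declared as int32 in the grammar; packing the unsigned CRC with `">I"` produces the same four bytes as packing its signed interpretation.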
Metadata API
This API answers the question "who has what data and where are they?". Specifically, this request tells you, for each topic, how many partitions it has, which brokers currently host each of these partitions, and which of these is the leader. Since Kafka is a partitioned system, requests need to be directed to the appropriate server--the one currently acting as the leader for the partition you want to interact with. Since cluster membership in Kafka is dynamic, you can't just give all the clients a config file with all the brokers (some of them may be down, or partitions may have moved); instead you need to ask the cluster about its current state at run time. Hence the first thing a client needs to do when it connects is ask, "where is everyone?" using this metadata API.
This is the only request that can be made to any server without regard to partition ownership and all servers will give the same answer (disregarding timing differences). Fetch and produce requests always interact with particular partitions, and sending these to the wrong broker will result in an invalid metadata error. The client is expected to cache the cluster metadata locally, using it to direct requests to the correct hosts, until it gets an invalid metadata error or can't reach a particular broker, at which point it should fetch the metadata again and update its cache.
This presents a bit of a catch-22, since the only way to find out which Kafka servers exist is to ask a Kafka server, so how can a client ever connect the first time? To do this a client should take a "bootstrap urls" configuration from which it can find out the list of currently available servers. Importantly this need not contain all the servers in the cluster, maybe just two or three for redundancy. The client should try each of these until it finds one it can connect to. This will ensure that even if one of the bootstrap servers is down the client can still fetch the cluster metadata.
For deployment you may not want to hardcode such a list and may prefer to rely on DNS or a VIP or something like that to find a bootstrap server.
So the lifecycle of most clients looks something like this:
- Cycle through a list of bootstrap Kafka URLs until we find one we can connect to. Fetch cluster metadata.
- Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
- If we get an appropriate error, refresh the metadata and try again.
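The first step of this lifecycle can be sketched as a loop over the bootstrap list. The function name `connect_to_any`, the `(host, port)` tuple format, and the injectable `connect` parameter are assumptions made so the example can be exercised without a running broker; by default it uses `socket.create_connection` from the standard library.

```python
import socket


def connect_to_any(bootstrap_servers, connect=socket.create_connection, timeout=5.0):
    """Return a connection to the first reachable (host, port) pair."""
    last_error = None
    for host, port in bootstrap_servers:
        try:
            return connect((host, port), timeout)
        except OSError as exc:
            # This server is down or unreachable; move on to the next one.
            last_error = exc
    raise ConnectionError(f"no bootstrap server reachable: {last_error}")
```

A client would then send a metadata request over the returned connection and use the answer to open connections to the brokers that lead the partitions it cares about.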
Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.
The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.
Metadata Request
MetadataRequest => [TopicName]
  TopicName => string
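Following the grammar literally, the body of a metadata request is just an array of topic names. The sketch below assumes TopicName uses the string16 encoding; the function name is hypothetical, and the common request header is not included.

```python
import struct


def encode_metadata_request(topics):
    """uint32 topic count followed by each topic name as a string16."""
    body = struct.pack(">I", len(topics))
    for topic in topics:
        name = topic.encode("utf-8")
        body += struct.pack(">h", len(name)) + name
    return body
```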
Metadata Response
MetadataResponse => [TopicMetadata]
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  PartitionMetadata => PartitionErrorCode PartitionId LeaderExists Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => uint32
  LeaderExists => int8
  Leader => Broker
  Replicas => [Broker]
  Isr => [Broker]
  Broker => NodeId CreatorId Host Port
  NodeId => uint32
  CreatorId => string
  Host => string
  Port => uint32
Produce API
The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.
The produce API uses the generic message set format, but since no offset has been assigned to the messages at the time of the send the producer is free to fill in that field in any way it likes.
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => uint32
  Partition => uint32
  MessageSetSize => uint32
Field | Description |
---|---|
RequiredAcks | This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server responds immediately prior to even writing the data to disk. If it is 1 the data is written to the local machine only with no blocking on replicas. If it is -1 the server will block until the message is committed by all in sync replicas. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). |
Timeout | This provides a maximum time the server can await receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout. |
TopicName | The topic that data is being published to. |
Partition | The partition that data is being published to. |
MessageSetSize | The size, in bytes, of the message set that follows. |
MessageSet | A set of messages in the standard format described above. |
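The produce request body can be sketched as nested loops over topics and partitions. This example assumes TopicName is a string16 and that each message set has already been encoded to bytes by the caller; the function name and the nested dict input shape are made up for illustration.

```python
import struct


def encode_produce_request(required_acks, timeout, topics):
    """topics maps topic name -> {partition id: encoded message set bytes}."""
    out = [struct.pack(">hI", required_acks, timeout)]
    out.append(struct.pack(">I", len(topics)))
    for topic, partitions in topics.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)
        out.append(struct.pack(">I", len(partitions)))
        for partition, message_set in partitions.items():
            # Partition, then MessageSetSize, then the message set bytes.
            out.append(struct.pack(">II", partition, len(message_set)) + message_set)
    return b"".join(out)
```

Because the request groups partitions under their topic, a single call can carry message sets for many topic partitions, as long as the receiving broker leads all of them.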
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => uint32
  ErrorCode => int16
  Offset => int64
Field | Description |
---|---|
TopicName | The topic this response entry corresponds to. |
Partition | The partition this response entry corresponds to. |
ErrorCode | The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be unavailable or maintained on a different host, while others may have successfully accepted the produce request. |
Offset | The offset assigned to the first message in the message set appended to this partition. |
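Decoding mirrors the grammar. The sketch below parses only the ProduceResponse body (anything that precedes it in the response is assumed to have been consumed already), treats TopicName as a string16, and returns a dict keyed by (topic, partition). The function name and return shape are assumptions.

```python
import struct


def decode_produce_response(data: bytes, pos: int = 0):
    """Return {(topic, partition): (error_code, offset)}."""
    results = {}
    (topic_count,) = struct.unpack_from(">I", data, pos)
    pos += 4
    for _ in range(topic_count):
        (name_len,) = struct.unpack_from(">h", data, pos)
        pos += 2
        topic = data[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (partition_count,) = struct.unpack_from(">I", data, pos)
        pos += 4
        for _ in range(partition_count):
            # Partition (uint32), ErrorCode (int16), Offset (int64) = 14 bytes.
            partition, error_code, offset = struct.unpack_from(">Ihq", data, pos)
            pos += 14
            results[(topic, partition)] = (error_code, offset)
    return results
```

Since errors are reported per partition, a caller would typically inspect every entry rather than stop at the first failure.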
Fetch API
The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch and gets back a chunk of messages.
Fetch requests follow a long poll model so they can be made to block for a period of time if sufficient data is not immediately available.
One thing to note is that the fetch API requires specifying the partition to consume from. The question is: how should a consumer know which partitions to consume from? In particular, how can you balance the partitions over a set of consumers acting as a group so that each consumer gets a subset of partitions? We have done this assignment dynamically using ZooKeeper for the Scala and Java client. The downside of this approach is that it requires a fairly fat client and a ZooKeeper connection. We haven't yet created a Kafka API to allow this functionality to be moved to the server side and accessed more conveniently. A simple consumer client can be implemented by simply requiring that the partitions be specified in config, though this will not allow dynamic reassignment of partitions should that consumer fail. We hope to address this gap in the next major release.
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => uint32
  MinBytes => uint32
  TopicName => string
  Partition => uint32
  FetchOffset => int64
  MaxBytes => uint32
Field | Description |
---|---|
ReplicaId | The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. |
MaxWaitTime | The max wait time is the maximum amount of time to block waiting if insufficient data is available at the time the request is issued. |
MinBytes | This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). |
TopicName | The name of the topic. |
Partition | The id of the partition the fetch is for. |
FetchOffset | The offset to begin this fetch from. |
MaxBytes | The maximum bytes to include in the message set for this partition. This helps bound the size of the response. |
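A fetch request body could be built along these lines. The defaults (ReplicaId of -1 for a normal consumer, MaxWaitTime of 100, MinBytes of 1) follow the field descriptions above; the function name, the input shape, and the string16 encoding of TopicName are assumptions for the example.

```python
import struct


def encode_fetch_request(topics, max_wait_time=100, min_bytes=1, replica_id=-1):
    """topics maps topic name -> list of (partition, fetch_offset, max_bytes)."""
    out = [struct.pack(">iII", replica_id, max_wait_time, min_bytes)]
    out.append(struct.pack(">I", len(topics)))
    for topic, partitions in topics.items():
        name = topic.encode("utf-8")
        out.append(struct.pack(">h", len(name)) + name)
        out.append(struct.pack(">I", len(partitions)))
        for partition, fetch_offset, max_bytes in partitions:
            out.append(struct.pack(">IqI", partition, fetch_offset, max_bytes))
    return b"".join(out)
```

To trade latency for larger reads as described in the MinBytes row, a consumer might call this with `max_wait_time=100` and `min_bytes=64 * 1024`.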
Fetch Response
FetchResponse => [TopicName [Partition ErrorCode FetchedOffset HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => uint32
  ErrorCode => int16
  FetchedOffset => uint64
  HighwaterMarkOffset => int64
  MessageSetSize => int32
Field | Description |
---|---|
TopicName | The name of the topic this response entry is for. |
Partition | The id of the partition this response is for. |
FetchedOffset | The offset from which the fetch began. |
HighwaterMarkOffset | The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are. |
MessageSetSize | The size in bytes of the message set for this partition |
MessageSet | The message data fetched from this partition, in the format described above. |
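As an illustration, the per-partition portion of a fetch response can be decoded with a single fixed-size header followed by MessageSetSize bytes of message data. This sketch follows the field types in the grammar above, leaves the message set as raw bytes, and uses a hypothetical function name and dict layout.

```python
import struct

# Partition (uint32), ErrorCode (int16), FetchedOffset (uint64),
# HighwaterMarkOffset (int64), MessageSetSize (int32): 26 bytes in total.
PARTITION_HEADER = struct.Struct(">IhQqi")


def decode_fetch_partition(data: bytes, pos: int = 0):
    """Return (partition entry dict, position after the message set)."""
    partition, error_code, fetched_offset, high_watermark, size = (
        PARTITION_HEADER.unpack_from(data, pos)
    )
    pos += PARTITION_HEADER.size
    entry = {
        "partition": partition,
        "error_code": error_code,
        "fetched_offset": fetched_offset,
        "high_watermark": high_watermark,
        "message_set": data[pos:pos + size],
    }
    return entry, pos + size
```

The returned position lets the caller continue with the next partition entry in the same topic.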
Offset API
This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs, requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.
The response contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition.
We agree that this API is slightly funky.
OffsetRequest => [TopicName [Partition Time MaxNumberOfOffsets]]
  TopicName => string
  Partition => uint32
  Time => uint64
  MaxNumberOfOffsets => int32
OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
Error Codes
We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions or whatever error handling mechanism is appropriate in the client language. Here is a table of the error codes currently in use:
Error | Code | Description |
---|---|---|
NoError | 0 | No error--it worked! |
Unknown | -1 | An unexpected server error |
OffsetOutOfRangeCode | 1 | The requested offset is outside the range of offsets maintained by the server for the given topic/partition. |
InvalidMessageCode | 2 | This indicates that a message's contents do not match its CRC |
UnknownTopicOrPartitionCode | 3 | This request is for a topic or partition that does not exist on this broker. |
InvalidMessageSizeCode | 4 | The message has a negative size |
LeaderNotAvailableCode | 5 | This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes. |
NotLeaderForPartitionCode | 6 | This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date. |
RequestTimedOutCode | 7 | This error is thrown if the request exceeds the user-specified time limit in the request. |
BrokerNotAvailableCode | 8 | This is not a client facing error and is used only internally by intra-cluster broker communication. |
ReplicaNotAvailableCode | 9 | What is the difference between this and LeaderNotAvailable? |
MessageSizeTooLargeCode | 10 | The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempts to produce a message larger than this maximum. |
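One way a Python client might translate these codes into exceptions is sketched below. The exception class, helper names, and the set of codes treated as a signal to refresh metadata are assumptions: only NotLeaderForPartitionCode is described above as indicating stale metadata, and codes 3 and 5 are included here as a guess.

```python
ERROR_NAMES = {
    0: "NoError",
    -1: "Unknown",
    1: "OffsetOutOfRangeCode",
    2: "InvalidMessageCode",
    3: "UnknownTopicOrPartitionCode",
    4: "InvalidMessageSizeCode",
    5: "LeaderNotAvailableCode",
    6: "NotLeaderForPartitionCode",
    7: "RequestTimedOutCode",
    8: "BrokerNotAvailableCode",
    9: "ReplicaNotAvailableCode",
    10: "MessageSizeTooLargeCode",
}

# Assumption: codes after which a client would re-fetch cluster metadata.
STALE_METADATA_CODES = {3, 5, 6}


class KafkaProtocolError(Exception):
    """Hypothetical exception carrying the numeric code and symbolic name."""

    def __init__(self, code):
        self.code = code
        self.name = ERROR_NAMES.get(code, "UnrecognizedErrorCode")
        super().__init__(f"{self.name} ({code})")


def raise_for_error(code):
    """Do nothing for NoError; raise KafkaProtocolError for anything else."""
    if code != 0:
        raise KafkaProtocolError(code)


def should_refresh_metadata(code):
    return code in STALE_METADATA_CODES
```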
Some Common Philosophical Questions
Some people have asked why we don't use HTTP. There are a number of reasons; the best is that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.
Others have asked if maybe we shouldn't support many different protocols. Prior experience with this was that it makes it very hard to add and test new features if they have to be ported across many protocol implementations. Our feeling is that most users don't really see multiple protocols as a feature, they just want a good reliable client in the language of their choice.
Another question is why we don't adopt XMPP, STOMP, AMQP or an existing protocol. The answer to this varies by protocol, but in general the problem is that the protocol does determine large parts of the implementation and we couldn't do what we are doing if we didn't have control over the protocol. Our belief is that it is possible to do better than existing messaging systems have in providing a truly distributed messaging system, and to do this we need to build something that works differently.
A final question is why we don't use a system like Protocol Buffers or Thrift to define our messages. These packages excel at helping you manage lots and lots of serialized messages. However we have only a few messages. Support across languages is somewhat spotty (depending on the package). In addition, the mapping between binary log format and wire protocol is something we manage somewhat carefully and this would not be possible with these systems. Finally we prefer the style of versioning APIs explicitly and checking this to inferring new values as nulls as it allows more nuanced control of compatibility.