...
This document covers the protocol implemented in Kafka 0.8. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them in a client. It is aimed at making it easier to implement a client. This document assumes you understand the basic design and terminology described here.
...
Kafka uses a binary protocol over TCP. This protocol defines all APIs as request-response message pairs. All messages are size-delimited and are made up of the following primitive types:
Fixed Width Primitives
int8, int16, int32, int64, uint8, uint16, uint32, uint64 - Integers with the given precision (in bits) stored in big-endian order. The uintXX variants are unsigned, which doubles the maximum representable value.
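As an illustration (not part of the protocol itself), these fixed-width types map directly onto Python's struct module using the big-endian '>' prefix. A minimal sketch:

```python
import struct

# Big-endian ("network order") format codes for the fixed-width types:
# '>b' int8, '>h' int16, '>i' int32, '>q' int64; the uppercase
# variants ('>B', '>H', '>I', '>Q') are the unsigned versions.
def encode_int32(n):
    """Encode a signed 32-bit integer in big-endian byte order."""
    return struct.pack('>i', n)

def decode_int32(data):
    """Decode 4 big-endian bytes back into a signed 32-bit integer."""
    return struct.unpack('>i', data)[0]
```

For example, encode_int32(1) yields b'\x00\x00\x00\x01', most significant byte first.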
...
It is often useful to repeat some structure multiple times. This will always be encoded as a uint32 size containing the length N followed by N repetitions of the structure, which can itself be made up of other primitive types. In the BNF grammars below we will show an array of a structure foo as [foo].
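A sketch of that array convention; the uint32 count is taken from the note above, while encode_item and decode_item are caller-supplied, hypothetical per-element codecs:

```python
import struct

def encode_array(items, encode_item):
    """Encode an array as a uint32 length N followed by N encoded items."""
    out = struct.pack('>I', len(items))
    for item in items:
        out += encode_item(item)
    return out

def decode_array(buf, pos, decode_item):
    """Read the uint32 length, then N items; returns (items, new position)."""
    (n,) = struct.unpack_from('>I', buf, pos)
    pos += 4
    items = []
    for _ in range(n):
        item, pos = decode_item(buf, pos)
        items.append(item)
    return items, pos
```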
Client Requests
The Kafka protocol is fairly simple; there are only four client request APIs.
...
Each of these will be described in great detail below.
...
...
...
Notes on reading the request format grammars
...
The BNFs below give an exact context-free grammar for the request and response binary format. For each API I will give the request and response together, followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example, I define a production for ErrorCode even though it is just an int16, in order to give it a symbolic name). As always in a BNF, a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping. The top-level definition is always given first and subsequent sub-parts are indented.
...
Common Request and Response Structure
...
All requests and responses originate from the following grammar, which will be incrementally described through the rest of this document:
RequestOrResponse => MessageSize (RequestMessage | ResponseMessage)
MessageSize => uint32
Field | Description |
---|---|
MessageSize | The MessageSize field gives the size of the subsequent request or response message in bytes. The client can read requests by first reading this 4 byte size as an integer N, and then reading and parsing the subsequent N bytes of the request. |
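The framing rule above can be sketched as follows; io.BytesIO stands in for a socket here, and a real client would need to loop since a socket read may return fewer bytes than requested:

```python
import io
import struct

def read_frame(stream):
    """Read one size-delimited message: a 4-byte big-endian size N,
    then the N bytes that follow."""
    size_bytes = stream.read(4)
    if len(size_bytes) < 4:
        raise EOFError('truncated size prefix')
    (n,) = struct.unpack('>I', size_bytes)
    payload = stream.read(n)
    if len(payload) < n:
        raise EOFError('truncated message body')
    return payload
```

For example, read_frame(io.BytesIO(struct.pack('>I', 5) + b'hello')) returns b'hello'.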
A request looks like this:
RequestMessage => ApiKey ApiVersion ClientId RequestMessage
ApiKey => uint16
ApiVersion => uint16
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest
Field | Description |
---|---|
ApiKey | This is a numeric id for the API being invoked (e.g. a metadata request, a produce request, a fetch request, etc.). |
ApiVersion | This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves. |
ClientId | This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc. |
The various request and response messages will be described below.
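A sketch of packing that request header. The string encoding here (an int16 length prefix followed by the character bytes) is an assumption, since the string primitive's definition is elided in this excerpt:

```python
import struct

def encode_string(s):
    """Kafka 'string' primitive, assumed here to be an int16 length
    prefix followed by the UTF-8 bytes (definition elided above)."""
    data = s.encode('utf-8')
    return struct.pack('>h', len(data)) + data

def request_header(api_key, api_version, client_id):
    """Pack the ApiKey, ApiVersion and ClientId fields described above."""
    return struct.pack('>HH', api_key, api_version) + encode_string(client_id)
```

request_header(3, 0, 'my-client') would build the header for a hypothetical API with code 3 at version 0; the actual table of API codes is given elsewhere in the document.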
And the response:
Response => VersionId ResponseMessage
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse
...
Message sets
...
One structure common to both the produce and fetch requests is the message set format. A message in Kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for on-disk storage on the broker and as the on-the-wire format.
A message set is also the unit of compression in Kafka, and we allow messages to recursively contain compressed message sets to allow batch compression.
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
The individual messages in the set are defined as follows:
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes32
Value => bytes32
Field | Description |
---|---|
Offset | This is the offset used in Kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes. |
Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
MagicByte | This is a version id used to allow backwards compatible evolution of the message binary format. |
Attributes | This byte holds metadata attributes about the message. In particular the last 3 bits contain the compression codec used for the message. |
Key | The key is an optional message key that was used for partition assignment. The key can be null. |
Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. |
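Putting the message fields together: a sketch, assuming Key and Value (the bytes32 primitive, not defined in this excerpt) are int32 length-prefixed byte arrays with a length of -1 denoting null:

```python
import struct
import zlib

def encode_bytes32(b):
    """int32 length-prefixed bytes; a length of -1 denotes null
    (assumed encoding for the bytes32 primitive)."""
    if b is None:
        return struct.pack('>i', -1)
    return struct.pack('>i', len(b)) + b

def encode_message(key, value, magic=0, attributes=0):
    """Crc MagicByte Attributes Key Value, where the CRC32 covers
    everything after the Crc field itself."""
    body = struct.pack('>bb', magic, attributes) \
        + encode_bytes32(key) + encode_bytes32(value)
    crc = zlib.crc32(body) & 0xffffffff
    return struct.pack('>I', crc) + body
```

A receiver checks integrity by recomputing CRC32 over everything after the first four bytes and comparing it to the stored Crc.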
...
- Cycle through a list of bootstrap kafka urls until we find one we can connect to. Fetch cluster metadata.
- Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
- If we get an appropriate error, refresh the metadata and try again.
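The steps above can be sketched as a small retry loop. fetch_metadata and send_to_broker are hypothetical stand-ins for the real network calls, and RetriableError marks the "appropriate error" case:

```python
class RetriableError(Exception):
    """An error indicating the cached metadata may be stale."""

def bootstrap(urls, fetch_metadata):
    """Cycle through bootstrap URLs until one yields cluster metadata."""
    for url in urls:
        try:
            return fetch_metadata(url)
        except OSError:
            continue  # try the next bootstrap URL
    raise RuntimeError('no bootstrap broker reachable')

def send(metadata, topic, partition, request, send_to_broker, refresh_metadata):
    """Direct the request to the broker for (topic, partition); on a
    retriable error, refresh the metadata and try once more."""
    try:
        return send_to_broker(metadata[(topic, partition)], request)
    except RetriableError:
        metadata = refresh_metadata()
        return send_to_broker(metadata[(topic, partition)], request)
```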
Metadata
...
API Format
The client sends a request for metadata. Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.
The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.
...
Metadata Request
...
MetadataRequest => [TopicName]
TopicName => string
...
Metadata Response
...
MetadataResponse => [TopicMetadata]
TopicMetadata => TopicName [PartitionMetadata] TopicErrorCode
TopicName => string
PartitionMetadata => PartitionId Leader [Replicas] [Isr] PartitionErrorCode
Replica => Broker
Isr => Broker
Broker => NodeId CreatorId Host Port
NodeId => int32
CreatorId => string
Host => string
Port => int32
Notes: Replicas should contain the broker information for the relevant brokers. The leader and isr should just be ids.
Produce API
The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.
The produce API uses the generic message set format, but since no offset has been assigned to the messages at the time of the send, the producer is free to fill in that field in any way it likes.
ProduceRequest => RequiredAcks Timeout [TopicData]
RequiredAcks => int16
Timeout => int32
TopicData => TopicName [PartitionData]
TopicName => string
PartitionData => Partition MessageSetSize MessageSet
Partition => int32
MessageSetSize => int32
Field | Description |
---|---|
RequiredAcks | This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server responds immediately prior to even writing the data to disk. If it is 1 the data is written to the local machine only with no blocking on replicas. If it is -1 the server will block until the message is committed by all in sync replicas. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas). |
Timeout | This provides a maximum time the server can await receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout. |
TopicName | The topic that data is being published to. |
Partition | The partition that data is being published to. |
MessageSetSize | The size, in bytes, of the message set that follows. |
MessageSet | This is the message set format described in a previous section. |
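As an illustrative sketch, the produce request body for a single topic and partition could be assembled like this. encode_string assumes the int16 length-prefixed string encoding, and arrays use a uint32 count as described earlier:

```python
import struct

def encode_string(s):
    """Assumed int16 length-prefixed string encoding."""
    data = s.encode('utf-8')
    return struct.pack('>h', len(data)) + data

def produce_request(required_acks, timeout_ms, topic, partition, message_set):
    """RequiredAcks Timeout [TopicData], with one TopicData containing
    one PartitionData (Partition MessageSetSize MessageSet)."""
    partition_data = struct.pack('>i', partition) \
        + struct.pack('>i', len(message_set)) + message_set
    topic_data = encode_string(topic) \
        + struct.pack('>I', 1) + partition_data   # one PartitionData entry
    return struct.pack('>hi', required_acks, timeout_ms) \
        + struct.pack('>I', 1) + topic_data       # one TopicData entry
```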
Produce Response
ProduceResponse => [TopicProduceResponse]
TopicProduceResponse => TopicName [PartitionProduceResponse]
PartitionProduceResponse => Partition ErrorCode Offset
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
Field | Description |
---|---|
TopicName | The topic this response entry corresponds to. |
Partition | The partition this response entry corresponds to. |
ErrorCode | The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be unavailable or maintained on a different host, while others may have successfully accepted the produce request. |
Offset | The offset assigned to the first message in the message set appended to this partition. |
Fetch API
The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch, and gets back a chunk of messages.
...
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicFetchRequest]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicFetchRequest => TopicName [PartitionFetchRequest]
TopicName => string
PartitionFetchRequest => Partition FetchOffset MaxBytes
Partition => int32
FetchOffset => int64
MaxBytes => int32
Field | Description |
---|---|
ReplicaId | The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. |
MaxWaitTime | The max wait time is the maximum amount of time to block waiting if insufficient data is available at the time the request is issued. |
MinBytes | This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). |
TopicName | The name of the topic. |
Partition | The id of the partition the fetch is for. |
FetchOffset | The offset to begin fetching from. |
MaxBytes | The maximum bytes to include in the message set for this partition. This helps bound the size of the response. |
Fetch Response
FetchResponse => [TopicData]
TopicData => TopicName [PartitionData]
TopicName => string
PartitionData => ErrorCode FetchedOffset HighwaterMarkOffset MessageSetSize MessageSet
ErrorCode => int16
FetchedOffset => uint64
HighwaterMarkOffset => int64
MessageSetSize => int32
Field | Description |
---|---|
TopicName | The name of the topic this response entry is for. |
Partition | The id of the partition this response is for. |
FetchedOffset | The offset from which the fetch began. |
HighwaterMarkOffset | The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are. |
MessageSetSize | The size in bytes of the message set for this partition. |
MessageSet | The message data fetched from this partition, in the format described above. |
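The per-partition response fields can be decoded with a small parser; this sketch keeps the uint64 FetchedOffset and int64 HighwaterMarkOffset widths from the grammar above:

```python
import struct

def parse_partition_data(buf, pos=0):
    """Parse ErrorCode FetchedOffset HighwaterMarkOffset MessageSetSize,
    followed by MessageSetSize bytes of message set data.
    Returns the parsed fields and the new buffer position."""
    error_code, fetched_offset, hwm, size = struct.unpack_from('>hQqi', buf, pos)
    pos += 2 + 8 + 8 + 4
    message_set = buf[pos:pos + size]
    return (error_code, fetched_offset, hwm, message_set), pos + size
```

hwm - fetched_offset then gives a rough measure of how far behind the end of the log the consumer is.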
Offset API
This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs, requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.
...
OffsetRequest => [TopicName [PartitionOffsetRequest]]
TopicName => string
PartitionOffsetRequest => Partition Time MaxNumberOfOffsets
Partition => int32
Time => int64
MaxNumberOfOffsets => int32
OffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode [Offset]
Partition => int32
ErrorCode => int16
Offset => int64
...
Field
...
Description
...
...
...
...
...
...
Error Codes
We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions, or whatever error-handling mechanism is appropriate in the client language. Here is a table of the error codes currently in use:
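A sketch of that translation. The exception name is an illustrative placeholder, and it assumes the usual convention that a code of 0 means no error; the actual table of codes is given below:

```python
class KafkaProtocolError(Exception):
    """Client-side exception carrying the server's numeric error code."""
    def __init__(self, code):
        super().__init__('server returned error code %d' % code)
        self.code = code

def check_error_code(code):
    """Raise for any non-zero error code found in a response."""
    if code != 0:
        raise KafkaProtocolError(code)
```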
...