Introduction
This document covers the protocol implemented in Kafka 0.8 and beyond. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them to implement a client. This document assumes you understand the basic design and terminology described here.
...
All requests and responses originate from the following grammar, which will be incrementally described through the rest of this document:
```
RequestOrResponse => Size (RequestMessage | ResponseMessage)
  Size => int32
```
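As a concrete illustration, here is a minimal sketch of this size-delimited framing (Python; the helper names are made up). The Size field is a big-endian int32 counting the bytes that follow it:

```python
import struct

def frame(payload: bytes) -> bytes:
    # Size is a signed big-endian int32 counting the bytes that follow it.
    return struct.pack(">i", len(payload)) + payload

def unframe(buf: bytes) -> bytes:
    # Read the 4-byte size, then exactly that many payload bytes.
    (size,) = struct.unpack(">i", buf[:4])
    return buf[4 : 4 + size]
```

A client reads the four-byte size first, then waits for exactly that many more bytes before parsing the enclosed request or response message.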
...
Requests all have the following format:
```
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
  ApiKey => int16
  ApiVersion => int16
  CorrelationId => int32
  ClientId => string
  RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
```
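Assuming the protocol's int16-length-prefixed string encoding (with length -1 for a null string), the fixed request header can be packed like this (a sketch; helper names are hypothetical):

```python
import struct

def encode_string(s):
    # Protocol string: int16 length prefix, then UTF-8 bytes; -1 means null.
    if s is None:
        return struct.pack(">h", -1)
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def request_header(api_key, api_version, correlation_id, client_id):
    # ApiKey and ApiVersion are int16, CorrelationId is int32, ClientId a string.
    return (struct.pack(">hhi", api_key, api_version, correlation_id)
            + encode_string(client_id))
```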
...
The various request and response messages will be described below.
Responses
```
Response => CorrelationId ResponseMessage
  CorrelationId => int32
  ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
```
...
N.B., MessageSets are not preceded by an int32 like other array elements in the protocol.
```
MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32
```
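Since a MessageSet has no element-count prefix, its wire form is just the concatenated entries. A sketch (Python; the helper name is made up, and message bytes are taken as already encoded):

```python
import struct

def message_set(entries):
    # entries: list of (offset, message_bytes) pairs. Note there is no int32
    # element-count prefix -- just concatenated [Offset MessageSize Message].
    out = b""
    for offset, msg in entries:
        out += struct.pack(">qi", offset, len(msg)) + msg
    return out
```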
Message format
```
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes
```
Field | Description |
---|---|
Offset | This is the offset used in Kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here that it likes. |
Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
MagicByte | This is a version id used to allow backwards compatible evolution of the message binary format. |
Attributes | This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0. |
Key | The key is an optional message key that was used for partition assignment. The key can be null. |
Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null. |
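Putting the fields together, here is a sketch of encoding a single message, including the CRC-32 over everything after the Crc field (Python; the helper names, and the int32-length-prefixed bytes encoding with -1 for null, are assumptions based on the grammar above):

```python
import struct
import zlib

def encode_bytes(b):
    # Protocol bytes: int32 length prefix; -1 length means null.
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b

def encode_message(key, value, magic=0, attributes=0):
    # Crc covers everything after the Crc field itself.
    body = (struct.pack(">bb", magic, attributes)
            + encode_bytes(key) + encode_bytes(value))
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body
```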
Compression
Kafka supports compressing messages for additional efficiency; however, this is more complex than just compressing a raw message. Because individual messages may not have sufficient redundancy to enable good compression ratios, compressed messages must be sent in special batches (although you may use a batch of one if you truly wish to compress a message on its own). The messages to be sent are wrapped (uncompressed) in a MessageSet structure, which is then compressed and stored in the Value field of a single "Message" with the appropriate compression codec set. The receiving system parses the actual MessageSet from the decompressed value.
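The batching scheme can be sketched as follows (Python; this assumes gzip is codec value 1 in the Attributes bits, and uses a simplified message encoding with magic=0 and a null key purely as an illustration):

```python
import gzip
import struct
import zlib

def encode_message(value, attributes=0):
    # Simplified message: magic=0, null key (int32 length -1),
    # int32-length-prefixed value; Crc covers everything after itself.
    body = (struct.pack(">bb", 0, attributes)
            + struct.pack(">i", -1)
            + struct.pack(">i", len(value)) + value)
    return struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF) + body

def compress_batch(values):
    # Wrap each value in an (uncompressed) MessageSet entry, gzip the whole
    # set, then store it as the Value of one wrapper message whose Attributes
    # carry the gzip codec (assumed here to be 1).
    messages = [encode_message(v) for v in values]
    inner = b"".join(struct.pack(">qi", 0, len(m)) + m for m in messages)
    return encode_message(gzip.compress(inner), attributes=1)
```

The receiver checks the codec bits of the wrapper message, decompresses its Value, and parses the result as an ordinary MessageSet.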
...
Metadata Request
```
MetadataRequest => [TopicName]
  TopicName => string
```
...
The response contains metadata for each partition, with partitions grouped together by topic. This metadata refers to brokers by their broker id. The brokers each have a host and port.
```
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port
  NodeId => int32
  Host => string
  Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  PartitionErrorCode => int16
  PartitionId => int32
  Leader => int32
  Replicas => [int32]
  Isr => [int32]
```
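Replicas and Isr above are int32 arrays; unlike MessageSets, arrays carry an int32 element-count prefix, which can be decoded like this (a sketch with a hypothetical helper name):

```python
import struct

def read_int32_array(buf, pos):
    # Arrays in the protocol (other than MessageSets) start with an int32
    # count, followed by that many repetitions of the element type.
    (count,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    values = list(struct.unpack_from(">%di" % count, buf, pos))
    return values, pos + 4 * count
```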
...
Produce Request
```
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32
```
...
Produce Response
```
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
```
...
One thing to note is that the fetch API requires specifying the partition to consume from. The question is: how should a consumer know which partitions to consume from? In particular, how can you balance the partitions over a set of consumers acting as a group so that each consumer gets a subset of partitions? We have done this assignment dynamically using ZooKeeper for the Scala and Java clients. The downside of this approach is that it requires a fairly fat client and a ZooKeeper connection. We haven't yet created a Kafka API to allow this functionality to be moved to the server side and accessed more conveniently. A simple consumer client can be implemented by simply requiring that the partitions be specified in config, though this will not allow dynamic reassignment of partitions should that consumer fail. We hope to address this gap in the next major release.
Fetch Request
```
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32
```
Field | Description |
---|---|
ReplicaId | The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. |
MaxWaitTime | The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. |
MinBytes | This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, but if there is no new data since its last request it will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100 ms to try to accumulate 64k of data before responding). |
TopicName | The name of the topic. |
Partition | The id of the partition the fetch is for. |
FetchOffset | The offset to begin this fetch from. |
MaxBytes | The maximum bytes to include in the message set for this partition. This helps bound the size of the response. |
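As an illustration of these fields, here is a sketch of packing a fetch request body for a single topic (Python; the helper name and defaults are made up, and ReplicaId is fixed at -1 as for a normal client consumer):

```python
import struct

def fetch_request_body(topic, partitions, max_wait_ms=100, min_bytes=64 * 1024):
    # partitions: list of (partition, fetch_offset, max_bytes) tuples.
    # ReplicaId is -1 because a normal client consumer has no node id.
    body = struct.pack(">iii", -1, max_wait_ms, min_bytes)
    name = topic.encode("utf-8")
    body += struct.pack(">i", 1)                 # one topic in the array
    body += struct.pack(">h", len(name)) + name  # int16-length-prefixed string
    body += struct.pack(">i", len(partitions))   # partition array count
    for partition, fetch_offset, max_bytes in partitions:
        body += struct.pack(">iqi", partition, fetch_offset, max_bytes)
    return body
```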
Fetch Response
```
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
```
...
We agree that this API is slightly funky.
Offset Request
```
OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32
```
...
Offset Response
```
OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
```
...
Offset Commit Request
```
OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset Metadata]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
```
Offset Commit Response
```
OffsetCommitResponse => ClientId [TopicName [Partition ErrorCode]]
  ClientId => string
  TopicName => string
  Partition => int32
  ErrorCode => int16
```
Offset Fetch Request
```
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
```
Offset Fetch Response
```
OffsetFetchResponse => ClientId [TopicName [Partition Offset Metadata ErrorCode]]
  ClientId => string
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16
```
...
API name | ApiKey Value |
---|---|
ProduceRequest | 0 |
FetchRequest | 1 |
OffsetRequest | 2 |
MetadataRequest | 3 |
LeaderAndIsrRequest | 4 |
StopReplicaRequest | 5 |
OffsetCommitRequest | 8 |
OffsetFetchRequest | 9 |
Error Codes
We use numeric codes to indicate what problem occurred on the server. These can be translated by the client into exceptions, or whatever error-handling mechanism is appropriate in the client language. Here is a table of the error codes currently in use:
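Such a translation might be sketched like this (Python; the exception class and the two sample code names are illustrative, not an exhaustive mapping of the table):

```python
class KafkaProtocolError(Exception):
    """Raised when a response carries a non-zero error code."""

# A couple of illustrative entries; the full table maps many more codes.
ERROR_NAMES = {-1: "Unknown", 1: "OffsetOutOfRange"}

def check_error(code):
    # 0 means no error; anything else becomes an exception (or whatever
    # error-handling mechanism the client language prefers).
    if code == 0:
        return None
    raise KafkaProtocolError(ERROR_NAMES.get(code, "error code %d" % code))
```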
...