
Introduction

This document covers the protocol implemented in Kafka 0.8. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them in a client. It is aimed at making it easier to implement a client. This document assumes you understand the basic design and terminology described here.

...

Code Block
ProduceRequest => RequiredAcks Timeout [TopicData]
RequiredAcks => int16
Timeout => int32
TopicData => TopicName [PartitionData]
TopicName => string
PartitionData => Partition MessageSetSize MessageSet
Partition => int32
MessageSetSize => int32

ProduceResponse => [TopicProduceResponse]
TopicProduceResponse => TopicName [PartitionProduceResponse]
PartitionProduceResponse => Partition ErrorCode Offset
ErrorCode => int16

Field

Description

RequiredAcks

This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0, the server responds immediately, before even writing the data to disk. If it is 1, the data is written to the local machine only, with no blocking on replicas. If it is -1, the server will block until the message is committed by all in-sync replicas. For any number > 1, the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

Timeout

This provides the maximum time (in milliseconds) the server can wait to receive the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency; (2) the timer starts when the server begins processing this request, so if many requests are queued due to server overload, that wait time is not included; (3) the server will not terminate a local write, so if the local write takes longer than this timeout, the timeout will not be respected. To get a hard timeout of this kind, the client should use the socket timeout.

TopicName

The topic that data is being published to.

Partition

The partition that data is being published to.

MessageSetSize

The number of bytes in the message set that follows.

MessageSet

This is the message set format described in a previous section.

ErrorCode

The error code for this partition, if any.

Offset

The offset assigned to the first message in the message set for this partition.
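The RequiredAcks rules above can be summarised as a small function. This is a simplified Python model of the described behaviour, not broker code: `acks_to_await` and its `in_sync_replicas` parameter are hypothetical names, and counting the local write as one acknowledgement when RequiredAcks is 1 is an assumption of the model.

```python
def acks_to_await(required_acks: int, in_sync_replicas: int) -> int:
    """How many acknowledgements the server waits for before responding.

    Simplified model of the RequiredAcks description; `in_sync_replicas` is
    the current number of in-sync replicas for the partition.
    """
    if in_sync_replicas < 1:
        raise ValueError("in_sync_replicas must be at least 1")
    if required_acks == 0:
        return 0                      # respond immediately, before even writing to disk
    if required_acks == 1:
        return 1                      # local write only, no blocking on replicas
    if required_acks == -1:
        return in_sync_replicas       # committed by all in-sync replicas
    if required_acks > 1:
        # Never wait for more acknowledgements than there are in-sync replicas.
        return min(required_acks, in_sync_replicas)
    raise ValueError(f"unsupported RequiredAcks value: {required_acks}")
```

For example, with three in-sync replicas, RequiredAcks = 5 waits for 3 acknowledgements and RequiredAcks = 2 waits for 2.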

Fetch API

The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch and gets back a chunk of messages.

Fetch requests follow a long poll model so they can be made to block for a period of time if sufficient data is not immediately available.

Code Block

FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicFetchRequest]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicFetchRequest => TopicName [PartitionFetchRequest]
TopicName => string
PartitionFetchRequest => Partition FetchOffset MaxBytes
Partition => int32
FetchOffset => int64
MaxBytes => int32

FetchResponse => [TopicData]
TopicData => TopicName [PartitionData]
PartitionData => Partition ErrorCode FetchedOffset HighwaterMarkOffset MessageSetSize MessageSet
ErrorCode => int16
FetchedOffset => int64
HighwaterMarkOffset => int64
MessageSetSize => int32
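A minimal Python sketch of encoding a FetchRequest body from the grammar above, assuming big-endian integers, int16-length-prefixed UTF-8 strings, int32-count-prefixed arrays, and MaxBytes as an int32. Only the listed fields are written (no request header or size prefix); `encode_fetch_request` is a hypothetical name, and ReplicaId defaults to -1 as a normal consumer would send.

```python
import struct
from typing import Dict, Tuple

def encode_string(s: str) -> bytes:
    # Assumed string encoding: int16 length followed by UTF-8 bytes.
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_fetch_request(max_wait_ms: int, min_bytes: int,
                         topics: Dict[str, Dict[int, Tuple[int, int]]],
                         replica_id: int = -1) -> bytes:
    """topics maps TopicName -> {Partition: (FetchOffset, MaxBytes)}."""
    # ReplicaId MaxWaitTime MinBytes, then the int32 count of topic entries.
    out = [struct.pack(">iiii", replica_id, max_wait_ms, min_bytes, len(topics))]
    for topic_name, partitions in topics.items():
        # TopicName followed by the int32 count of partition entries.
        out.append(encode_string(topic_name) + struct.pack(">i", len(partitions)))
        for partition, (fetch_offset, max_bytes) in partitions.items():
            # Partition (int32) FetchOffset (int64) MaxBytes (int32)
            out.append(struct.pack(">iqi", partition, fetch_offset, max_bytes))
    return b"".join(out)

# Wait up to 100 ms for at least 64 KB, reading from offset 42 of partition 0.
request = encode_fetch_request(100, 64 * 1024, {"events": {0: (42, 1024 * 1024)}})
```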

Field

Description

ReplicaId

The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes.

MaxWaitTime

The max wait time is the maximum amount of time (in milliseconds) to block waiting if insufficient data is available at the time the request is issued.

MinBytes

This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0, the server will always respond immediately; however, if there is no new data since the client's last request, it will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or MaxWaitTime elapses. By setting higher values in combination with MaxWaitTime, the consumer can tune for throughput, trading a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and MinBytes to 64 KB would allow the server to wait up to 100 ms to try to accumulate 64 KB of data before responding).

TopicName

The name of the topic.

Partition

The id of the partition the fetch is for.

FetchOffset

The offset to begin fetching from.

MaxBytes

The maximum number of bytes to include in the response for this partition.
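The interplay of MinBytes and MaxWaitTime can be modelled in a few lines. This Python sketch is a simplified, hypothetical model rather than the broker's implementation: it assumes data arrives as (time in ms, bytes) events summed over all requested partitions, and that the server replies as soon as the accumulated bytes reach MinBytes or MaxWaitTime elapses.

```python
from typing import List, Tuple

def response_time_ms(arrivals: List[Tuple[int, int]],
                     min_bytes: int, max_wait_ms: int) -> int:
    """Return when (ms after the request) a long-poll fetch would be answered.

    `arrivals` lists (time_ms, num_bytes) events in time order, summed over
    all requested partitions.
    """
    if min_bytes <= 0:
        return 0                      # MinBytes = 0: always answer immediately
    available = 0
    for time_ms, num_bytes in arrivals:
        if time_ms >= max_wait_ms:
            break                     # MaxWaitTime is reached first
        available += num_bytes
        if available >= min_bytes:
            return time_ms            # enough data has accumulated
    return max_wait_ms
```

With MaxWaitTime = 100 and MinBytes = 65536, an arrival of 1,000 bytes at 10 ms does not trigger a reply, but a further 70,000 bytes at 50 ms does, so `response_time_ms([(10, 1000), (50, 70000)], 65536, 100)` returns 50; with only the first arrival it returns 100.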

Offset API

This API describes the valid offsets available for a set of topic-partitions. As with the produce and fetch APIs, requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.

The response contains the starting offset of each segment for the requested partition as well as the "log end offset", i.e. the offset of the next message that would be appended to the given partition.

We agree that this API is slightly funky.

Code Block

OffsetRequest => [TopicName [PartitionOffsetRequest]]
  TopicName => string
  PartitionOffsetRequest => Partition Time MaxNumberOfOffsets
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32


OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64
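A minimal Python sketch of decoding the OffsetResponse grammar above, assuming big-endian integers, int16-length-prefixed UTF-8 strings, and int32-count-prefixed arrays. `decode_offset_response` is a hypothetical name; it returns, per topic and partition, the ErrorCode and the list of offsets in the order the server sent them.

```python
import struct
from typing import Dict, List, Tuple

def decode_offset_response(buf: bytes) -> Dict[str, Dict[int, Tuple[int, List[int]]]]:
    """Decode OffsetResponse => [TopicName [PartitionOffsets]]."""
    pos = 0

    def read(fmt: str) -> tuple:
        # Unpack big-endian fields at the cursor and advance it.
        nonlocal pos
        values = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return values

    result: Dict[str, Dict[int, Tuple[int, List[int]]]] = {}
    (topic_count,) = read(">i")
    for _ in range(topic_count):
        (name_len,) = read(">h")
        topic_name = buf[pos:pos + name_len].decode("utf-8")
        pos += name_len
        (partition_count,) = read(">i")
        partitions = {}
        for _ in range(partition_count):
            # PartitionOffsets => Partition ErrorCode [Offset]
            partition, error_code, offset_count = read(">ihi")
            partitions[partition] = (error_code, list(read(f">{offset_count}q")))
        result[topic_name] = partitions
    return result
```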


Error Codes

We use numeric codes to indicate what problem occurred on the server. The client can translate these into exceptions or whatever error-handling mechanism is appropriate in the client language. Here is a table of the error codes currently in use:

Error

Code

Description

NoError

0

No error--it worked!

Unknown

-1

An unexpected server error

OffsetOutOfRangeCode

1

The requested offset is outside the range of offsets maintained by the server for the given topic/partition.

InvalidMessageCode

2

This indicates that a message's contents do not match its CRC.

UnknownTopicOrPartitionCode

3

This request is for a topic or partition that does not exist on this broker.

InvalidMessageSizeCode

4

The message has a negative size

LeaderNotAvailableCode

5

This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes.

NotLeaderForPartitionCode

6

This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the client's metadata is out of date.

RequestTimedOutCode

7

This error is thrown if the request exceeds the user-specified time limit in the request.

BrokerNotAvailableCode

8

This is not a client-facing error and is used only internally by intra-cluster broker communication.

ReplicaNotAvailableCode

9

What is the difference between this and LeaderNotAvailable?

MessageSizeTooLargeCode

10

The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempts to produce a message larger than this maximum.
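As the text above suggests, a client can translate these codes into exceptions. A minimal Python sketch using the codes from this table; `KafkaProtocolError`, `check_error`, and the placeholder name for codes not listed here are hypothetical.

```python
from typing import Dict

ERROR_NAMES: Dict[int, str] = {
    -1: "Unknown",
    1: "OffsetOutOfRangeCode",
    2: "InvalidMessageCode",
    3: "UnknownTopicOrPartitionCode",
    4: "InvalidMessageSizeCode",
    5: "LeaderNotAvailableCode",
    6: "NotLeaderForPartitionCode",
    7: "RequestTimedOutCode",
    8: "BrokerNotAvailableCode",
    9: "ReplicaNotAvailableCode",
    10: "MessageSizeTooLargeCode",
}

class KafkaProtocolError(Exception):
    """Raised for any non-zero ErrorCode returned by the server."""

    def __init__(self, code: int):
        self.code = code
        # Codes missing from the table get a placeholder name.
        self.name = ERROR_NAMES.get(code, "UnlistedError")
        super().__init__(f"{self.name} (error code {code})")

def check_error(code: int) -> None:
    """Do nothing for NoError (0); raise KafkaProtocolError otherwise."""
    if code != 0:
        raise KafkaProtocolError(code)
```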

Interaction With The Server

Interaction with the server uses request/response pairs. The client initiates a socket connection, then writes a sequence of request messages and reads back the corresponding response messages. No handshake is required on connection or disconnection.
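A minimal Python sketch of this exchange over a blocking socket. It assumes the Kafka framing in which every request and response is preceded by its size as a big-endian int32; the helper names are hypothetical, and request and response bodies are treated as opaque bytes.

```python
import socket
import struct

def send_request(sock: socket.socket, request: bytes) -> None:
    # Prefix the request with its size as a big-endian int32.
    sock.sendall(struct.pack(">i", len(request)) + request)

def recv_exact(sock: socket.socket, n: int) -> bytes:
    # recv() may return fewer bytes than asked for, so loop until n arrive.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("connection closed mid-response")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)

def recv_response(sock: socket.socket) -> bytes:
    # Read the int32 size, then exactly that many bytes of response.
    (size,) = struct.unpack(">i", recv_exact(sock, 4))
    return recv_exact(sock, size)
```

A client would open one connection per broker, e.g. `sock = socket.create_connection((host, port))` with a hypothetical `host` and `port`, then call `send_request` and `recv_response` for each request.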

The client will likely need to maintain connections to multiple brokers; however, it should not be necessary to maintain multiple connections to a single broker from a single client instance (i.e. connection pooling).

The producer is responsible for determining the partitioning of the data it sends. The brokers will check that they are the leader for the partition of the data sent to them, but they will not attempt to determine whether this is the "right" partition for the data, as that is purely a client concern. Most clients will want to partition by the message key, but the exact nature of this partitioning is up to the client. In the Scala client we have made it pluggable via a partitioner interface that controls the mapping of key to partition.
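A sketch of a pluggable key-to-partition mapping in Python. This is not the Scala client's partitioner: the `Partitioner` interface, the CRC32 hash, and the round-robin fallback for messages without a key are illustrative choices. CRC32 (from `zlib`) is used instead of Python's built-in `hash()` because `hash()` of bytes is randomised per process, so different producers would disagree on where a key goes.

```python
import zlib
from typing import Optional

class Partitioner:
    """Hypothetical pluggable interface mapping a message key to a partition."""

    def partition(self, key: Optional[bytes], num_partitions: int) -> int:
        raise NotImplementedError

class HashPartitioner(Partitioner):
    """Same key -> same partition; keyless messages are spread round-robin."""

    def __init__(self) -> None:
        self._next = 0

    def partition(self, key: Optional[bytes], num_partitions: int) -> int:
        if num_partitions <= 0:
            raise ValueError("num_partitions must be positive")
        if key is None:
            chosen = self._next % num_partitions
            self._next += 1
            return chosen
        # zlib.crc32 is stable across processes and returns a non-negative int.
        return zlib.crc32(key) % num_partitions
```

Because the mapping is taken modulo the partition count, changing the number of partitions changes where existing keys land.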

As indicated above the client must address its requests to the correct broker. The metadata request can be issued against any server and using the resulting cluster metadata the client should direct requests to the leader for the partition it wants to produce to or fetch from. If the client gets an error it should refresh its metadata.
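A minimal Python sketch of that routing rule: cache the partition leaders returned by a metadata request, send to the cached leader, and refresh the cache whenever an error comes back. `LeaderRouter`, the `fetch_metadata` and `send_to_broker` callables, and the retry limit are hypothetical, and this sketch does not back off between retries.

```python
from typing import Callable, Dict, Tuple

TopicPartition = Tuple[str, int]

class LeaderRouter:
    """Routes requests to partition leaders using cached cluster metadata."""

    def __init__(self, fetch_metadata: Callable[[], Dict[TopicPartition, int]]):
        # fetch_metadata: hypothetical callable that issues a metadata request
        # to any broker and returns {(topic, partition): leader broker id}.
        self._fetch_metadata = fetch_metadata
        self._leaders = fetch_metadata()

    def leader_for(self, topic: str, partition: int) -> int:
        return self._leaders[(topic, partition)]

    def send(self, topic: str, partition: int,
             send_to_broker: Callable[[int], int], max_attempts: int = 3) -> int:
        """Send to the current leader; on a non-zero ErrorCode refresh and retry.

        Returns the id of the broker that accepted the request.
        """
        for _ in range(max_attempts):
            broker_id = self.leader_for(topic, partition)
            if send_to_broker(broker_id) == 0:      # 0 = NoError
                return broker_id
            self._leaders = self._fetch_metadata()  # metadata may be stale
        raise RuntimeError(f"{topic}/{partition}: no success after {max_attempts} attempts")
```

With metadata that first names broker 1 and, after a refresh, broker 2, a NotLeaderForPartitionCode (6) from broker 1 triggers a refresh and the retry goes to broker 2.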

The server has a configurable maximum limit on request size and any request that exceeds this limit will result in the socket being disconnected.

Some Common Philosophical Questions

Some people have asked why we don't use HTTP. There are a number of reasons, the best being that client implementors can make use of some of the more advanced TCP features: the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.

...

A final question is why we don't use a system like Protocol Buffers or Thrift to define our messages. These packages excel at helping you manage lots and lots of serialized messages, but we have only a few messages. Support across languages is somewhat spotty (depending on the package). The mapping between the binary log format and the wire protocol is something we manage somewhat carefully, and this would not be possible with these systems. Finally, we prefer versioning APIs explicitly and checking the version, rather than inferring new values as nulls, as it allows more nuanced control of compatibility.