You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Introduction

This document covers the protocol implemented in Kafka 0.8. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them in a client. It is aimed at making it easier to implement a client, or just as documentation for the curious. This document assumes you understand the basic design and terminology described here.

Preliminaries

Kafka uses a binary protocol over TCP. This protocol defines all apis as request response message pairs. All messages are size delimited and are made up of the following primitive types:

int8, int16, int32, int64 - Integers with the given precision stored in big endian order.

string - A variable-length, size-delimited string. The first two bytes contain a length N stored as an int16. The succeeding N bytes contain the UTF8 encoding of the string.

Client Requests

There are only four client requests APIs:

  1. Metadata - Describes the currently available brokers, their host and port information, and gives information about which broker hosts which partitions.
  2. Send - Send messages to a broker
  3. Fetch - Fetch messages from a broker, one which fetches data, one which gets cluster metadata, and one which gets offset information about a topic.
  4. Offsets - Get offset information about a topic.

Each of these will be described in great detail below.

Metadata Request

This API answers the question "who has what data and where are they?". Specifically this request will tell for each topic how many partitions it has, which brokers currently host each of these partitions, and which of these is the master. Since Kafka is a partitioned system requests need to be directed to the appropriate server--the one currently acting as the master for the partition you want to interact with. Since cluster membership in Kafka is dynamic, you can't just give all the clients a config file with all the brokers (some of them may be down, or partitions may have moved); instead you need to ask the cluster about its current state at run time. Hence the first thing a client needs to do when it connects is ask, "where is everyone?" using this metadata API.

This is the only request that can be made to any server without regard to partition ownership and all servers will give the same answer (disregarding timing differences). Fetch and produce requests always interact with particular partitions, and sending these to the wrong broker will result in an invalid metadata error. The client is expected to cache the cluster metadata locally, using it to direct requests to the correct hosts, until it gets an invalid metadata error or can't reach a particular broker, at which point it should fetch the metadata again and update its cache.

This presents a bit of a catch-22, since the only way to find out which Kafka servers exists is to ask a Kafka server, so how can a client ever connect the first time? To do this a client should take a "bootstrap urls" configuration from which it can find out the list of currently available servers. Importantly this need not contain all the servers in the cluster, maybe just two or three for redundancy. The client should try each of these until it finds one it can connect to. This will ensure that even if one of the bootstrap servers is down the client can still fetch the cluster metadata.

For deployment you may not want to hardcode such a list and may prefer to rely on dns or a VIP or something like that to find a bootstap server.

So the lifecycle of most clients looks something like this:

  1. Cycle through a list of bootstrap kafka urls until we find one we can connect to. Fetch cluster metadata.
  2. Process fetch or produce requests, directing them to the appropriate broker based on the topic/partitions they send to or fetch from.
  3. If we get an appropriate error, refresh the metadata and try again.

Metadata Format
Here is the full BNF for metadata requests:

metadata_request => version_id 
                    client_id 
                    number_of_topics
                    topics


version_id => int16
client_id => string
number_of_topics => int32
topics => string*


metadata_response => version_id
                     number_of_topics
                     topic_metadata*
topic_metadata => topic
                  partition_metadata*
                  error_code

Some Common Philosophical Questions

Some people have asked why we don't use HTTP. There are a number of reasons, the best is that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.

Others have asked if maybe we shouldn't support many different protocols. Prior experience with this was that it makes it very hard to add and test new features if they have to be ported across many protocol implementations. Our feeling is that most users don't really see multiple protocols as a feature, they just want a good reliable client in the language of their choice.

Another question is why we don't adopt XMPP, STOMP, AMQP or an existing protocol. The answer to this varies by protocol, but in general the problem is that the protocol does determine large parts of the implementation and we couldn't do what we are doing if we didn't have control over the protocol. Our belief is that it is possible to do better than existing messaging systems have in providing a truly distributed messaging system, and to do this we need to build something that works different.

A final question is why we don't use a system like Protocol Buffers or Thrift to define our messages. These packages excel at helping you to managing lots and lots of serialized messages. However we have only a few messages. Support across languages is somewhat spotty (depending on the package). Finally the mapping between binary log format and wire protocol is something we manage somewhat carefully and this would not be possible with these systems. Finally we prefer the style of versioning APIs explicitly and checking this to inferring new values as nulls as it allows more nuanced control of compatibility.

  • No labels