
...

This is an attempt to document Kafka's wire format and implementation details to encourage others to write drivers in various languages. Much of this document is based on the Kafka Wire Format article by Jeffrey Damick and Taylor Gautier.

This does not yet cover ZooKeeper integration, but I hope to add that shortly.

Status of this Document

I'm currently in the process of verifying many of the things said here, to make sure they're actually a result of the protocol and not some quirk of our driver. I've tried to flag those with "FIXME" notes, but I'm sure I've missed a few.

...

If you haven't read the design doc, read it. There are a few things that haven't been updated for v0.7 yet, but it's a great overview of Kafka.

...

Code Block
title: Request Header (all requests begin with this)
    
   0                   1                   2                   3
   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                       REQUEST_LENGTH                          |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |         REQUEST_TYPE          |        TOPIC_LENGTH           |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  /                                                               /
  /                    TOPIC (variable length)                    /
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                           PARTITION                           |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

  REQUEST_LENGTH = int32 // Length in bytes of entire request (excluding this field)
  REQUEST_TYPE   = int16 // See table below
  TOPIC_LENGTH   = int16 // Length in bytes of the topic name

  TOPIC = String // Topic name, ASCII, not null terminated
                 // This becomes the name of a directory on the broker, so no 
                 // chars that would be illegal on the filesystem.

  PARTITION = int32 // Partition to act on. Number of available partitions is 
                    // controlled by broker config. Partition numbering 
                    // starts at 0.

  ============  =====  =======================================================
  REQUEST_TYPE  VALUE  DEFINITION
  ============  =====  =======================================================
  PRODUCE         0    Send a group of messages to a topic and partition.
  FETCH           1    Fetch a group of messages from a topic and partition.
  MULTIFETCH      2    Multiple FETCH requests, chained together
  MULTIPRODUCE    3    Multiple PRODUCE requests, chained together
  OFFSETS         4    Find offsets before a certain time (this can be a bit
                       misleading, please read the details of this request).
  ============  =====  =======================================================
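
If it helps to see this header as code, here is a minimal sketch of packing it in Python with the struct module. It assumes big-endian (network byte order) integers, and the helper names (encode_request_header, frame_request) are purely illustrative, not part of any official client.

Code Block
title: Packing the request header (Python, illustrative sketch)

import struct

def encode_request_header(request_type, topic, partition):
    """Everything after REQUEST_LENGTH: type, topic length, topic, partition."""
    topic_bytes = topic.encode("ascii")   # topic names must be filesystem-safe ASCII
    return (struct.pack(">hh", request_type, len(topic_bytes))
            + topic_bytes
            + struct.pack(">i", partition))

def frame_request(body):
    """Prepend the 4-byte REQUEST_LENGTH, which excludes itself."""
    return struct.pack(">i", len(body)) + body

# Example: the fixed part of a FETCH (type 1) request for topic "dogs", partition 0.
header = encode_request_header(1, "dogs", 0)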
Code Block
title: Response Header (all responses begin with this 6-byte header)
   0                   1                   2                   3
   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                        RESPONSE_LENGTH                        |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |         ERROR_CODE            |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

  RESPONSE_LENGTH = int32 // Length in bytes of entire response (excluding this field)
  ERROR_CODE = int16 // See table below.

  ================  =====  ===================================================
  ERROR_CODE        VALUE  DEFINITION
  ================  =====  ===================================================
  Unknown            -1    Unknown Error
  NoError             0    Success 
  OffsetOutOfRange    1    Offset requested is no longer available on the server
  InvalidMessage      2    A message you sent failed its checksum and is corrupt.
  WrongPartition      3    You tried to access a partition that doesn't exist
                           (was not between 0 and (num_partitions - 1)).
  InvalidFetchSize    4    The size you requested for fetching is smaller than
                           the message you're trying to fetch.
  ================  =====  ===================================================
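
As a rough illustration of consuming this header, the following Python sketch reads one complete response off a socket and checks the error code. The helper names and error handling are illustrative only.

Code Block
title: Reading a response (Python, illustrative sketch)

import struct

def _read_exact(sock, n):
    """recv() until exactly n bytes have arrived."""
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise IOError("socket closed mid-response")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)

def read_response(sock):
    """Read RESPONSE_LENGTH, then the body; the first 2 bytes of the body are ERROR_CODE."""
    (length,) = struct.unpack(">i", _read_exact(sock, 4))
    body = _read_exact(sock, length)
    (error_code,) = struct.unpack(">h", body[:2])
    if error_code != 0:
        raise IOError("Kafka returned error code %d" % error_code)
    return body[2:]   # the payload that follows the header (message set, offsets, etc.)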

...

Code Block
title: Offsets Request
   0                   1                   2                   3
   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  /                         REQUEST HEADER                        /
  /                                                               /
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                              TIME                             |
  |                                                               |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
  |                     MAX_NUMBER (of OFFSETS)                   |
  +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

  TIME = int64 // Milliseconds since UNIX Epoch.
               // -1 = LATEST 
               // -2 = EARLIEST
  MAX_NUMBER = int32 // Return up to this many offsets
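
Continuing the hypothetical Python helpers from the request header sketch above (encode_request_header and frame_request), an OFFSETS request could be packed roughly like this:

Code Block
title: Packing an Offsets request (Python, illustrative sketch)

import struct

LATEST, EARLIEST = -1, -2

def encode_offsets_request(topic, partition, time=LATEST, max_number=1):
    """OFFSETS is request type 4; its body is the header plus int64 TIME and int32 MAX_NUMBER."""
    body = (encode_request_header(4, topic, partition)
            + struct.pack(">qi", time, max_number))
    return frame_request(body)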

...

Each file is named after the offset represented by the first message in that file. The segment size is configurable (512MB by default). Kafka will write to the current segment file until it goes over that size, and then writes the next message to a new segment file. The files are actually slightly larger than the limit, because Kafka finishes writing the current message before rolling over; a single message is never split across multiple files.

ZooKeeper

Kafka relies on ZooKeeper in order to coordinate multiple brokers and consumers. If you're unfamiliar with ZooKeeper, just think of it as a server that allows you to atomically create nodes in a tree, assign values to those nodes, and sign up for notifications when a node or its children get modified. Nodes can be either permanent or ephemeral; ephemeral nodes disappear if the process that created them disconnects (after some timeout delay).

While creating the nodes we care about, you'll often need to create the intermediate nodes that they are children of. For instance, since offsets are stored at /consumers/[consumer_group]/offsets/[topic]/[broker_id]-[partition_id], something has to create /consumers, /consumers/[consumer_group], etc. All nodes have values associated with them in ZooKeeper, even if Kafka doesn't use them for anything. To make debugging easier, the value stored at an intermediate node should be the ID of the node's creator. In practice that means the first Consumer you create will need to make this skeleton structure and store its ID as the value for /consumers, /consumers/[consumer_group], etc.
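
Here's a rough sketch of creating that skeleton, using the kazoo Python client purely as an example (the ZooKeeper address, paths, and creator ID are placeholders):

Code Block
title: Creating intermediate nodes (Python/kazoo, illustrative sketch)

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError

def ensure_path_with_creator(zk, path, creator_id):
    """Create each missing intermediate node, storing the creator's ID as its value."""
    partial = ""
    for component in path.strip("/").split("/"):
        partial += "/" + component
        if not zk.exists(partial):
            try:
                zk.create(partial, creator_id.encode("utf-8"))
            except NodeExistsError:
                pass   # someone else created it between our check and our create

zk = KazooClient(hosts="localhost:2181")   # placeholder ZooKeeper address
zk.start()
ensure_path_with_creator(zk, "/consumers/my_group/offsets/dogs", "my-consumer-id")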

ZooKeeper has Java and C libraries, and can be run as a cluster.

Basic Responsibilities

Kafka Brokers, Consumers, and Producers all have to coordinate using ZooKeeper. The following is a high-level overview of their ZooKeeper interactions. We assume here that a Consumer only consumes one topic.

Kafka Broker

  • When starting up:
    • Publish its brokerid – a simple integer ID that uniquely identifies it.
    • Publish its location, so that Producers and Consumers can find it.
    • Publish all topics presently in the Broker, along with the number of partitions for each topic.
  • Whenever a new topic is created:
    • Publish the topic, along with the number of partitions in the topic

Producer

  • Read the locations of all Brokers with a given topic, so that we know where to send messages.

Consumer

  • Publish what ConsumerGroup we belong to.
  • Publish our unique ID so other Consumers can see us.
  • Determine which partitions on which Brokers this Consumer is responsible for, and publish its ownership of them.
  • Publish what offset we've successfully read up to for every partition that we're responsible for.

Every Consumer belongs to exactly one ConsumerGroup. A Consumer can read from multiple brokers and partitions, but every unique combination of (ConsumerGroup, Broker, Topic, Partition) is read by one and only one Consumer. ConsumerGroups allow you to easily support topic or queue semantics. If your Consumers are all in separate ConsumerGroups, each message goes to every Consumer. If all your Consumers are in the same ConsumerGroup, then each message goes to only one Consumer.

Well, for the most part. The orchestration Kafka uses guarantees at least once delivery, but has edge cases that may yield duplicate delivery even in a queue (one ConsumerGroup) arrangement.

Kafka Broker

All these nodes are written by the Kafka Broker. Your client just needs to be able to read this broker data and understand its limitations.

  • ID Registry
    • ZooKeeper Path: /brokers/ids/[0..N]
    • Type: Ephemeral
    • Data Description: String in the format of "creator:host:port" of the broker.
  • Topic Registry
    • ZooKeeper Path: /brokers/topics/[topic]/[0..N]
    • Type: Ephemeral
    • Data Description: Number of partitions that topic has on that Broker.

So let's take the example of the following hypothetical broker:

  • Broker ID is 2 (brokerid=2 in the Kafka config file)
  • Running on IP 10.0.0.12
  • Using port 9092
  • Topics:
    • "dogs" with 4 partitions
    • "mutts" with 5 partitions

Then the broker would register the following:

  • /brokers/ids/2 = 10.0.0.12-1324306324402:10.0.0.12:9092
  • /brokers/topics/dogs/2 = 4
  • /brokers/topics/mutts/2 = 5

Some things to note:

  • Broker IDs don't have to be sequential, but they do have to be integers. They are a config setting, and not randomly generated. If a Kafka server goes offline for some reason and comes back an hour later, it should be reconnecting with the same Broker ID.
  • The ZooKeeper hierarchy puts individual brokers under topics because Producers and Consumers will want to put a watch on a specific topic node, to get notifications when new brokers enter or leave the pool.
  • The Broker's description is formatted as creator:host:port. The host will also show up as part of the creator because of the version of UUID that Kafka's using, but don't rely on that behavior. Always split on ":" and extract the host, which will be the second element.
  • These nodes are ephemeral, so if the Broker crashes or is disconnected from the network, it will automatically be removed. But this removal is not instantaneous, and the node might linger for a few seconds. This can cause errors when a broker crashes and is restarted, and subsequently tries to re-create its still-existing Broker ID registry node.
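
To make the client side of this concrete, here is a hedged sketch (again using kazoo as an example client) of reading the Broker ID and topic registries and splitting the "creator:host:port" string as described above:

Code Block
title: Reading the Broker registries (Python/kazoo, illustrative sketch)

def read_broker_pool(zk, topic):
    """Return ({broker_id: (host, port)}, {broker_id: num_partitions}) for one topic."""
    brokers = {}
    for broker_id in zk.get_children("/brokers/ids"):
        value, _stat = zk.get("/brokers/ids/" + broker_id)
        # Split on ":"; the host is always the second element.
        _creator, host, port = value.decode("utf-8").split(":")
        brokers[int(broker_id)] = (host, int(port))

    partitions = {}
    topic_path = "/brokers/topics/" + topic
    if zk.exists(topic_path):
        for broker_id in zk.get_children(topic_path):
            value, _stat = zk.get(topic_path + "/" + broker_id)
            partitions[int(broker_id)] = int(value)
    return brokers, partitions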

Producer

Reads:

  • /brokers/topics/[topic]/[0..N], so that it knows what Broker IDs are available for this topic, and how many partitions they have.
  • /brokers/ids/[0..N], to find the address of the Brokers, so it knows how to connect to them.

Watches:

  • /brokers/topics/[topic], so that it knows when Brokers enter and exit the pool.
  • /brokers/ids, so that it can update the Broker addresses in case you bring down a Broker and bring it back up under a different IP/port.

Producers are fairly straightforward (with one caveat), and a Producer never has to write anything to ZooKeeper. The basic operation goes like this:

  1. A Producer is created for a topic.
  2. The Producer reads the Broker-created nodes in /brokers/ids/[0..N] and sets up an internal mapping of Broker IDs => Kafka connections.
  3. The Producer reads the nodes in /brokers/topics/[topic]/[0..N] to find the number of partitions it can send to for each Broker.
  4. The Producer takes every Broker+Partition combination and puts them in an internal list.
  5. When a Producer is asked to send a message set, it picks from one of its Broker+Partition combinations, looks up the appropriate Broker address, and sends the message set to that Broker, for that topic and partition. The precise mechanism for choosing a destination is undefined, but debugging would probably be easier if you ordered them by Broker+Partition (e.g. "0-3") and used a hash function to pick the index you wanted to send to. You could also just make it randomly choose (see the sketch after this list).
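
For step 5, here's a minimal sketch of one reasonable destination-picking scheme (hash a key when you have one, otherwise pick at random). This is just one option; the protocol doesn't mandate any particular choice, and the function name is illustrative.

Code Block
title: Picking a Broker+Partition destination (Python, illustrative sketch)

import random
import zlib

def choose_destination(broker_partitions, key=None):
    """broker_partitions is a sorted list like ["0-0", "0-1", "1-0", ...]."""
    if key is not None:
        # Hash the key so the same key always lands on the same Broker+Partition.
        index = zlib.crc32(key.encode("utf-8")) % len(broker_partitions)
    else:
        index = random.randrange(len(broker_partitions))
    return broker_partitions[index]

destinations = ["0-0", "0-1", "0-2", "1-0", "1-1", "1-2"]
print(choose_destination(destinations, key="dog-42"))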

The Producer's internal mappings change when:

  • A Broker leaves the pool or a new Broker enters it.
  • The number of partitions a Broker publishes for a topic changes.

The latter is actually extremely common, which brings us to the only tricky part about Producers – dealing with new topics.

Creating New Topics

Topics are not pre-determined. You create them just by sending a new message to Kafka for that topic. So let's say you have a number of Brokers that have joined the pool and don't list themselves in /brokers/topics/[topic]/[0..N] for the topic you're interested in. They haven't done so because those topics don't exist on those Brokers yet. But our Producer knows the Brokers themselves exist, because they are in the Broker registry at /brokers/ids/[0..N]. We definitely need to send messages to them, but what partitions are safe to send to? Brokers can be configured differently from each other and topics can be configured on an individual basis, so there's no way to infer the definitive answer by looking at what's in ZooKeeper.

The solution is that for new topics where the number of available partitions on the Broker is unknown, you should just send to partition 0. Every Broker will at least have that one partition available. As soon as you write to it and the topic comes into existence on the Broker, the Broker will publish all available partitions in ZooKeeper. You'll get notified by the watch you put on /brokers/topics/[topic], and you'll add the new Broker+Partitions to your destination pool.
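
As a sketch of that rule (reusing the hypothetical read_broker_pool mapping from the earlier example): Brokers that already list the topic contribute all of their partitions, and Brokers that don't yet list it contribute only partition 0.

Code Block
title: Building the destination pool with a partition-0 fallback (Python, illustrative sketch)

def build_destinations(brokers, partitions_by_broker):
    """brokers: {broker_id: (host, port)}; partitions_by_broker: {broker_id: count}."""
    destinations = []
    for broker_id in sorted(brokers):
        # A Broker that doesn't list this topic yet is assumed to accept partition 0 only.
        num_partitions = partitions_by_broker.get(broker_id, 1)
        for partition in range(num_partitions):
            destinations.append("%d-%d" % (broker_id, partition))
    return destinations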

Consumer

This is where all the heavy lifting happens.

Rebalancing

Occurs: When Brokers or Consumers enter or leave the pool.

Objectives:

  • All Consumers in a ConsumerGroup will come to a consensus as to who is consuming what.
  • Each Broker+Topic+Partition combination is consumed by one and only one Consumer, even if it means that some Consumers don't get anything at all.
  • A Consumer should try to have as many partitions on the same Broker as possible, so sort the list by [Broker ID]-[Partition] (0-0, 0-1, 0-2, etc.), and assign them in chunks.
  • Consumers are sorted by their Consumer IDs (which are just UUIDs). If there are three Consumers, two Brokers, and three partitions on each Broker, the split might look like:
    • Consumer 59a1bce1-2b1d-11e1-9aff-040ccee02800: [0-0, 0-1]
    • Consumer 95eb94bd-2b1d-11e1-8e73-040ccee02800: [0-2, 1-0]
    • Consumer bb451045-2b1d-11e1-b742-040ccee02800: [1-1, 1-2]
  • If the distribution can't be even and some Consumers must have more partitions than others, the extra partitions always go to the earlier consumers on the list. So you could have a distribution like 4-4-4-4 or 5-5-4-4, but never 4-4-4-5 or 4-5-4-4.

Code Example:

Code Block
title: A Consumer finding its Broker Partitions during rebalancing (Python)

    # all_broker_partitions and all_topic_consumers are both already sorted
    my_index = all_topic_consumers.index(self.id)
    bp_per_consumer = len(all_broker_partitions) // len(all_topic_consumers)
    num_extra = len(all_broker_partitions) % len(all_topic_consumers)

    # The first num_extra Consumers each take one extra partition.
    num_parts = bp_per_consumer + (1 if my_index < num_extra else 0)

    # Every Consumer before this one took bp_per_consumer partitions, plus one
    # extra partition for each earlier Consumer in the first num_extra.
    start = my_index * bp_per_consumer + min(my_index, num_extra)

    self._broker_partitions = all_broker_partitions[start:start + num_parts]
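
For example, with 18 sorted broker partitions and 4 Consumers, bp_per_consumer is 4 and num_extra is 2, so the Consumers end up with 5, 5, 4, and 4 partitions (indices 0-4, 5-9, 10-13, and 14-17), matching the 5-5-4-4 style of distribution described above.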