...

Variable Length Primitives

bytes16, bytes32, string - These types consist of a signed integer giving a length N followed by N bytes of content. A length of -1 indicates null. bytes16 and string use a two-byte int16 size and bytes32 uses a four-byte int32 size. string is identical in format to bytes16, but its bytes should be interpreted as UTF-8 encoded characters.
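As an illustration of the length-prefixed layout described above, here is a minimal Python sketch. The function names (`encode_var`, `decode_var`, `encode_string`) are hypothetical and not part of any Kafka client. Big-endian (network) byte order is an assumption here, since this passage does not state the byte order.

```python
import struct
from typing import Optional, Tuple

# Assumption: sizes are big-endian (network byte order) signed integers.
INT16 = ">h"  # two-byte size, used by bytes16 and string
INT32 = ">i"  # four-byte size, used by bytes32


def encode_var(data: Optional[bytes], size_fmt: str) -> bytes:
    """Prefix data with its signed length; None is encoded as length -1."""
    if data is None:
        return struct.pack(size_fmt, -1)
    return struct.pack(size_fmt, len(data)) + data


def decode_var(buf: bytes, offset: int, size_fmt: str) -> Tuple[Optional[bytes], int]:
    """Read one length-prefixed value; return (value, offset of next field)."""
    (length,) = struct.unpack_from(size_fmt, buf, offset)
    offset += struct.calcsize(size_fmt)
    if length < 0:
        return None, offset  # -1 marks a null value, no content bytes follow
    return buf[offset:offset + length], offset + length


def encode_string(text: Optional[str]) -> bytes:
    """A string is a bytes16 whose content is UTF-8 encoded text."""
    return encode_var(None if text is None else text.encode("utf-8"), INT16)
```

For example, `encode_string("abc")` yields the two size bytes `00 03` followed by the three content bytes, while a null bytes32 is just the four bytes `ff ff ff ff`.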

...

The BNFs below give an exact context-free grammar for the request and response binary format. For each API I will give the request and response together, followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable names (for example, I define a production for ErrorCode even though it is just an int16 in order to give it a symbolic name). As always in a BNF, a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parentheses for grouping. The top-level definition is always given first and subsequent sub-parts are indented.

...

Field

Description

Offset

This is the offset used in Kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value it likes here.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format.

Attributes

This byte holds metadata attributes about the message. In particular the last 3 bits contain the compression codec used for the message.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set.
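The following Python sketch shows how the Crc and Attributes fields in the table above might be produced and checked. It rests on several assumptions that this passage does not confirm: the message body is laid out as Crc (unsigned 32-bit), MagicByte (int8), Attributes (int8), Key, Value; Key and Value are four-byte length-prefixed byte arrays with -1 for null; all integers are big-endian; and "last 3 bits" means the three lowest-order bits. The Offset field is left out, on the assumption that it belongs to the enclosing message set rather than to the bytes covered by the CRC. All function names are hypothetical.

```python
import struct
import zlib
from typing import Optional


def _length_prefixed(data: Optional[bytes]) -> bytes:
    """Assumed encoding for Key and Value: int32 length, -1 for null."""
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(key: Optional[bytes], value: Optional[bytes],
                   codec: int = 0, magic: int = 0) -> bytes:
    """Build Crc + MagicByte + Attributes + Key + Value (assumed layout)."""
    attributes = codec & 0x07  # codec kept in the three lowest bits
    body = (struct.pack(">bb", magic, attributes)
            + _length_prefixed(key) + _length_prefixed(value))
    crc = zlib.crc32(body) & 0xFFFFFFFF  # CRC32 of the remainder of the message
    return struct.pack(">I", crc) + body


def check_message(message: bytes) -> bool:
    """Recompute the CRC over everything after the Crc field and compare."""
    (stored,) = struct.unpack_from(">I", message, 0)
    return stored == (zlib.crc32(message[4:]) & 0xFFFFFFFF)


def compression_codec(message: bytes) -> int:
    """Read the codec from the Attributes byte (offset 5 in this layout)."""
    return message[5] & 0x07
```

A broker or consumer would run a check like `check_message` on receipt and reject the message if the stored and recomputed values differ.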

...

Field

Description

RequiredAcks

This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server responds immediately, before even writing the data to disk. If it is 1 the data is written to the local machine only, with no blocking on replicas. If it is -1 the server will block until the message is committed by all in-sync replicas. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).

Timeout

This provides a maximum time the server can await the receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout.

TopicName

The topic that data is being published to.

Partition

The partition that data is being published to.

MessageSetSize

The size, in bytes, of the message set that follows.

MessageSet

A set of messages in the standard format described above.
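The RequiredAcks rules in the table above can be summarized as a small function. This is only a sketch of the described semantics, not broker code; the name `acks_to_wait_for` and the treatment of values below -1 as errors are assumptions.

```python
def acks_to_wait_for(required_acks: int, in_sync_replicas: int) -> int:
    """Number of acknowledgements the server waits for before responding.

    in_sync_replicas counts the current in-sync replicas, leader included.
    """
    if required_acks < -1:
        # Assumption: values below -1 are not meaningful and are rejected.
        raise ValueError("RequiredAcks must be -1 or greater")
    if required_acks == 0:
        return 0  # respond immediately, before the write
    if required_acks == -1:
        return in_sync_replicas  # wait for every in-sync replica
    # 1 means the local write only; larger values are capped at the
    # number of in-sync replicas.
    return min(required_acks, in_sync_replicas)
```

With three in-sync replicas, a request with RequiredAcks of 5 therefore waits for three acknowledgements, not five.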

...

This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs, requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.

...

Another question is why we don't adopt XMPP, STOMP, AMQP or an existing protocol. The answer to this varies by protocol, but in general the problem is that the protocol does determine large parts of the implementation and we couldn't do what we are doing if we didn't have control over the protocol. Our belief is that it is possible to do better than existing messaging systems have in providing a truly distributed messaging system, and to do this we need to build something that works differently.

A final question is why we don't use a system like Protocol Buffers or Thrift to define our request messages. These packages excel at helping you manage lots and lots of serialized messages. However, we have only a few messages. Support across languages is somewhat spotty (depending on the package). In addition, the mapping between binary log format and wire protocol is something we manage somewhat carefully, and this would not be possible with these systems. Finally, we prefer the style of versioning APIs explicitly and checking this to inferring new values as nulls, as it allows more nuanced control of compatibility.