...
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         REQUEST_LENGTH                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         REQUEST_TYPE          |         TOPIC_LENGTH          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                                                               /
/                    TOPIC (variable length)                    /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           PARTITION                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

REQUEST_LENGTH = int32  // Length in bytes of entire request (excluding this field)
REQUEST_TYPE   = int16  // See table below
TOPIC_LENGTH   = int16  // Length in bytes of the topic name
TOPIC          = String // Topic name, ASCII, not null terminated
                        // This becomes the name of a directory on the broker,
                        // so no chars that would be illegal on the filesystem.
PARTITION      = int32  // Partition to act on. Number of available partitions is
                        // controlled by broker config. Partition numbering
                        // starts at 0.

============  =====  =======================================================
REQUEST_TYPE  VALUE  DEFINITION
============  =====  =======================================================
PRODUCE           0  Send a group of messages to a topic and partition.
FETCH             1  Fetch a group of messages from a topic and partition.
MULTIFETCH        2  Multiple FETCH requests, chained together
MULTIPRODUCE      3  Multiple PRODUCE requests, chained together
OFFSETS           4  Find offsets before a certain time (this can be a bit
                     misleading, please read the details of this request).
============  =====  =======================================================
```
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        RESPONSE_LENGTH                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|          ERROR_CODE           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

RESPONSE_LENGTH = int32 // Length in bytes of entire response (excluding this field)
ERROR_CODE      = int16 // See table below.

================  =====  ===================================================
ERROR_CODE        VALUE  DEFINITION
================  =====  ===================================================
Unknown              -1  Unknown Error
NoError               0  Success
OffsetOutOfRange      1  Offset requested is no longer available on the server
InvalidMessage        2  A message you sent failed its checksum and is corrupt.
WrongPartition        3  You tried to access a partition that doesn't exist
                         (was not between 0 and (num_partitions - 1)).
InvalidFetchSize      4  The size you requested for fetching is smaller than
                         the message you're trying to fetch.
================  =====  ===================================================
```
FIXME: Add tests to verify all these codes.
FIXME: Check that there weren't more codes added in 0.7.

Very similar to the request header is the multi-request header, used for requesting more than one topic-partition combination at a time, either for multi-produce or multi-fetch.
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                         REQUEST_LENGTH                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         REQUEST_TYPE          |     TOPICPARTITION_COUNT      |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

REQUEST_LENGTH       = int32 // Length in bytes of entire request (excluding this field)
REQUEST_TYPE         = int16 // See table above
TOPICPARTITION_COUNT = int16 // Number of unique topic-partition combos in this request
```
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            LENGTH                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     MAGIC     |               CHECKSUM                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|CHECKSUM(cont.)|               PAYLOAD                         /
+-+-+-+-+-+-+-+-+                                               /
/                        PAYLOAD (cont.)                        /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

LENGTH   = int32   // Length in bytes of entire message (excluding this field)
MAGIC    = int8    // 0 is the only valid value
CHECKSUM = int32   // CRC32 checksum of the PAYLOAD
PAYLOAD  = Bytes[] // Message content
```
The offsets to request messages are just byte offsets. To find the offset of the next message, take the offset of this message (that you made in the request), and add LENGTH + 4 bytes (length of this message + 4 byte header to represent the length of this message).
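That arithmetic can be sketched as a tiny helper (the function name here is illustrative, not part of any Kafka API):

```python
def next_offset(offset: int, length: int) -> int:
    """Byte offset of the next message, given this message's byte offset
    and its LENGTH field (which excludes the 4-byte length prefix itself)."""
    return offset + 4 + length

# A message fetched at offset 0 whose LENGTH field reads 10 occupies
# 14 bytes on the wire, so the next message starts at byte offset 14.
print(next_offset(0, 10))
```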
Starting with version 0.7, Kafka added an extra field for compression:
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            LENGTH                             |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     MAGIC     |  COMPRESSION  |           CHECKSUM            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|       CHECKSUM (cont.)        |            PAYLOAD            /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+                               /
/                        PAYLOAD (cont.)                        /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

LENGTH      = int32   // Length in bytes of entire message (excluding this field)
MAGIC       = int8    // 0 = COMPRESSION attribute byte does not exist (v0.6 and below)
                      // 1 = COMPRESSION attribute byte exists (v0.7 and above)
COMPRESSION = int8    // 0 = none; 1 = gzip; 2 = snappy;
                      // Only exists at all if MAGIC == 1
CHECKSUM    = int32   // CRC32 checksum of the PAYLOAD
PAYLOAD     = Bytes[] // Message content
```
Note that compression is end-to-end: the producer is responsible for sending the compressed payload, it is stored compressed on the broker, and the consumer is responsible for decompressing it. Gzip gives a better compression ratio; snappy gives faster performance.
Let's look at what compressed messages act like:
```
+--------------------------+----------------------------+--------------------------+
|           CP1            |            CP2             |           CP3            |
| M1 | M2 | M3 | M4 ...    | M12 | M13 | M14 ...        | M26 | M27 | M28 ...      |
+--------------------------+----------------------------+--------------------------+
```
In this scenario, let's say that `M1`, `M2`, etc. represent complete, uncompressed messages (including headers) that the user of your library wants to send. What your driver needs to do is take `M1`, `M2`... up to some predetermined number/size, concatenate them together, and then compress them using gzip or snappy. The result (`CP1` in this case) becomes the `PAYLOAD` for the compressed message `CM1` that your library will send to Kafka.
It also means that we have to be careful about calculating the offsets. To Kafka, `M1`, `M2`, etc. don't really exist; it only sees the `CM1` you send. So when you calculate the offset you can fetch next, you have to make sure you're doing it on the boundaries of the compressed messages, not the inner messages.
FIXME: Haven't implemented compression yet, need to verify this is correct.
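A minimal sketch of that wrapping in Python, assuming the v0.7 layout described above (the helper names are mine, not from any Kafka library, and — per the FIXME — the exact framing should be verified against a broker):

```python
import gzip
import struct
import zlib

def encode_message(payload: bytes, compression: int = 0) -> bytes:
    # v0.7 message: LENGTH | MAGIC=1 | COMPRESSION | CHECKSUM | PAYLOAD.
    # LENGTH excludes its own 4-byte field.
    body = struct.pack(">bbI", 1, compression, zlib.crc32(payload)) + payload
    return struct.pack(">i", len(body)) + body

def compress_message_set(messages: list[bytes]) -> bytes:
    # Concatenate the complete inner messages (headers and all), gzip the
    # result, and wrap it as one outer message CM1 with COMPRESSION = 1 (gzip).
    inner = b"".join(encode_message(m) for m in messages)
    return encode_message(gzip.compress(inner), compression=1)
```

Offsets on the broker then advance in units of the outer message, which is why the fetch-offset arithmetic has to be done on `CM1` rather than on `M1`, `M2`, ...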
Interactions
Produce
To produce messages from the driver and send to Kafka, use the following format:
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                         REQUEST HEADER                        /
/                                                               /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        MESSAGES_LENGTH                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                                                               /
/                            MESSAGES                           /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

MESSAGES_LENGTH = int32 // Length in bytes of the MESSAGES section
MESSAGES        = Collection of MESSAGES (see above)
```
There is no response to a PRODUCE request; there is currently no way to tell whether the produce was successful or not. This is being worked on.
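As an illustration, a PRODUCE request could be serialized like this (a Python sketch with hypothetical helper names; the message encoder assumes the pre-0.7 MAGIC = 0 layout):

```python
import struct
import zlib

PRODUCE = 0  # REQUEST_TYPE value from the table above

def encode_message(payload: bytes) -> bytes:
    # MAGIC = 0 message: LENGTH | MAGIC | CHECKSUM | PAYLOAD
    body = struct.pack(">bI", 0, zlib.crc32(payload)) + payload
    return struct.pack(">i", len(body)) + body

def encode_produce_request(topic: str, partition: int, payloads: list[bytes]) -> bytes:
    messages = b"".join(encode_message(p) for p in payloads)
    t = topic.encode("ascii")  # ASCII, not null terminated
    body = (struct.pack(">hh", PRODUCE, len(t)) + t         # REQUEST_TYPE, TOPIC_LENGTH
            + struct.pack(">i", partition)                  # PARTITION
            + struct.pack(">i", len(messages)) + messages)  # MESSAGES_LENGTH, MESSAGES
    return struct.pack(">i", len(body)) + body              # REQUEST_LENGTH prefix
```

Since there is no response, the driver just writes these bytes to the broker socket and moves on.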
Multi-Produce
The multi-produce request has a different header, with the (topic-length/topic/message_length/messages) repeated many times.
Here is the general format of the multi-produce request; see the multi-request header above.

```
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                      MULTI-REQUEST HEADER                     /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|               TOPIC-PARTITION/MESSAGES (n times)              |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Per Topic-Partition (repeated n times)

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         TOPIC_LENGTH          |    TOPIC (variable length)    /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           PARTITION                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        MESSAGES_LENGTH                        |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                            MESSAGES                           /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
```

The TOPIC_LENGTH, TOPIC, PARTITION, and MESSAGES_LENGTH fields are documented above.
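Under the same caveats as before (illustrative helper names, not a real API), a multi-produce request repeats the per-topic-partition section after the multi-request header:

```python
import struct

MULTIPRODUCE = 3  # REQUEST_TYPE value from the table above

def encode_multiproduce(parts: list[tuple[str, int, bytes]]) -> bytes:
    """parts: (topic, partition, already-encoded MESSAGES bytes) per combo."""
    # Multi-request header: REQUEST_TYPE, then TOPICPARTITION_COUNT
    body = struct.pack(">hh", MULTIPRODUCE, len(parts))
    for topic, partition, messages in parts:
        t = topic.encode("ascii")
        body += (struct.pack(">h", len(t)) + t                 # TOPIC_LENGTH, TOPIC
                 + struct.pack(">i", partition)                # PARTITION
                 + struct.pack(">i", len(messages)) + messages)  # MESSAGES_LENGTH, MESSAGES
    return struct.pack(">i", len(body)) + body                 # REQUEST_LENGTH prefix
```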
...
- If you ask the broker for up to 300K worth of messages from a given topic and partition, it will send you the appropriate headers followed by a 300K chunk worth of the message log. If 300K ends in the middle of a message, you get half a message at the end. If it ends halfway through a message header, you get a broken header. This is not an error, this is Kafka pushing complexity outward to the driver to make the broker simple and fast.
- Kafka stores its messages in log files of a configurable size (512MB by default) called segments. A fetch of messages will not cross the segment boundary to read from multiple files. So if you ask for a fetch of 300K's worth of messages and the offset you give is such that there's only one message at the end of that segment file, then you will get just one message back. The next time you call fetch with the following offset, you'll get a full set of messages from the next segment file. Basically, don't make any assumptions about how many messages are remaining from how many you got in the last fetch.
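Both points push the "is this message complete?" check into the driver. Here is a sketch of that consumer-side parsing, assuming MAGIC = 0 messages (the helper name is mine):

```python
import struct
import zlib

def split_message_set(buf: bytes) -> tuple[list[bytes], int]:
    """Split a fetched chunk into complete MAGIC = 0 message payloads.

    Returns (payloads, consumed): `consumed` counts only whole messages,
    so fetch_offset + consumed is the offset to use for the next FETCH.
    """
    payloads, pos = [], 0
    while pos + 4 <= len(buf):
        (length,) = struct.unpack_from(">i", buf, pos)
        end = pos + 4 + length
        if end > len(buf):
            break  # truncated trailing message: not an error, just stop here
        assert buf[pos + 4] == 0, "this sketch only handles MAGIC = 0"
        (checksum,) = struct.unpack_from(">I", buf, pos + 5)
        payload = buf[pos + 9:end]
        if zlib.crc32(payload) != checksum:
            raise ValueError("corrupt message (InvalidMessage)")
        payloads.append(payload)
        pos = end
    return payloads, pos
```

Note that `consumed` is how the driver avoids making assumptions about how many messages a fetch returned: it simply resumes from the last complete boundary.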
Multi-Fetch

FIXME: Haven't implemented this to verify yet.
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                      MULTI-REQUEST HEADER                     /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|            TOPIC-PARTITION-FETCH-REQUEST (n times)            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Per Topic-Partition-Fetch-Request (repeated n times)

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|         TOPIC_LENGTH          |    TOPIC (variable length)    /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           PARTITION                           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                            OFFSET                             |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                           MAX_SIZE                            |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

REQUEST_HEADER = See MULTI-REQUEST HEADER above
OFFSET         = int64 // Offset in topic and partition to start from
MAX_SIZE       = int32 // MAX_SIZE of the message set to return

The TOPIC_LENGTH, TOPIC, and PARTITION fields are documented above.
```
Offsets
```
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/                         REQUEST HEADER                        /
/                                                               /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                             TIME                              |
|                                                               |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                    MAX_NUMBER (of OFFSETS)                    |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

TIME       = int64 // Milliseconds since UNIX Epoch.
                   // -1 = LATEST
                   // -2 = EARLIEST
MAX_NUMBER = int32 // Return up to this many offsets
```
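An OFFSETS request is just the request header followed by TIME and MAX_NUMBER; a Python sketch under the same illustrative-naming caveat as the earlier examples:

```python
import struct

OFFSETS = 4  # REQUEST_TYPE value from the table above
LATEST, EARLIEST = -1, -2

def encode_offsets_request(topic: str, partition: int,
                           time: int, max_number: int) -> bytes:
    t = topic.encode("ascii")
    body = (struct.pack(">hh", OFFSETS, len(t)) + t  # REQUEST_TYPE, TOPIC_LENGTH
            + struct.pack(">i", partition)           # PARTITION
            + struct.pack(">q", time)                # TIME (int64, ms since epoch)
            + struct.pack(">i", max_number))         # MAX_NUMBER of offsets
    return struct.pack(">i", len(body)) + body       # REQUEST_LENGTH prefix
```

For example, `encode_offsets_request("test", 0, LATEST, 1)` asks for the single most recent offset for partition 0 of topic "test".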
...
Role | ZooKeeper Path | Type | Data Description
---|---|---|---
ID Registry | | Ephemeral | String in the format of "creator:host:port" of the broker.
Topic Registry | | Ephemeral | Number of partitions that topic has on that Broker.
...