...
Code Block |
---|
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
|
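The sketch below shows how the MessageSet framing can be walked. It reads each entry's offset and returns that offset together with the raw Message bytes. It assumes big-endian integers and treats a final entry that is cut short as incomplete. The helper name `split_message_set` is hypothetical.

```python
import struct

# Offset (int64) followed by MessageSize (int32), big-endian.
ENTRY_HEADER = struct.Struct(">qi")


def split_message_set(buf: bytes) -> list[tuple[int, bytes]]:
    """Split a MessageSet into (offset, message_bytes) pairs.

    Hypothetical helper: an entry whose Message runs past the end of the
    buffer is treated as incomplete and is skipped rather than raising.
    """
    entries = []
    pos = 0
    while pos + ENTRY_HEADER.size <= len(buf):
        offset, size = ENTRY_HEADER.unpack_from(buf, pos)
        if size < 0:
            raise ValueError(f"negative MessageSize {size} at byte {pos}")
        start = pos + ENTRY_HEADER.size
        end = start + size
        if end > len(buf):
            break  # truncated trailing entry
        entries.append((offset, buf[start:end]))
        pos = end
    return entries
```

The grammar above shows no element count for the set. This sketch therefore relies on the buffer length, such as an enclosing MessageSetSize, to know where the set ends.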
Message format
Code Block |
---|
v0
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Timestamp Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Timestamp => int64
Key => bytes
Value => bytes
|
Field | Description |
---|---|
Offset | This is the offset used in Kafka as the log sequence number. When the producer is sending non-compressed messages, it can set the offsets to anything. When the producer is sending compressed messages, the inner messages of each compressed message should have offsets starting from 0 and increasing by one for each inner message, to avoid server-side recompression. (See more details about compressed messages in Kafka below.) |
Crc | The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer. |
MagicByte | This is a version id used to allow backwards-compatible evolution of the message binary format. It is 0 for message format v0 and 1 for message format v1, which is the current value. |
Attributes | This byte holds metadata attributes about the message. The lowest 3 bits contain the compression codec used for the message. Since 0.10.0, the fourth lowest bit represents the timestamp type: 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. All other bits should be set to 0. |
Timestamp | This is the timestamp of the message. The timestamp type is indicated in the attributes. |
Key | The key is an optional message key that was used for partition assignment. The key can be null. |
Value | The value is the actual message contents as an opaque byte array. Kafka supports recursive messages, in which case this may itself contain a message set. The value can be null. |
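Here is a minimal sketch of building a single uncompressed message in either format, checking its CRC, and reading the attribute bits described above. It rests on three assumptions. Integers are big-endian. A `bytes` field is an int32 length followed by the data, with a length of -1 for null. The CRC is the standard zlib CRC-32 computed over everything after the Crc field. All function names are hypothetical.

```python
import struct
import zlib
from typing import Optional


def encode_bytes(data: Optional[bytes]) -> bytes:
    """int32 length prefix followed by the data; length -1 encodes null."""
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(key: Optional[bytes], value: Optional[bytes],
                   magic: int = 1, attributes: int = 0,
                   timestamp: int = -1) -> bytes:
    """Encode one Message; the Timestamp field is only written when magic >= 1."""
    body = struct.pack(">bb", magic, attributes)
    if magic >= 1:
        body += struct.pack(">q", timestamp)  # e.g. a CreateTime in milliseconds
    body += encode_bytes(key) + encode_bytes(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF  # CRC32 of the remainder of the message
    return struct.pack(">I", crc) + body


def verify_crc(message: bytes) -> bool:
    """Recompute the CRC over the bytes after the Crc field and compare."""
    (stored,) = struct.unpack_from(">I", message, 0)
    return stored == zlib.crc32(message[4:]) & 0xFFFFFFFF


def compression_codec(attributes: int) -> int:
    return attributes & 0x07  # lowest 3 bits


def timestamp_type(attributes: int) -> str:
    # fourth lowest bit (since 0.10.0): 0 = CreateTime, 1 = LogAppendTime
    return "LogAppendTime" if attributes & 0x08 else "CreateTime"
```

For example, `encode_message(b"k", b"v", magic=1, timestamp=1000)` yields a 24-byte message. Its attributes byte is 0, so `timestamp_type` reports CreateTime.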
...
Produce Response
Code Block |
---|
v0
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
v1 (supported since 0.9.0)
ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
ThrottleTime => int32
v2 (supported since 0.10.0)
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
Timestamp => int64
ThrottleTime => int32
|
Field | Description |
---|---|
TopicName | The topic this response entry corresponds to. |
Partition | The partition this response entry corresponds to. |
ErrorCode | The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be unavailable or maintained on a different host, while others may have successfully accepted the produce request. |
Offset | The offset assigned to the first message in the message set appended to this partition. |
Timestamp | (v2 only) If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set; all the messages in the message set have the same timestamp. If CreateTime is used, this field is always -1. If no error code is returned, the producer can assume that the broker has accepted the timestamps of the messages in the produce request. |
ThrottleTime | Duration in milliseconds for which the request was throttled due to quota violation (zero if the request did not violate any quota). |
Possible Error Codes: (TODO)
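The v2 layout above can be decoded with a small sequential reader. This sketch makes three assumptions. The response header (correlation id) has already been stripped. Arrays are prefixed by an int32 count. Strings are prefixed by an int16 length. Following the Timestamp description, it maps the CreateTime value -1 to `None`. The class and function names are hypothetical.

```python
import struct
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProducePartitionResult:
    topic: str
    partition: int
    error_code: int
    base_offset: int                 # offset of the first appended message
    log_append_time: Optional[int]   # None when the broker returned -1 (CreateTime)


class Reader:
    """Sequential big-endian reader over a response body."""

    def __init__(self, buf: bytes):
        self.buf, self.pos = buf, 0

    def _take(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.buf, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._take(">h")

    def int32(self) -> int:
        return self._take(">i")

    def int64(self) -> int:
        return self._take(">q")

    def string(self) -> str:
        length = self.int16()
        text = self.buf[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return text


def parse_produce_response_v2(buf: bytes):
    r = Reader(buf)
    results = []
    for _ in range(r.int32()):          # [TopicName [...]]
        topic = r.string()
        for _ in range(r.int32()):      # [Partition ErrorCode Offset Timestamp]
            partition = r.int32()
            error_code = r.int16()
            offset = r.int64()
            timestamp = r.int64()
            results.append(ProducePartitionResult(
                topic, partition, error_code, offset,
                None if timestamp == -1 else timestamp))
    throttle_time_ms = r.int32()        # ThrottleTime follows the array
    return results, throttle_time_ms
```

Because errors are per partition, a caller would typically check `error_code` on each result rather than on the response as a whole.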
...
Field | Description |
---|---|
ReplicaId | The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes. |
MaxWaitTime | The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued. |
MinBytes | This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0, the server will always respond immediately; however, if there is no new data since the client's last request, it will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout, the consumer can tune for throughput, trading a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and MinBytes to 64 KB would allow the server to wait up to 100 ms to try to accumulate 64 KB of data before responding). |
TopicName | The name of the topic. |
Partition | The id of the partition the fetch is for. |
FetchOffset | The offset to begin this fetch from. |
MaxBytes | The maximum bytes to include in the message set for this partition. This helps bound the size of the response. |
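The sketch below makes the MinBytes/MaxWaitTime trade-off concrete. It encodes the fields from this table in the order listed and uses the 100 ms / 64 KB values from the example as defaults. It assumes a v0 request body of `ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]`, with int32 array counts and int16-length strings. The common request header is omitted, and the function names are hypothetical.

```python
import struct


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_fetch_request_body(fetches: dict[str, list[tuple[int, int, int]]],
                              max_wait_ms: int = 100,
                              min_bytes: int = 64 * 1024,
                              replica_id: int = -1) -> bytes:
    """fetches maps topic -> [(partition, fetch_offset, max_bytes), ...].

    replica_id defaults to -1 because normal consumers have no node id.
    """
    out = struct.pack(">iii", replica_id, max_wait_ms, min_bytes)
    out += struct.pack(">i", len(fetches))
    for topic, partitions in fetches.items():
        out += encode_string(topic) + struct.pack(">i", len(partitions))
        for partition, fetch_offset, max_bytes in partitions:
            # Partition int32, FetchOffset int64, MaxBytes int32
            out += struct.pack(">iqi", partition, fetch_offset, max_bytes)
    return out
```

Passing `min_bytes=0` instead would ask the server to answer immediately, possibly with empty message sets.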
Fetch Response
Code Block |
---|
v0
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
MessageSetSize => int32
v1 (supported since 0.9.0) and v2 (supported since 0.10.0)
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
MessageSetSize => int32
ThrottleTime => int32
|
Field | Description |
---|---|
TopicName | The name of the topic this response entry is for. |
Partition | The id of the partition this response is for. |
HighwaterMarkOffset | The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are. |
MessageSetSize | The size in bytes of the message set for this partition |
MessageSet | The message data fetched from this partition, in the format described above. |
ThrottleTime | Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.) |
Fetch Response v1 only contains message format v0.
Fetch Response v2 may contain either message format v0 or message format v1.
Possible Error Codes
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
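The sketch below decodes a v0 Fetch Response body, which has no ThrottleTime. It then uses HighwaterMarkOffset to estimate how far behind a consumer is. It assumes the response header is already stripped, arrays use int32 counts, and strings use int16 lengths. It also assumes the consumer tracks the offset of the next message it will fetch. The MessageSet bytes are returned unparsed, and the helper names are hypothetical.

```python
import struct

# Error codes listed above for the Fetch API.
FETCH_ERRORS = {
    1: "OFFSET_OUT_OF_RANGE",
    3: "UNKNOWN_TOPIC_OR_PARTITION",
    6: "NOT_LEADER_FOR_PARTITION",
    9: "REPLICA_NOT_AVAILABLE",
    -1: "UNKNOWN",
}


def parse_fetch_response_v0(buf: bytes) -> dict:
    """Return {(topic, partition): (error_code, highwater, message_set_bytes)}."""
    pos = 0

    def take(fmt: str) -> int:
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    result = {}
    for _ in range(take(">i")):
        name_len = take(">h")
        topic = buf[pos:pos + name_len].decode("utf-8")
        pos += name_len
        for _ in range(take(">i")):
            partition = take(">i")
            error_code = take(">h")
            highwater = take(">q")
            size = take(">i")              # MessageSetSize
            message_set = buf[pos:pos + size]
            pos += size
            result[(topic, partition)] = (error_code, highwater, message_set)
    return result


def consumer_lag(highwater: int, next_fetch_offset: int) -> int:
    """Messages between the consumer's position and the end of the log."""
    return max(0, highwater - next_fetch_offset)
```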
...
Per the comment on
Code Block |
---|
v0 and v1 (supported since 0.8.2):
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
ConsumerGroup => string
TopicName => string
Partition => int32
|
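Offset Fetch Request v0 and v1 share the same body layout, so one encoder can serve both. The version itself would be chosen in the request header, which is not shown here. This sketch assumes int16-length strings and int32 array counts. `encode_offset_fetch_request` is a hypothetical name.

```python
import struct


def encode_string(s: str) -> bytes:
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_offset_fetch_request(group: str,
                                partitions_by_topic: dict[str, list[int]]) -> bytes:
    """Encode ConsumerGroup [TopicName [Partition]]."""
    out = encode_string(group)
    out += struct.pack(">i", len(partitions_by_topic))
    for topic, partitions in partitions_by_topic.items():
        out += encode_string(topic) + struct.pack(">i", len(partitions))
        for partition in partitions:
            out += struct.pack(">i", partition)
    return out
```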
Offset Fetch Response
Code Block |
---|
v0 and v1 (supported since 0.8.2):
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string
ErrorCode => int16
|
Note that if there is no offset associated with a topic-partition under that consumer group, the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1.
There is no format difference between Offset Fetch Request v0 and v1. Functionally, Offset Fetch Request v0 fetches offsets from ZooKeeper, while Offset Fetch Request v1 fetches offsets from Kafka.
Possible Error Codes
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* GROUP_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
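Because a missing offset comes back as -1 with no error code, a client may prefer to read -1 as "nothing committed" rather than as an offset. The sketch below decodes the response body that way. It assumes the header is stripped, arrays use int32 counts, and strings use int16 lengths, with a negative length read as an empty string. The function name is hypothetical.

```python
import struct
from typing import Optional


def parse_offset_fetch_response(buf: bytes) -> dict:
    """Return {(topic, partition): (committed_offset or None, metadata, error_code)}."""
    pos = 0

    def take(fmt: str) -> int:
        nonlocal pos
        (value,) = struct.unpack_from(fmt, buf, pos)
        pos += struct.calcsize(fmt)
        return value

    def string() -> str:
        nonlocal pos
        length = take(">h")
        if length < 0:
            return ""  # treat a null string like empty metadata
        text = buf[pos:pos + length].decode("utf-8")
        pos += length
        return text

    result = {}
    for _ in range(take(">i")):
        topic = string()
        for _ in range(take(">i")):
            partition = take(">i")
            offset = take(">q")
            metadata = string()
            error_code = take(">h")
            # -1 with no error code means no offset is stored for this partition
            committed: Optional[int] = None if offset == -1 else offset
            result[(topic, partition)] = (committed, metadata, error_code)
    return result
```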
...