...
The Kafka protocol is fairly simple, there are only four six client requests APIs.
- Metadata - Describes the currently available brokers, their host and port information, and gives information about which broker hosts which partitions.
- Send - Send messages to a broker
- Fetch - Fetch messages from a broker, one which fetches data, one which gets cluster metadata, and one which gets offset information about a topic.
- Offsets - Get information about the available offsets for a given topic partition.
- Offset Commit - Commit a set of offsets for a consumer group
- Offset Fetch - Fetch a set of offsets for a consumer group
Each of these will be described in detail below.
...
Code Block |
---|
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
|
Field | Description |
---|---|
ApiKey | This is a numeric id for the API being invoked (i.e. is it a metadata request, a produce request, a fetch request, etc). |
ApiVersion | This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version. Currently the supported version for all APIs is 0. |
CorrelationId | This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server. |
ClientId | This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc. For example, one might want to monitor not just the requests per second overall, but the number coming from each client application (each of which could reside on multiple servers). This id acts as a logical grouping across all requests from a particular client. |
The various request and response messages will be described below.
...
Responses
...
Here is a mapping of the current ApiKeys
API | ApiKey |
---|---|
ProduceKey | 0 |
FetchKey | 1 |
OffsetsKey | 2 |
MetadataKey | 3 |
LeaderAndIsrKey | 4 |
StopReplicaKey | 5 |
OffsetCommitKey | 6 |
OffsetFetchKey | 7 |
...
Responses
...
Code Block |
---|
Response => CorrelationId ResponseMessage
CorrelationId => |
Code Block |
Response => CorrelationId ResponseMessage CorrelationId => int32 ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse |
Field | Description |
---|---|
CorrelationId | The server passes back whatever integer the client supplied as the correlation in the request. |
...
Code Block |
---|
OffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode [Offset] Partition => int32 ErrorCode => int16 Offset => int64 |
Constants
Api Keys
Offset Commit/Fetch API
These APIs allow for centralized management of offsets. Read more Offset Management
...
Offset Commit Request
...
Code Block |
---|
OffsetCommitRequest => ConsumerGroup [TopicName [Partition Offset Metadata]]
ConsumerGroup => string
TopicName => string
Partition => int32
Offset => int64
Metadata => string
|
...
Offset Commit Response
...
Code Block |
---|
OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
TopicName => string
Partition => int32
ErrorCode => int16
|
...
Offset Fetch Request
...
Code Block |
---|
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
ConsumerGroup => string
TopicName => string
Partition => int32
|
...
Offset Fetch Response
...
Code Block |
---|
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string
ErrorCode => int16
|
Constants
Api Keys
The following are the numeric codes that the ApiKey in the request can take for each of the above The following are the numeric codes that the ApiKey in the request can take for each of the above request types.
API name | ApiKey Value |
---|---|
ProduceRequest | 0 |
FetchRequest | 1 |
OffsetRequest | 2 |
MetadataRequest | 3 |
...
Error | Code | Description |
---|---|---|
NoError | 0 | No error--it worked! |
Unknown | -1 | An unexpected server error |
OffsetOutOfRange | 1 | The requested offset is outside the range of offsets maintained by the server for the given topic/partition. |
InvalidMessage | 2 | This indicates that a message contents does not match its CRC |
UnknownTopicOrPartition | 3 | This request is for a topic or partition that does not exist on this broker. |
InvalidMessageSize | 4 | The message has a negative size |
LeaderNotAvailable | 5 | This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes. |
NotLeaderForPartition | 6 | This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date. |
RequestTimedOut | 7 | This error is thrown if the request exceeds the user-specified time limit in the request. |
BrokerNotAvailable | 8 | This is not a client facing error and is used only internally by intra-cluster broker communication. |
ReplicaNotAvailable | 9 | What is the difference between this and LeaderNotAvailable? |
MessageSizeTooLarge | 10 | The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum. |
StaleControllerEpochCode | 11 | ??? |
OffsetMetadataTooLargeCode | 12 | If you specify a string larger than configured maximum for offset metadata |
Some Common Philosophical Questions
...