Notice: An initial version of this page has been added to the official documentation. It still needs a lot of work; however, because it is generated from the code, it may be more up to date in some cases. Contributions to improve this documentation are welcome and encouraged. Some remaining tasks are tracked in KAFKA-3360.

 

Table of Contents

Introduction

This document covers the protocol implemented in Kafka 0.8 and beyond. It is meant to give a readable guide to the protocol that covers the available requests, their binary format, and the proper way to make use of them to implement a client. This document assumes you understand the basic design and terminology described here.

The protocol used in 0.7 and earlier is similar to this, but we chose to make a one time (we hope) break in compatibility to be able to clean up cruft and generalize things.

...

Field

Description

ApiKey

This is a numeric id for the API being invoked (i.e. is it a metadata request, a produce request, a fetch request, etc).

ApiVersion

This is a numeric version number for this api. We version each API and this version number allows the server to properly interpret the request as the protocol evolves. Responses will always be in the format corresponding to the request version. Currently the supported version for all APIs is 0.

CorrelationId

This is a user-supplied integer. It will be passed back in the response by the server, unmodified. It is useful for matching request and response between the client and server.

ClientId

This is a user supplied identifier for the client application. The user can use any identifier they like and it will be used when logging errors, monitoring aggregates, etc. For example, one might want to monitor not just the requests per second overall, but the number coming from each client application (each of which could reside on multiple servers). This id acts as a logical grouping across all requests from a particular client.
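To make the framing concrete, here is a minimal Python sketch of how a client might serialize a request with this header. The helper names, the client id, and the use of ApiKey 3 (metadata) with an empty topic array are illustrative assumptions, not part of the protocol definition.

Code Block
import struct

def encode_string(s):
    # Protocol "string": int16 length followed by UTF-8 bytes; length -1 encodes null.
    if s is None:
        return struct.pack(">h", -1)
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_request(api_key, api_version, correlation_id, client_id, body):
    # Header: ApiKey (int16), ApiVersion (int16), CorrelationId (int32), ClientId (string),
    # followed by the request body; the whole message is preceded by its size as an int32.
    header = struct.pack(">hhi", api_key, api_version, correlation_id) + encode_string(client_id)
    payload = header + body
    return struct.pack(">i", len(payload)) + payload

# Example: a metadata request (ApiKey 3, version 0) asking for all topics (empty topic array).
wire_bytes = encode_request(3, 0, correlation_id=42, client_id="example-client",
                            body=struct.pack(">i", 0))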

...

Code Block
MessageSet => [Offset MessageSize Message]
  Offset => int64
  MessageSize => int32

Message format

Code Block
v0
Message => Crc MagicByte Attributes Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Key => bytes
  Value => bytes
 
v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Timestamp Key Value
  Crc => int32
  MagicByte => int8
  Attributes => int8
  Timestamp => int64
  Key => bytes
  Value => bytes


Field

Description

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending non compressed messages, it can set the offsets to anything. When the producer is sending compressed messages, to avoid server side recompression, each compressed message should have offset starting from 0 and increasing by one for each inner message in the compressed message. (see more details about compressed messages in Kafka below)

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 1.

Attributes

This byte holds metadata attributes about the message.

The lowest 3 bits contain the compression codec used for the message.

The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0)

All other bits should be set to 0.

Timestamp

This is the timestamp of the message. The timestamp type is indicated in the attributes. Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).

Key

The key is an optional message key that was used for partition assignment. The key can be null.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.
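As a rough illustration of the v1 layout, and of the fact that the CRC covers everything after the Crc field, here is a Python sketch; the helper names are ours and this is not a full producer implementation.

Code Block
import struct
import zlib

def encode_bytes(b):
    # Protocol "bytes": int32 length followed by the raw bytes; length -1 encodes null.
    if b is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(b)) + b

def encode_message_v1(key, value, timestamp_ms, attributes=0):
    # MagicByte 1, Attributes, Timestamp, Key, Value; the CRC32 is computed over
    # everything that follows the Crc field.
    body = struct.pack(">bbq", 1, attributes, timestamp_ms) + encode_bytes(key) + encode_bytes(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF
    return struct.pack(">I", crc) + body

def encode_message_set_entry(offset, message):
    # A MessageSet entry is Offset (int64) + MessageSize (int32) + the message bytes.
    return struct.pack(">qi", offset, len(message)) + message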

In Kafka 0.11, the structure of the 'MessageSet' and 'Message' was significantly changed. Not only were new fields added to support new features like exactly-once semantics and record headers, but the recursive nature of the previous versions of the message format was eliminated in favor of a flat structure. A 'MessageSet' is now called a 'RecordBatch', which contains one or more 'Records' (and not 'Messages'). When compression is enabled, the RecordBatch header remains uncompressed, but the Records are compressed together. Further, multiple fields in the 'Record' are varint encoded, which leads to significant space savings for larger batches.

The new message format has a Magic value of 2. Its structure is as follows:

Code Block
RecordBatch =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32 
  Magic => int8  
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32 
  FirstTimestamp => int64 
  MaxTimestamp => int64 
  ProducerId => int64 
  ProducerEpoch => int16 
  FirstSequence => int32 
  Records => [Record]
 
Record => 
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] 
 
Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => data
  HeaderValueLen => varint
  HeaderValue => data
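The varint fields above hold signed values using a zigzag encoding (as in Protocol Buffers), so small deltas of either sign stay small on the wire. A sketch of the encoder, for illustration only:

Code Block
def encode_varint(value):
    # Zigzag-encode the signed 64-bit value, then emit 7 bits per byte, low bits first,
    # setting the high bit on every byte except the last.
    zigzag = ((value << 1) ^ (value >> 63)) & 0xFFFFFFFFFFFFFFFF
    out = bytearray()
    while True:
        byte = zigzag & 0x7F
        zigzag >>= 7
        if zigzag:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return bytes(out)

# encode_varint(0) == b"\x00", encode_varint(-1) == b"\x01", encode_varint(1) == b"\x02"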

The semantics of the newly added fields are described below: 

  

...

FirstOffset

Denotes the first offset in the RecordBatch. The 'offsetDelta' of each Record in the batch would be computed relative to this FirstOffset. In particular, the offset of each Record in the Batch is its 'OffsetDelta' + 'FirstOffset'.

LastOffsetDelta

The offset delta of the last message in the RecordBatch. This is used by the broker to ensure correct behavior even when Records within a batch are compacted out.

PartitionLeaderEpoch

Introduced with KIP-101, this is set by the broker upon receipt of a produce request and is used to ensure no loss of data when there are leader changes with log truncation. Client developers do not need to worry about setting this value.

FirstTimestamp

The timestamp of the first Record in the batch. The timestamp of each Record in the RecordBatch is its 'TimestampDelta' + 'FirstTimestamp'.

RecordBatch Attributes

This byte holds metadata attributes about the message.

The lowest 3 bits contain the compression codec used for the message.

The fourth lowest bit represents the timestamp type. 0 stands for CreateTime and 1 stands for LogAppendTime. The producer should always set this bit to 0. (since 0.10.0)

The fifth lowest bit indicates whether the RecordBatch is part of a transaction or not. 0 indicates that the RecordBatch is not transactional, while 1 indicates that it is. (since 0.11.0.0).

The sixth lowest bit indicates whether the RecordBatch contains a control message. 1 indicates that the RecordBatch contains a control message, 0 indicates that it doesn't. Control messages are used to enable transactions in Kafka and are generated by the broker. Clients should not return control batches (i.e. those with this bit set) to applications. (since 0.11.0.0)

Record Attributes

Record level attributes are presently unused.

MaxTimestamp

The timestamp of the last Record in the batch. This is used by the broker to ensure the correct behavior even when Records within the batch are compacted out.

ProducerId

Introduced in 0.11.0.0 for KIP-98, this is the broker assigned producerId received by the 'InitProducerId' request. Clients which want to support idempotent message delivery and transactions must set this field.

ProducerEpoch

Introduced in 0.11.0.0 for KIP-98, this is the broker assigned producerEpoch received by the 'InitProducerId' request. Clients which want to support idempotent message delivery and transactions must set this field.

FirstSequence

Introduced in 0.11.0.0 for KIP-98, this is the producer assigned sequence number which is used by the broker to deduplicate messages. Clients which want to support idempotent message delivery and transactions must set this field. The sequence number for each Record in the RecordBatch is its OffsetDelta + FirstSequence.

Headers

Introduced in 0.11.0.0 for KIP-82, Kafka now supports application-level record headers. The Producer and Consumer APIs have been updated accordingly to write and read these headers.
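For illustration, a small Python sketch of how a client might interpret the RecordBatch Attributes bits described above (the function name is ours):

Code Block
def parse_batch_attributes(attributes):
    # Interpret the int16 Attributes field of a RecordBatch.
    return {
        "codec": attributes & 0x07,                                            # lowest 3 bits
        "timestamp_type": "LogAppendTime" if attributes & 0x08 else "CreateTime",
        "transactional": bool(attributes & 0x10),                              # fifth lowest bit
        "control": bool(attributes & 0x20),                                    # sixth lowest bit
    }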

 

Compression

Kafka supports compressing messages for additional efficiency; however, this is more complex than just compressing a raw message. Because individual messages may not have sufficient redundancy to enable good compression ratios, compressed messages must be sent in special batches (although you may use a batch of one if you truly wish to compress a message on its own). The messages to be sent are wrapped (uncompressed) in a MessageSet structure, which is then compressed and stored in the Value field of a single "Message" with the appropriate compression codec set. The receiving system parses the actual MessageSet from the decompressed value. The outer MessageSet should contain only one compressed "Message" (see KAFKA-1718 for details).

Kafka currently supports two compression codecs with the following codec numbers:

Compression     Codec
None            0
GZIP            1
Snappy          2
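As a sketch of the wrapping described above (GZIP shown, pre-0.11 message format), a compressed batch could be built roughly as follows; offset 0 and timestamp 0 in the wrapper are placeholders for the example.

Code Block
import gzip
import struct
import zlib

def compress_message_set(message_set_bytes):
    # GZIP the uncompressed MessageSet and wrap it as the Value of a single outer
    # v1 message whose Attributes carry codec 1 (GZIP).
    value = gzip.compress(message_set_bytes)
    body = struct.pack(">bbq", 1, 0x01, 0)                 # magic 1, attributes codec = GZIP, timestamp 0
    body += struct.pack(">i", -1)                          # null key
    body += struct.pack(">i", len(value)) + value          # compressed MessageSet as the Value
    message = struct.pack(">I", zlib.crc32(body) & 0xFFFFFFFF) + body
    return struct.pack(">qi", 0, len(message)) + message   # outer MessageSet entry (offset 0)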

The APIs

This section gives details on each of the individual APIs, their usage, their binary format, and the meaning of their fields.

Metadata API

This API answers the following questions:

  • What topics exist?
  • How many partitions does each topic have?
  • Which broker is currently the leader for each partition?
  • What is the host and port for each of these brokers?

This is the only request that can be addressed to any broker in the cluster.

Since there may be many topics the client can give an optional list of topic names in order to only return metadata for a subset of topics.

The metadata returned is at the partition level, but grouped together by topic for convenience and to avoid redundancy. For each partition the metadata contains the information for the leader as well as for all the replicas and the list of replicas that are currently in-sync.

Note: If "auto.create.topics.enable" is set in the broker configuration, a topic metadata request will create the topic with the default replication factor and number of partitions. 

Topic Metadata Request
Code Block
TopicMetadataRequest => [TopicName]
  TopicName => string

Field

Description

TopicName

The topics to produce metadata for. If empty the request will yield metadata for all topics.

Metadata Response

The response contains metadata for each partition, with partitions grouped together by topic. This metadata refers to brokers by their broker id. The brokers each have a host and port.

Code Block
MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
  	NodeId => int32
  	Host => string
  	Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
  	TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
  	PartitionErrorCode => int16
  	PartitionId => int32
  	Leader => int32
  	Replicas => [int32]
  	Isr => [int32]

Field

Description

Leader

The node id for the kafka broker currently acting as leader for this partition. If no leader exists because we are in the middle of a leader election this id will be -1.

Replicas

The set of alive nodes that currently act as followers for the leader of this partition.

Isr

The subset of the replicas that are "caught up" to the leader.

Broker

The node id, hostname, and port information for a kafka broker

Possible Error Codes
* UnknownTopic (3)
* LeaderNotAvailable (5)
* InvalidTopic (17)
* TopicAuthorizationFailed (29)
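Once the response has been decoded, routing requests is straightforward. A hypothetical sketch, assuming the MetadataResponse has already been parsed into plain dictionaries:

Code Block
def broker_for_partition(metadata, topic, partition):
    # metadata is assumed to be an already-parsed MetadataResponse, e.g.
    # {"brokers": {0: ("kafka1", 9092)}, "leaders": {("my-topic", 0): 0}}
    leader = metadata["leaders"][(topic, partition)]
    if leader == -1:
        raise RuntimeError("leader election in progress; refresh metadata and retry")
    return metadata["brokers"][leader]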
Produce API

The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

The produce API uses the generic message set format, but since no offset has been assigned to the messages at the time of the send the producer is free to fill in that field in any way it likes.

Produce Request
Code Block
v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
  RequiredAcks => int16
  Timeout => int32
  Partition => int32
  MessageSetSize => int32

Produce Request on or after v1 indicates the client can parse the quota throttle time in the Produce Response.

Produce Request on or after v2 indicates the client can parse the timestamp field in the produce Response.

Field

Description

RequiredAcks

This field indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response.

Timeout

This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout.

TopicName

The topic that data is being published to.

Partition

The partition that data is being published to.

MessageSetSize

The size, in bytes, of the message set that follows.

MessageSet

A set of messages in the standard format described above.
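For illustration, a single-topic, single-partition ProduceRequest body could be assembled as below; this is a sketch following the grammar above, not a complete producer.

Code Block
import struct

def encode_string(s):
    # Protocol "string": int16 length followed by UTF-8 bytes.
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_produce_body(required_acks, timeout_ms, topic, partition, message_set):
    # ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
    body = struct.pack(">hi", required_acks, timeout_ms)
    body += struct.pack(">i", 1) + encode_string(topic)           # one topic
    body += struct.pack(">i", 1) + struct.pack(">i", partition)   # one partition entry
    body += struct.pack(">i", len(message_set)) + message_set     # MessageSetSize + MessageSet
    return body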

Produce Response
Code Block
v0
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
 
v1 (supported in 0.9.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  ThrottleTime => int32
 
v2 (supported in 0.10.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
  TopicName => string
  Partition => int32
  ErrorCode => int16
  Offset => int64
  Timestamp => int64
  ThrottleTime => int32


Field

Description

Topic

The topic this response entry corresponds to.

Partition

The partition this response entry corresponds to.

ErrorCode

The error from this partition, if any. Errors are given on a per-partition basis because a given partition may be unavailable or maintained on a different host, while others may have successfully accepted the produce request.

Offset

The offset assigned to the first message in the message set appended to this partition.

Timestamp

If LogAppendTime is used for the topic, this is the timestamp assigned by the broker to the message set. All the messages in the message set have the same timestamp.

If CreateTime is used, this field is always -1. The producer can assume the timestamp of the messages in the produce request has been accepted by the broker if there is no error code returned.

Unit is milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).

ThrottleTime

Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota).

Possible Error Codes: (TODO)


Fetch API

The fetch API is used to fetch a chunk of one or more logs for some topic-partitions. Logically one specifies the topics, partitions, and starting offset at which to begin the fetch and gets back a chunk of messages. In general, the return messages will have offsets larger than or equal to the starting offset. However, with compressed messages, it's possible for the returned messages to have offsets smaller than the starting offset. The number of such messages is typically small and the caller is responsible for filtering out those messages.

Fetch requests follow a long poll model so they can be made to block for a period of time if sufficient data is not immediately available.

As an optimization the server is allowed to return a partial message at the end of the message set. Clients should handle this case.

One thing to note is that the fetch API requires specifying the partition to consume from. The question is: how should a consumer know which partitions to consume from? In particular, how can you balance the partitions over a set of consumers acting as a group so that each consumer gets a subset of partitions? We have done this assignment dynamically using zookeeper for the scala and java clients. The downside of this approach is that it requires a fairly fat client and a zookeeper connection. We haven't yet created a Kafka API to allow this functionality to be moved to the server side and accessed more conveniently. A simple consumer client can be implemented by simply requiring that the partitions be specified in config, though this will not allow dynamic reassignment of partitions should that consumer fail. We hope to address this gap in the next major release.

Fetch Request
Code Block
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
  ReplicaId => int32
  MaxWaitTime => int32
  MinBytes => int32
  TopicName => string
  Partition => int32
  FetchOffset => int64
  MaxBytes => int32


Field

Description

ReplicaId

The replica id indicates the node id of the replica initiating this request. Normal client consumers should always specify this as -1 as they have no node id. Other brokers set this to be their own node id. The value -2 is accepted to allow a non-broker to issue fetch requests as if it were a replica broker for debugging purposes.

MaxWaitTime

The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued.

MinBytes

This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding).

TopicName

The name of the topic.

Partition

The id of the partition the fetch is for.

FetchOffset

The offset to begin this fetch from.

MaxBytes

The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
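As with the produce sketch above, a minimal fetch body for one partition might look like the following; the MaxWaitTime and MinBytes defaults simply mirror the throughput/latency trade-off described under MinBytes and are not recommendations.

Code Block
import struct

def encode_fetch_body(topic, partition, fetch_offset,
                      max_wait_ms=100, min_bytes=64 * 1024, max_bytes=1024 * 1024):
    # FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
    topic_bytes = topic.encode("utf-8")
    body = struct.pack(">iii", -1, max_wait_ms, min_bytes)        # ReplicaId is -1 for normal clients
    body += struct.pack(">i", 1)                                  # one topic
    body += struct.pack(">h", len(topic_bytes)) + topic_bytes
    body += struct.pack(">i", 1)                                  # one partition
    body += struct.pack(">iqi", partition, fetch_offset, max_bytes)
    return body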

Fetch Response
Code Block
v0
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
 
v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
  ThrottleTime => int32
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32

Field

Description

ThrottleTime

Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.)

TopicName

The name of the topic this response entry is for.

Partition

The id of the partition this response is for.

HighwaterMarkOffset

The offset at the end of the log for this partition. This can be used by the client to determine how many messages behind the end of the log they are.

MessageSetSize

The size in bytes of the message set for this partition

MessageSet

The message data fetched from this partition, in the format described above.

Fetch Response v1 only contains message format v0.

Fetch Response v2 might either contain message format v0 or message format v1.

Possible Error Codes
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* REPLICA_NOT_AVAILABLE (9)
* UNKNOWN (-1)
 
 
Offset API (AKA ListOffset)

This API describes the valid offset range available for a set of topic-partitions. As with the produce and fetch APIs requests must be directed to the broker that is currently the leader for the partitions in question. This can be determined using the metadata API.

For version 0, the response contains the starting offset of each segment for the requested partition as well as the "log end offset" i.e. the offset of the next message that would be appended to the given partition. In version 1, which was initially supported in 0.10.1.0, Kafka supports a time index to search offsets by the timestamp used in messages, and a change was made to this API to support this. Note that this API is only supported for topics which have enabled the 0.10 message format, and the UNSUPPORTED_FOR_MESSAGE_FORMAT error will be returned otherwise.

Offset Request
Code Block
// v0 
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32

 
// v1 (supported in 0.10.1.0 and later)
ListOffsetRequest => ReplicaId [TopicName [Partition Time]] 
  ReplicaId => int32 
  TopicName => string 
  Partition => int32 
  Time => int64

Field

Description

Time

Used to ask for all messages before a certain time (ms). There are two special values. Specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. This applies to all versions of the API. Note that because offsets are pulled in descending order, asking for the earliest offset will always return you a single element.
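The two special Time values are easiest to see in code. A v0 sketch for a single partition (the constants and helper name are ours):

Code Block
import struct

LATEST = -1    # offset of the next message that will be appended
EARLIEST = -2  # earliest offset still available in the log

def encode_offset_body_v0(topic, partition, time=LATEST, max_offsets=1):
    # OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
    topic_bytes = topic.encode("utf-8")
    body = struct.pack(">i", -1)                                  # ReplicaId is -1 for clients
    body += struct.pack(">i", 1)                                  # one topic
    body += struct.pack(">h", len(topic_bytes)) + topic_bytes
    body += struct.pack(">i", 1)                                  # one partition
    body += struct.pack(">iqi", partition, time, max_offsets)
    return body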

Offset Response
Code Block
// v0
OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

 
// v1
ListOffsetResponse => [TopicName [PartitionOffsets]] 
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32 
  ErrorCode => int16 
  Timestamp => int64 
  Offset => int64
Possible Error Codes
* UNKNOWN_TOPIC_OR_PARTITION (3)
* NOT_LEADER_FOR_PARTITION (6)
* UNKNOWN (-1)
* UNSUPPORTED_FOR_MESSAGE_FORMAT (43)
 
Offset Commit/Fetch API

These APIs allow for centralized management of offsets. Read more in Offset Management. As per comments on KAFKA-993, these API calls are not fully functional in releases up to and including Kafka 0.8.1.1; full support is available starting with the 0.8.2 release.

Group Coordinator Request

The offsets for a given consumer group are maintained by a specific broker called the group coordinator. i.e., a consumer needs to issue its offset commit and fetch requests to this specific broker. It can discover the current coordinator by issuing a group coordinator request.

Code Block
GroupCoordinatorRequest => GroupId
  GroupId => string
Group Coordinator Response
Code Block
GroupCoordinatorResponse => ErrorCode CoordinatorId CoordinatorHost CoordinatorPort
  ErrorCode => int16
  CoordinatorId => int32
  CoordinatorHost => string
  CoordinatorPort => int32
Possible Error Codes
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* GROUP_AUTHORIZATION_FAILED (30)
Offset Commit Request
Code Block
v0 (supported in 0.8.1 or later)
OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]]
  ConsumerGroupId => string
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string

 
v1 (supported in 0.8.2 or later)
OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  TopicName => string
  Partition => int32
  Offset => int64
  TimeStamp => int64
  Metadata => string

v2 (supported in 0.9.0 or later)
OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]]
  ConsumerGroupId => string
  ConsumerGroupGenerationId => int32
  ConsumerId => string
  RetentionTime => int64
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string

In v0 and v1, the time stamp of each partition is defined as the commit time stamp, and the offset coordinator will retain the committed offset until its commit time stamp + the offset retention time specified in the broker config. If the time stamp field is not set, brokers will set the commit time as the receive time before committing the offset. Users can explicitly set the commit time stamp if they want to retain the committed offset on the broker for longer than the configured offset retention time.

In v2, we removed the time stamp field but added a global retention time field (see KAFKA-1634 for details); brokers will then always set the commit time stamp as the receive time, but the committed offset can be retained until its commit time stamp + the user-specified retention time in the commit request. If the retention time is not set (-1), the broker's offset retention time will be used as the default.

Note that when this API is used for a "simple consumer," which is not part of a consumer group, then the generationId must be set to -1 and the memberId must be empty (not null). Additionally, if there is an active consumer group with the same groupId, then the commit will be rejected (typically with an UNKNOWN_MEMBER_ID or ILLEGAL_GENERATION error). 

Offset Commit Response
Code Block
v0, v1 and v2:
OffsetCommitResponse => [TopicName [Partition ErrorCode]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
Possible Error Codes
* OFFSET_METADATA_TOO_LARGE (12)
* GROUP_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* REBALANCE_IN_PROGRESS (27)
* INVALID_COMMIT_OFFSET_SIZE (28)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
Offset Fetch Request

Per the comment on KAFKA-1841, v0 and v1 are identical on the wire, but v0 (supported in 0.8.1 or later) reads offsets from zookeeper, while v1 (supported in 0.8.2 or later) reads offsets from kafka.

Code Block
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
  ConsumerGroup => string
  TopicName => string
  Partition => int32
Offset Fetch Response
Code Block
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
  TopicName => string
  Partition => int32
  Offset => int64
  Metadata => string
  ErrorCode => int16

Note that if there is no offset associated with a topic-partition under that consumer group the broker does not set an error code (since it is not really an error), but returns empty metadata and sets the offset field to -1.

There is no format difference between Offset Fetch Request v0 and v1. Functionality wise, Offset Fetch Request v0 will fetch offset from zookeeper, Offset Fetch Request v1 will fetch offset from Kafka.

Possible Error Codes
* UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0
* GROUP_LOAD_IN_PROGRESS (14)
* NOT_COORDINATOR_FOR_GROUP (16)
* ILLEGAL_GENERATION (22)
* UNKNOWN_MEMBER_ID (25)
* TOPIC_AUTHORIZATION_FAILED (29)
* GROUP_AUTHORIZATION_FAILED (30)
Group Membership API

These requests are used by clients to participate in a client group managed by Kafka. From a high level, each group in the cluster is assigned one of the brokers (its group coordinator) to facilitate group management. Once the coordinator has been located (using the group coordinator request from above), group members can join the group and synchronize state, and then use heartbeats to stay active in the group. When the client shuts down, it uses a leave group request to deregister from the group. More detail on protocol semantics is outlined in Kafka Client-side Assignment Proposal.

The primary use case for the membership API is consumer groups, but the requests are intentionally generic to support other cases (e.g. Kafka Connect groups). The cost of this generality is that specific group semantics are pushed into the client. For example, the JoinGroup/SyncGroup requests defined below have no explicit fields supporting partition assignment for consumer groups. Instead, they contain generic byte arrays in which assignments can be embedded by the consumer client implementation.

But although this allows each client implementation to define its own embedded schema, compatibility with Kafka tooling requires clients to use the standard embedded schema used by the client shipped with Kafka. The consumer-groups.sh utility, for example, assumes this format to display partition assignments. We therefore recommend that clients should follow the same schema so that these tools will work for all client implementations.

Join Group Request

The join group request is used by a client to become a member of a group. When new members join an existing group, all previous members are required to rejoin by sending a new join group request. When a member first joins the group, the memberId will be empty (i.e. ""), but a rejoining member should use the same memberId from the previous generation. 

Code Block
v0 supported in 0.9.0.0 and greater
JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  GroupId => string
  SessionTimeout => int32
  MemberId => string
  ProtocolType => string
  GroupProtocols => [ProtocolName ProtocolMetadata]
    ProtocolName => string
    ProtocolMetadata => bytes
 
v1 supported in 0.10.1.0 and greater
JoinGroupRequest => GroupId SessionTimeout MemberId ProtocolType GroupProtocols
  GroupId => string
  SessionTimeout => int32
  RebalanceTimeout => int32
  MemberId => string
  ProtocolType => string
  GroupProtocols => [ProtocolName ProtocolMetadata]
    ProtocolName => string
    ProtocolMetadata => bytes

The SessionTimeout field is used to indicate client liveness. If the coordinator does not receive at least one heartbeat (see below) before expiration of the session timeout, then the member will be removed from the group. Prior to version 0.10.1, the session timeout was also used as the timeout to complete a needed rebalance. Once the coordinator begins rebalancing, each member in the group has up to the session timeout in order to send a new JoinGroup request. If they fail to do so, they will be removed from the group. In 0.10.1, a new version of the JoinGroup request was created with a separate RebalanceTimeout field. Once a rebalance begins, each client has up to this duration to rejoin, but note that if the session timeout is lower than the rebalance timeout, the client must still continue to send heartbeats.

The ProtocolType field defines the embedded protocol that the group implements. The group coordinator ensures that all members in the group support the same protocol type. The meaning of the protocol name and metadata contained in the GroupProtocols field depends on the protocol type. Note that the join group request allows for multiple protocol/metadata pairs. This enables rolling upgrades without downtime. The coordinator chooses a single protocol which all members support. The upgraded member includes both the new version and the old version of the protocol. Once all members have upgraded, the coordinator will choose whichever protocol is listed first in the GroupProtocols array.
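For the standard "consumer" protocol type, the ProtocolMetadata bytes embed a subscription (a version, a topic list, and opaque user data). The following sketch is based on the embedded schema used by the Java client and should be treated as an assumption to verify against your client library, not a normative definition.

Code Block
import struct

def encode_consumer_subscription(topics, user_data=b""):
    # Embedded subscription for ProtocolType "consumer" (assumed schema):
    # Version (int16), [Topic] (array of string), UserData (bytes).
    out = struct.pack(">h", 0)                                    # Version 0
    out += struct.pack(">i", len(topics))
    for topic in topics:
        data = topic.encode("utf-8")
        out += struct.pack(">h", len(data)) + data
    out += struct.pack(">i", len(user_data)) + user_data
    return out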

...

Upon every completion of the join group phase, the coordinator increments a GenerationId for the group. This is returned as a field in the response to each member, and is sent in heartbeats and offset commit requests. When the coordinator rebalances a group, the coordinator will send an error code indicating that the member needs to rejoin. If the member does not rejoin before a rebalance completes, then it will have an old generationId, which will cause ILLEGAL_GENERATION errors when included in new requests.

Code Block
v0 and v1 supported in 0.9.0 and greater
JoinGroupResponse => ErrorCode GenerationId GroupProtocol LeaderId MemberId Members
  ErrorCode => int16
  GenerationId => int32
  GroupProtocol => string
  LeaderId => string
  MemberId => string
  Members => [MemberId MemberMetadata]
    MemberId => string
    MemberMetadata => bytes

...

* GROUP_LOAD_IN_PROGRESS (14)
* GROUP_COORDINATOR_NOT_AVAILABLE (15)
* NOT_COORDINATOR_FOR_GROUP (16)
* AUTHORIZATION_FAILED (29)

Constants

Api Keys And Current Versions

The following are the numeric codes that the ApiKey in the request can take for each of the above request types.

...