Status

Current state: Accepted

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg83115.html

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following.  These periodic FetchRequests must enumerate all the partitions which the follower is interested in.  The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.

...

  • max.incremental.fetch.session.cache.slots, which sets the number of incremental fetch session cache slots on the broker.  Default value: 1,000

There is one new constant for fetch session caching:

...

The fetch epoch keeps the state on the leader and the follower synchronized.  It ensures that if a message is duplicated or lost, the server will always notice.  It is also used to associate requests and responses in the logs.  Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose, but they are less likely to be unique.
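As a rough illustration of how an epoch lets the server notice duplicated or lost requests, here is a minimal sketch. The `FetchSession` class and `check_epoch` function are hypothetical names, and the rule that the expected epoch advances by exactly one per accepted request is an assumption of this sketch, not something stated in this section.

```python
from dataclasses import dataclass


@dataclass
class FetchSession:
    """Hypothetical server-side state for one incremental fetch session."""
    session_id: int
    next_epoch: int = 1  # a newly created session starts at epoch 1


def check_epoch(session: FetchSession, request_epoch: int) -> bool:
    """Accept a request only if it carries the epoch the server expects.

    On success the expected epoch advances by one (an assumption of this
    sketch), so a replayed or out-of-order request no longer matches.
    """
    if request_epoch != session.next_epoch:
        return False
    session.next_epoch += 1
    return True


if __name__ == "__main__":
    session = FetchSession(session_id=42)
    print(check_epoch(session, 1))  # first request is accepted
    print(check_epoch(session, 1))  # duplicate of the first request is rejected
    print(check_epoch(session, 3))  # epoch 2 was lost, so epoch 3 is rejected
```

Because the server only ever accepts the single epoch it expects next, both a replayed request and a gap in the sequence show up as a mismatch.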


FetchRequest Metadata meaning

| Request SessionId | Request SessionEpoch | Meaning |
|---|---|---|
| 0 | -1 | Make a full FetchRequest that does not use or create a session.  This is the session ID used by pre-KIP-227 FetchRequests. |
| 0 | 0 | Make a full FetchRequest.  Create a new incremental fetch session if possible.  If a new fetch session is created, it will start at epoch 1. |
| $ID | 0 | Close the incremental fetch session identified by $ID.  Make a full FetchRequest.  Create a new incremental fetch session if possible.  If a new fetch session is created, it will start at epoch 1. |
| $ID | $EPOCH | If the ID and EPOCH are correct, make an incremental fetch request. |
| $ID | -1 | Close the incremental fetch session identified by $ID.  Make a full FetchRequest. |
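The FetchRequest metadata combinations listed above can be read as a small decision procedure. The sketch below is one possible way to express it; the function name and the action strings are hypothetical, and it assumes that a valid $EPOCH is a positive number (sessions start at epoch 1). Combinations the listing does not cover are rejected rather than guessed at.

```python
from typing import List

CLOSE_SESSION = "close the incremental fetch session identified by SessionId"
FULL_FETCH = "make a full FetchRequest"
CREATE_SESSION = "create a new incremental fetch session if possible"
INCREMENTAL_FETCH = "make an incremental fetch request if ID and epoch are correct"


def describe_fetch_request(session_id: int, session_epoch: int) -> List[str]:
    """Return the actions implied by a (SessionId, SessionEpoch) pair."""
    if session_epoch > 0:
        # Assumption: a positive epoch means "continue an existing session".
        if session_id == 0:
            raise ValueError("SessionId 0 with a positive epoch is not covered")
        return [INCREMENTAL_FETCH]
    if session_epoch not in (0, -1):
        raise ValueError("only epochs 0, -1, or a positive value are covered")

    actions = []
    if session_id != 0:
        actions.append(CLOSE_SESSION)   # $ID with epoch 0 or -1 closes $ID
    actions.append(FULL_FETCH)          # epochs 0 and -1 are always full fetches
    if session_epoch == 0:
        actions.append(CREATE_SESSION)  # epoch 0 asks for a new session
    return actions
```

For example, `describe_fetch_request(0, -1)` yields only the full-fetch action, which corresponds to the behavior of pre-KIP-227 FetchRequests.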

Incremental Fetch Requests

...

If the client wants to remove a partition, the client will add the partition to the removed_partition list in the relevant removed_topics entry.
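To make the removal mechanism concrete, the following sketch groups the partitions a client no longer wants into per-topic entries shaped like the removed_topic structure in the schema. The function name, the `TopicPartition` alias, and the use of plain dictionaries are illustrative assumptions, not part of the proposal.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition id)


def build_removed_topics(
    session_partitions: Set[TopicPartition],
    wanted: Iterable[TopicPartition],
) -> List[Dict[str, object]]:
    """List partitions that are in the session but no longer wanted.

    The result mirrors `removed_topic => removed_topic_name
    [removed_partition_id]`, with one entry per topic.
    """
    removed = session_partitions - set(wanted)
    by_topic: Dict[str, List[int]] = defaultdict(list)
    for topic, partition_id in sorted(removed):
        by_topic[topic].append(partition_id)
    return [
        {"removed_topic_name": topic, "removed_partition_id": ids}
        for topic, ids in by_topic.items()
    ]
```

Sorting the removed partitions first keeps the output deterministic, which is convenient for logging and tests but is not required by the schema.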

Schema

FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic] [removed_topic]

  max_wait_time => INT32

  replica_id => INT32

...

  partition => partition_id fetch_offset start_offset max_bytes

  partition_id => INT32

  fetch_offset => INT64

  start_offset => INT64

  max_bytes => INT32

  removed_topic => removed_topic_name [removed_partition_id]

  removed_topic_name => STRING

  removed_partition_id => INT32

FetchResponse Changes

Top-level error code

...

When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
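A caller could apply this rule with a small helper like the one sketched below. The function and parameter names are hypothetical; the only protocol fact relied on is that error code 0 means "no error".

```python
from typing import Dict, Iterable, Tuple

TopicPartition = Tuple[str, int]  # (topic name, partition id)
NO_ERROR = 0  # Kafka uses error code 0 for "no error"


def effective_partition_errors(
    top_level_error: int,
    requested: Iterable[TopicPartition],
    partition_errors: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Resolve the error code that applies to each partition in a fetch.

    If the top-level error is set, every requested partition is treated as
    having received that error; otherwise the per-partition codes are used.
    """
    if top_level_error != NO_ERROR:
        return {tp: top_level_error for tp in requested}
    return dict(partition_errors)
```

Passing the requested partitions explicitly matters because a response carrying a top-level error may not list any partitions at all.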

 


Fetch Session ID

The FetchResponse now contains a 32-bit fetch session ID.

FetchResponse Metadata meaning

| Request SessionId | Meaning |
|---|---|
| 0 | No fetch session was created. |
| $ID | The next request can be an incremental fetch request with the given $ID. |
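On the client side, the session ID in the response determines what metadata the next request can carry. The sketch below is one hedged interpretation: the function name is hypothetical, retrying session creation with (0, 0) when no session was created is an assumption, and so is advancing the epoch by one after each request.

```python
from typing import Tuple


def next_request_metadata(
    response_session_id: int, previous_epoch: int
) -> Tuple[int, int]:
    """Pick (SessionId, SessionEpoch) for the next FetchRequest.

    `previous_epoch` is the epoch sent in the request that produced this
    response (0 for a request that asked for a new session).
    """
    if response_session_id == 0:
        # No fetch session was created. Assumption: send another full
        # request that again asks for a session.
        return (0, 0)
    # A session exists, so the next request can be incremental. A session
    # created by an epoch-0 request starts at epoch 1; this sketch assumes
    # the epoch keeps advancing by one afterwards.
    return (response_session_id, previous_epoch + 1)
```

A client that does not want a session at all would instead keep sending SessionId 0 with SessionEpoch -1.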

Incremental Fetch Responses

...

The following new metrics will be added to track cache consumption:

NumIncrementalFetchSessions: Tracks the number of incremental fetch sessions which exist.

NumIncrementalFetchPartitionsCached: Tracks the total number of partitions cached by incremental fetch sessions.

IncrementalFetchSessionEvictionsPerSec: Tracks the number of incremental fetch sessions that were evicted from the cache per second.  This metric is not increased when a client closes its own incremental fetch session.

Compatibility, Deprecation, and Migration Plan

...