Status
Current state: Accepted
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg83115.html
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following. These periodic FetchRequests must enumerate all the partitions which the follower is interested in. The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.
...
Because fetch sessions use memory on the leader, we want to limit the number of sessions that exist at any given time. Therefore, each broker will create only a limited number of incremental fetch sessions.
There is one new public configuration for fetch session caching:
- max.incremental.fetch.session.cache.slots, which sets the number of incremental fetch session cache slots on the broker. Default value: 1,000
There is one new constant for fetch session caching:
- min.incremental.fetch.session.eviction.ms, which sets the minimum amount of time we will wait before evicting an incremental fetch session from the cache. Value: 120,000
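The two limits above can be illustrated with a small bounded cache. This is a minimal sketch, not the broker's implementation. It assumes that the minimum eviction time is measured from a session's last use and that the least recently used session is the only eviction candidate. The class and method names are hypothetical, and the KIP's full eviction criteria are not reproduced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a bounded incremental fetch session cache.
//   maxSlots      plays the role of max.incremental.fetch.session.cache.slots
//   minEvictionMs plays the role of min.incremental.fetch.session.eviction.ms
// Assumptions (not from the KIP): idle time is measured from last use, and only
// the least recently used session is considered for eviction.
final class FetchSessionCacheSketch {
    private final int maxSlots;
    private final long minEvictionMs;
    // accessOrder = true: iteration starts at the least recently used session.
    private final LinkedHashMap<Integer, Long> lastUsedMs =
        new LinkedHashMap<>(16, 0.75f, true);

    FetchSessionCacheSketch(int maxSlots, long minEvictionMs) {
        this.maxSlots = maxSlots;
        this.minEvictionMs = minEvictionMs;
    }

    /** Records activity on an existing session (put also moves it to the most recent end). */
    void touch(int sessionId, long nowMs) {
        if (lastUsedMs.containsKey(sessionId)) {
            lastUsedMs.put(sessionId, nowMs);
        }
    }

    /** Returns true if the new session was admitted, evicting an idle one if needed. */
    boolean tryCreate(int sessionId, long nowMs) {
        if (lastUsedMs.size() < maxSlots) {
            lastUsedMs.put(sessionId, nowMs);
            return true;
        }
        if (lastUsedMs.isEmpty()) {
            return false; // zero slots configured
        }
        Map.Entry<Integer, Long> lru = lastUsedMs.entrySet().iterator().next();
        if (nowMs - lru.getValue() < minEvictionMs) {
            return false; // the oldest session is still too young to evict
        }
        lastUsedMs.remove(lru.getKey());
        lastUsedMs.put(sessionId, nowMs);
        return true;
    }

    int size() { return lastUsedMs.size(); }

    boolean contains(int sessionId) { return lastUsedMs.containsKey(sessionId); }
}
```

With two slots and a 120,000 ms minimum, a third session is refused while both cached sessions are young, and admitted once the least recently used one has been idle long enough.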
When the server gets a new request to create an incremental fetch session, it will compare the proposed new session with its existing sessions. The new session will evict an existing session if and only if:
...
The fetch epoch keeps the state on the leader and the follower synchronized. It ensures that if a message is duplicated or lost, the server will always notice. It is also used to associate requests and responses in the logs. Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose, but they are less likely to be unique.
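A minimal sketch of how an epoch lets the leader notice a duplicated or missing request, assuming the session simply stores the next epoch it expects and advances it by one on each accepted request. The class name is hypothetical, and wraparound at Integer.MAX_VALUE is not handled.

```java
// Hypothetical sketch of a leader-side epoch check for one fetch session.
// The session remembers the epoch it expects next; a request carrying any other
// epoch (a duplicate of an earlier request, or one that skips an epoch because a
// previous request never arrived) is rejected instead of being applied.
final class SessionEpochSketch {
    private int expectedEpoch = 1; // a newly created session starts at epoch 1

    /** Returns true and advances the expected epoch if the request's epoch matches. */
    boolean accept(int requestEpoch) {
        if (requestEpoch != expectedEpoch) {
            return false; // out of sync: duplicated or lost message
        }
        expectedEpoch++; // wraparound handling omitted in this sketch
        return true;
    }

    int expectedEpoch() { return expectedEpoch; }
}
```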
Fetch Type
The FetchType is an 8-bit number with the following values:
- 0: SESSIONLESS
- 1: FULL
- 2: INCREMENTAL
FetchRequest Metadata meaning
Request SessionId | Request SessionEpoch | Meaning |
---|---|---|
0 | -1 | Make a full FetchRequest that does not use or create a session. This is the session ID used by pre-KIP-227 FetchRequests. |
0 | 0 | Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
$ID | 0 | Close the incremental fetch session identified by $ID. Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
$ID | $EPOCH | If the ID and EPOCH are correct, make an incremental fetch request. |
$ID | -1 | Close the incremental fetch session identified by $ID. Make a full FetchRequest. |
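The session ID and epoch table can be read as a small decision procedure. The sketch below maps each pair to one of the table's rows. The enum constant names are hypothetical labels, not Kafka identifiers, and rejecting combinations the table does not list (session ID 0 with a positive epoch, negative IDs, epochs below -1) is an assumption.

```java
// Hypothetical labels for the rows of the FetchRequest metadata table.
enum FetchRequestKind {
    SESSIONLESS_FULL,    // ID 0,   epoch -1
    FULL_NEW_SESSION,    // ID 0,   epoch 0
    CLOSE_AND_RECREATE,  // ID $ID, epoch 0
    INCREMENTAL,         // ID $ID, epoch $EPOCH
    CLOSE_AND_FULL;      // ID $ID, epoch -1

    static FetchRequestKind classify(int sessionId, int sessionEpoch) {
        if (sessionId < 0 || sessionEpoch < -1) {
            throw new IllegalArgumentException("negative session ID or epoch below -1");
        }
        if (sessionId == 0) {
            if (sessionEpoch == -1) return SESSIONLESS_FULL;
            if (sessionEpoch == 0) return FULL_NEW_SESSION;
            throw new IllegalArgumentException("session ID 0 must use epoch 0 or -1");
        }
        if (sessionEpoch == 0) return CLOSE_AND_RECREATE;
        if (sessionEpoch == -1) return CLOSE_AND_FULL;
        // Positive ID and epoch: the broker must still check both against its cache.
        return INCREMENTAL;
    }
}
```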
Incremental Fetch Requests
Incremental fetch requests have a positive fetch session ID.
A partition is only included in an incremental FetchRequest if:
...
If the client wants to remove a partition, the client will add the partition to the removed_partition list in the relevant removed_topic entry.
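A client needs to work out which partitions to list as removed. A minimal sketch, assuming the client keeps the set of partitions currently in its session and the set it still wants: the difference is grouped by topic, which matches the shape of the removed_topic entries. The class and record names are hypothetical.

```java
import java.util.*;

// Hypothetical sketch: group the partitions that leave the session by topic,
// mirroring removed_topic => removed_topic_name [removed_partition_id].
final class RemovedPartitions {
    record TopicPartition(String topic, int partition) {}

    static SortedMap<String, List<Integer>> removed(Set<TopicPartition> inSession,
                                                    Set<TopicPartition> wanted) {
        SortedMap<String, List<Integer>> byTopic = new TreeMap<>();
        for (TopicPartition tp : inSession) {
            if (!wanted.contains(tp)) {
                byTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
            }
        }
        byTopic.values().forEach(Collections::sort); // deterministic order for encoding
        return byTopic;
    }
}
```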
Schema
FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic] [removed_topic]
max_wait_time => INT32
replica_id => INT32
...
isolation_level => INT8
fetch_session_id => INT32
fetch_session_epoch => INT32
...
partition => partition_id fetch_offset start_offset max_bytes
partition_id => INT32
fetch_offset => INT64
start_offset => INT64
max_bytes => INT32
removed_topic => removed_topic_name [removed_partition_id]
removed_topic_name => STRING
removed_partition_id => INT32
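The [removed_topic] array can be serialized with Kafka's classic (non-flexible) encoding, where an array is an INT32 element count followed by the elements, a STRING is an INT16 byte length followed by UTF-8 bytes, and all integers are big-endian. The sketch below encodes only this array; the class name is hypothetical and the surrounding request fields are not written.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: encode [removed_topic] entries in the classic Kafka format.
final class RemovedTopicEncoder {
    static byte[] encode(Map<String, List<Integer>> removedTopics) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) { // writes big-endian
            out.writeInt(removedTopics.size());                // [removed_topic]: INT32 count
            for (Map.Entry<String, List<Integer>> e : removedTopics.entrySet()) {
                byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
                if (name.length > Short.MAX_VALUE) {
                    throw new IllegalArgumentException("topic name too long for STRING");
                }
                out.writeShort(name.length);                    // removed_topic_name: INT16 length
                out.write(name);                                //   followed by UTF-8 bytes
                out.writeInt(e.getValue().size());              // [removed_partition_id]: INT32 count
                for (int partitionId : e.getValue()) {
                    out.writeInt(partitionId);                  // removed_partition_id: INT32
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }
}
```

Removing partition 1 of topic "a" produces 15 bytes: a 4-byte array count, a 2-byte name length, 1 name byte, a 4-byte partition count and one 4-byte partition ID.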
FetchResponse Changes
Top-level error code
Per-partition error codes are no longer sufficient to handle all response errors. For example, when an incremental fetch session encounters a FetchSessionIdNotFoundException, we do not know which partitions the client expected to fetch. Therefore, the FetchResponse now contains a top-level error code. This error code is set to indicate that the request as a whole cannot be processed.
When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
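On the caller side, this rule amounts to expanding a top-level error onto every partition the caller asked for. A minimal sketch, assuming partitions are identified by hypothetical "topic-partition" strings; error code 0 means no error, as elsewhere in the Kafka protocol. The class name is hypothetical.

```java
import java.util.*;

// Hypothetical sketch: compute the error each requested partition should see.
final class TopLevelErrors {
    static final short NONE = 0; // Kafka error code 0 means "no error"

    static Map<String, Short> effectiveErrors(short topLevelError,
                                              Collection<String> requestedPartitions,
                                              Map<String, Short> perPartitionErrors) {
        Map<String, Short> result = new TreeMap<>();
        for (String partition : requestedPartitions) {
            short err = topLevelError != NONE
                ? topLevelError  // the request as a whole failed: every partition gets it
                : perPartitionErrors.getOrDefault(partition, NONE);
            result.put(partition, err);
        }
        return result;
    }
}
```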
Fetch Type
The FetchResponse now contains an 8-bit fetch type.
Fetch Session ID
The FetchResponse now contains a 32-bit fetch session ID.
...
The FetchResponse now contains a 32-bit fetch session epoch. This is the epoch which the server expects to see in the next fetch request.
FetchResponse Metadata meaning
Response FetchType | Response SessionId | Response SessionEpoch | Meaning |
---|---|---|---|
SESSIONLESS | ignored | ignored | This is a response to a SESSIONLESS fetch request. The broker also uses this FetchType when there was a fetch session error. The error will be described by the top-level response error field. |
FULL | 0 | 0 | No fetch session was created. |
INCREMENTAL | $ID | $EPOCH | The next request can be an incremental fetch request with the given $ID and $EPOCH. |
...
FetchResponse Metadata meaning
Response SessionId | Meaning |
---|---|
0 | No fetch session was created. |
$ID | The next request can be an incremental fetch request with the given $ID. |
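A client can use the response session ID to choose the metadata for its next request. This is a minimal sketch under stated assumptions: the client tracks the epoch itself, a session created by a full request (epoch 0) continues at epoch 1, and each later successful response on the same session advances the epoch by one. Error handling is omitted, and the class name is hypothetical.

```java
// Hypothetical sketch of client-side session tracking.
final class ClientSessionState {
    private int sessionId = 0;
    private int nextEpoch = 0; // epoch 0: full request that may create a session

    int sessionId() { return sessionId; }

    int nextEpoch() { return nextEpoch; }

    void onResponse(int responseSessionId) {
        if (responseSessionId == 0) {
            sessionId = 0;                 // no fetch session was created
            nextEpoch = 0;                 // next request is a full request again
        } else if (responseSessionId == sessionId) {
            nextEpoch++;                   // same session: move to the next epoch
        } else {
            sessionId = responseSessionId; // a new session was created
            nextEpoch = 1;                 // new sessions start at epoch 1
        }
    }
}
```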
Incremental Fetch Responses
...
FetchResponse => throttle_time_ms error_code error_string fetch_session_id [topic]
throttle_time_ms => INT32
error_code => INT16
error_string => STRING
fetch_session_id => INT32
topic => topic_name [partition]
...
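The top-level FetchResponse fields listed in the schema can be read in order from a buffer. A minimal sketch, assuming the classic big-endian Kafka encoding (STRING as an INT16 length followed by UTF-8 bytes). Treating a length of -1 as a null error_string is an additional assumption, and the [topic] array that follows is not parsed. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: decode the top-level FetchResponse fields.
final class FetchResponseHeader {
    final int throttleTimeMs;
    final short errorCode;
    final String errorString; // null if the length prefix is -1 (assumption)
    final int fetchSessionId;

    private FetchResponseHeader(int throttleTimeMs, short errorCode,
                                String errorString, int fetchSessionId) {
        this.throttleTimeMs = throttleTimeMs;
        this.errorCode = errorCode;
        this.errorString = errorString;
        this.fetchSessionId = fetchSessionId;
    }

    /** Reads fields in schema order; leaves the buffer positioned at [topic]. */
    static FetchResponseHeader read(ByteBuffer buf) { // ByteBuffer defaults to big-endian
        int throttleTimeMs = buf.getInt();            // throttle_time_ms => INT32
        short errorCode = buf.getShort();             // error_code => INT16
        short length = buf.getShort();                // error_string => STRING length
        String errorString = null;
        if (length >= 0) {
            byte[] raw = new byte[length];
            buf.get(raw);
            errorString = new String(raw, StandardCharsets.UTF_8);
        }
        int fetchSessionId = buf.getInt();            // fetch_session_id => INT32
        return new FetchResponseHeader(throttleTimeMs, errorCode, errorString, fetchSessionId);
    }
}
```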
The following new metrics will be added to track cache consumption:
- NumIncrementalFetchSessions: Tracks the number of incremental fetch sessions which exist.
- NumIncrementalFetchPartitionsCached: Tracks the total number of partitions cached by incremental fetch sessions.
- IncrementalFetchSessionEvictionsPerSec: Tracks the number of incremental fetch sessions that were evicted from the cache per second. This metric is not increased when a client closes its own incremental fetch session.
Compatibility, Deprecation, and Migration Plan
...