Status
Current state: Accepted
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg83115.html
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following. These periodic FetchRequests must enumerate all the partitions which the follower is interested in. The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.
...
We can solve the scalability and latency problems discussed above by creating "incremental" fetch requests and responses that only include information about what has changed. In order to do this, we need to introduce the concept of "fetch sessions."
...
The Fetch Session includes:
- A randomly generated 32-bit session ID which is unique on the leader
- The client ID
- The numeric follower ID, if this fetch session belongs to a Kafka broker
- The client ID string, if this fetch session belongs to a Kafka consumer
- Cached data about each partition which the fetcher is interested in
- The privileged bit
- The topic and partition ID which uniquely identify the partition within Kafka
- The last fetch offset
- The maximum number of bytes to fetch from this partition
- The last dirty epoch
- The time when the fetch session was last used
...
Fetch Session ID
The fetch session ID is a randomly generated 32-bit session ID. It is a unique, immutable identifier for the fetch session. Note that the fetch session ID may not be globally unique (although it's very likely to be so, since the space of 32-bit numbers is very large). It simply has to be unique on the leader.
Since the ID is randomly chosen, it cannot leak information to unprivileged clients. It is also very hard for a malicious client to guess the fetch session ID. (Of course, there are other defenses in place against malicious clients, but the randomness of the ID provides defense in depth.)
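As a minimal sketch of how a leader might pick such an ID, the class below draws random positive 32-bit values until it finds one that no live session uses. The class and method names are hypothetical and are not taken from the Kafka code base; the use of `SecureRandom` and the choice to keep IDs strictly positive are assumptions made for this illustration.

```java
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper for illustration only; not the Kafka implementation.
final class SessionIdAllocator {
    private final SecureRandom random = new SecureRandom();
    // IDs of the sessions that currently exist on this leader.
    private final Set<Integer> inUse = new HashSet<>();

    // Returns a random ID in [1, Integer.MAX_VALUE - 1] that is unused on this leader.
    synchronized int allocate() {
        while (true) {
            // nextInt(bound) yields [0, bound), so adding 1 keeps the ID positive
            // and never produces 0.
            int id = 1 + random.nextInt(Integer.MAX_VALUE - 1);
            if (inUse.add(id)) {
                return id;
            }
        }
    }

    // Frees the ID when its session is closed or evicted.
    synchronized void release(int id) {
        inUse.remove(id);
    }
}
```

Uniqueness is enforced only against the leader's own set of live sessions, which matches the requirement that the ID be unique on the leader rather than globally.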
Client ID
The client ID describes who created this fetch session. Sessions may be created by both brokers and Kafka clients.
It is useful to retain this ID so that we can include it in log messages, for greater debuggability. It is also useful for cache management (more about that later.)
...
Fetch Epoch
The fetch epoch is a monotonically incrementing 32-bit counter. When the leader receives a fetch request with epoch N + 1, it knows that the data it sent back for the fetch request with epoch N was successfully processed by the follower.
Epoch 0 is considered to be the full fetch request, which establishes the incremental fetch parameters. Incremental fetch requests always have an epoch which is 1 or greater (but less than Long.MAX_VALUE).
Per-Partition Data
The FetchSession maintains information about a specific set of relevant partitions. Note that the set of relevant partitions is established when the FetchSession is created. It cannot be changed later.
For each partition, the last fetch offset is the latest offset which the fetcher has requested. This is taken directly from the incremental FetchRequest RPC. It is not updated based on the partition data that we send back to the fetcher.
We also track the maximum number of bytes to send back to the fetcher. This value is taken directly from the last FetchRequest in the FetchSession which mentioned the partition.
A partition is considered "dirty" if it has changed and needs to be resent. The partition becomes dirty when:
- The LogCleaner deletes messages, and this changes the log start offset of the partition on the leader, or
- The leader advances the high water mark, or
- The leader advances the last stable offset, or
- The leader changes the aborted transaction list for the partition
To mark the partition as dirty, we set lastDirtyEpoch to the number of the upcoming fetch epoch.
...
After processing request N, the broker expects to receive request N+1.
The sequence number is always greater than 0. After reaching MAX_INT, it wraps around to 1.
Cached data about each partition
If the fetch session supports incremental fetches, the FetchSession will maintain information about each partition in the incremental fetch.
For each partition, we maintain:
- The topic name
- The partition index
- The maximum number of bytes to fetch from this partition
- The fetch offset
- The high water mark
- The fetcher log start offset
- The leader log start offset
Topic name and partition index come from the TopicPartition.
maxBytes, fetchOffset, and fetcherLogStartOffset come from the latest FetchRequest in which the partition appeared.
highWatermark and localLogStartOffset come from the leader.
The leader uses this cached information to decide which partitions to include in the FetchResponse. Whenever any of these elements change, or if there is new data available for a partition, the partition will be included.
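The cached entry and the inclusion decision can be sketched as follows. This is an illustrative sketch only: the class name, the method names, and the use of -1 as an "unknown" initial value are assumptions, while the field names follow the list above.

```java
// Hypothetical sketch of a per-partition cache entry; not the Kafka implementation.
final class CachedPartitionSketch {
    final String topic;
    final int partition;
    int maxBytes;
    long fetchOffset;
    long fetcherLogStartOffset;
    // Leader-side values; -1 means "not yet reported to the fetcher" (an assumption).
    long highWatermark = -1L;
    long localLogStartOffset = -1L;

    CachedPartitionSketch(String topic, int partition, int maxBytes,
                          long fetchOffset, long fetcherLogStartOffset) {
        this.topic = topic;
        this.partition = partition;
        updateFromRequest(maxBytes, fetchOffset, fetcherLogStartOffset);
    }

    // Copies the values from the latest FetchRequest in which the partition appeared.
    void updateFromRequest(int maxBytes, long fetchOffset, long fetcherLogStartOffset) {
        this.maxBytes = maxBytes;
        this.fetchOffset = fetchOffset;
        this.fetcherLogStartOffset = fetcherLogStartOffset;
    }

    // Records the leader's current values and returns true if the partition
    // should be included in the next incremental response.
    boolean updateFromLeader(long newHighWatermark, long newLocalLogStartOffset,
                             boolean hasNewData) {
        boolean include = hasNewData
                || newHighWatermark != highWatermark
                || newLocalLogStartOffset != localLogStartOffset;
        highWatermark = newHighWatermark;
        localLogStartOffset = newLocalLogStartOffset;
        return include;
    }
}
```

Because the entry remembers what was last reported, a partition whose leader-side values are unchanged and which has no new data is left out of the response.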
Privileged bit
The privileged bit is set if the fetch session was created by a follower. It is cleared if the fetch session was created by a regular consumer.
This is retained in order to prioritize followers over consumers, when resources are low. See the section on fetch session caching for details.
The time when the fetch session was last used
This is the time in wall-clock milliseconds when the fetch session was last used. This is used to expire fetch sessions after they have been inactive. See the next section on fetch session caching for details.
Fetch Session Caching
Because fetch sessions use memory on the leader, we want to limit the number of them that we have at any given time. Therefore, the fetch session cache has a limited number of slots.
Each broker in the cluster gets its own set of cache slots. It makes sense to reserve dedicated cache slots for brokers, because we know that they will be fetching lots of partitions. They should get a lot of benefit out of incremental fetch requests. The reason for giving each broker more than one cache slot is because brokers may have several fetcher threads which operate in parallel. In this scenario, each fetcher thread should get its own FetchSession, following its own disjoint set of partitions.
The Kafka clients all share a common pool of cache slots. Cache slots will be allocated on a first-come, first-served basis. Since clients which fetch many partitions will get the most benefit out of incremental fetch requests, the server may refuse to create a FetchSession unless the client requests more than a configurable number of partitions. This will allow clients which watch many partitions, like MirrorMaker, to get a cache slot even if there are many clients.
Fetch Sessions become eligible for eviction if they are not used within a configurable time period. Every time the fetch session is successfully used, the timer gets reset. In general, the time period can be fairly short-- just a minute or two. The time period can be relatively short because most clients send fetch requests frequently. This is certainly true for followers loading new messages from the leader.
If users have Kafka clients which consume many partitions, but take a long time between fetch requests, they may want to increase the timeout. Alternatively, they can bump up the minimum FetchSession size, so that fewer clients are competing for FetchSessions.
...
incremental fetch sessions.
There is one new public configuration for fetch session caching:
- max.incremental.fetch.session.cache.slots, which sets the number of incremental fetch session cache slots on the broker. Default value: 1,000
There is one new constant for fetch session caching:
- min.incremental.fetch.session.eviction.ms, which sets the minimum amount of time we will wait before evicting an incremental fetch session from the cache. Value: 120,000
When the server gets a new request to create an incremental fetch session, it will compare the proposed new session with its existing sessions. The new session will evict an existing session if and only if:
- The new session belongs to a follower, and the existing session belongs to a regular consumer, OR
- The existing session has been inactive for more than min.incremental.fetch.session.eviction.ms, OR
- The existing session has existed for more than min.incremental.fetch.session.eviction.ms, AND the new session has more partitions
This accomplishes a few different goals:
- Followers get priority over consumers
- Inactive sessions get replaced over time
- Bigger requests, which benefit more from being incremental, are prioritized
- Cache thrashing is limited, avoiding expensive session re-establishment when there are more fetchers than cache slots.
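The three eviction conditions can be written as a single predicate. The sketch below is illustrative: `FetchSessionInfo`, `EvictionPolicySketch`, and their fields are hypothetical names, and only the 120,000 ms constant and the three rules come from the text above.

```java
// Hypothetical summary of a session for eviction purposes; not the Kafka implementation.
final class FetchSessionInfo {
    final boolean privileged;   // true if the session belongs to a follower
    final int numPartitions;
    final long createdMs;
    final long lastUsedMs;

    FetchSessionInfo(boolean privileged, int numPartitions, long createdMs, long lastUsedMs) {
        this.privileged = privileged;
        this.numPartitions = numPartitions;
        this.createdMs = createdMs;
        this.lastUsedMs = lastUsedMs;
    }
}

final class EvictionPolicySketch {
    // Value of min.incremental.fetch.session.eviction.ms given in the text.
    static final long MIN_EVICTION_MS = 120_000L;

    // Returns true if the proposed session may evict the existing one.
    static boolean canEvict(FetchSessionInfo proposed, FetchSessionInfo existing, long nowMs) {
        // Rule 1: followers take priority over regular consumers.
        if (proposed.privileged && !existing.privileged) {
            return true;
        }
        // Rule 2: the existing session has been inactive for too long.
        if (nowMs - existing.lastUsedMs > MIN_EVICTION_MS) {
            return true;
        }
        // Rule 3: the existing session is old enough and the new one is larger.
        return nowMs - existing.createdMs > MIN_EVICTION_MS
                && proposed.numPartitions > existing.numPartitions;
    }
}
```

The age check in the third rule is what limits thrashing: a newly created session cannot be displaced by a slightly larger one until the minimum time has passed.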
Public Interface Changes
New Error Codes
FetchSessionIdNotFound: The server responds with this error code when the client request refers to a fetch session that the server does not know about. This may happen if there was a client error, or if the fetch session was evicted by the server.
InvalidFetchSessionEpochException: The server responds with this error code when the fetch session epoch of a request is different than what it expected.
FetchRequest Changes
There are several changes to the FetchRequest API.
The FetchResponse now contains a top-level error code. This error code is set to indicate that the request as a whole cannot be processed. It will be set to InvalidFetchSession if the fetch session information in the request was invalid. When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
The FetchRequest now contains a fetch session ID and fetch session epoch, to be used as follows:
Goal | Fetch Session ID | Fetch Session Epoch |
---|---|---|
Make a full fetch request | 0 | Long.MAX_VALUE |
Make a full fetch request and request a new incremental fetch session | 0 | 0 |
Make an incremental fetch request | The incremental fetch session ID | The incremental fetch session epoch |
The FetchResponse now contains a fetch session ID.
Request Type | Response Fetch Session ID | Meaning |
---|---|---|
Full fetch request | 0 | This is a full fetch response. No incremental fetch session was created. |
Full fetch request | A non-zero fetch session ID | This is a full fetch response. An incremental fetch session was created with the given ID. |
Incremental fetch request | The fetch session ID | This is an incremental fetch response for the given incremental session ID. |
The response to a full fetch request is always a full fetch response. Similarly, the response to an incremental fetch request is always an incremental fetch response.
Full FetchRequests and full FetchResponses will contain information about all partitions. This is unchanged from the current behavior.
Incremental FetchRequests will only contain information about partitions which have changed on the follower. If a partition has not changed, it will not be included. Possible changes include:
- Needing to fetch from a new offset in the partition
- Changing the maximum number of bytes which should be fetched from the partition
Note that the existing system by which the leader becomes aware of changes to the follower's fetch position is not changed. Every time the follower wants to update the fetch position, it must include the partition in the next incremental request. Also note that if no partitions have changed, the next incremental FetchRequest will not contain any partitions at all.
Similarly, incremental fetch responses from the server will only contain information about partitions for which:
...
Fetch Session ID
A 32-bit number which identifies the current fetch session. If this is set to 0, there is no current fetch session.
Fetch Session Epoch
A 32-bit number which identifies the current fetch session epoch. Valid session epochs are always positive-- they are never 0 or negative numbers.
The fetch session epoch is incremented by one for each fetch request that we send. Once it reaches MAX_INT, the next epoch is 1.
The fetch epoch keeps the state on the leader and the follower synchronized. It ensures that if a message is duplicated or lost, the server will always notice. It is also used to associate requests and responses in the logs. Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose-- but they are less likely to be unique.
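The wrap-around rule can be illustrated with a small helper. This is a sketch with hypothetical names, and it assumes that MAX_INT in the text corresponds to Java's `Integer.MAX_VALUE`.

```java
// Hypothetical helper illustrating the epoch arithmetic; not the Kafka implementation.
final class FetchEpochSketch {
    // Returns the epoch that follows the given one, wrapping from MAX_INT back to 1
    // so that the result is always positive.
    static int nextEpoch(int epoch) {
        return epoch == Integer.MAX_VALUE ? 1 : epoch + 1;
    }

    // Leader-side check: the received epoch must be the one the leader expects next.
    static boolean matches(int expectedEpoch, int receivedEpoch) {
        return expectedEpoch == receivedEpoch;
    }
}
```

A duplicated request would arrive with an epoch the leader has already moved past, and a lost request would leave a gap, so in both cases `matches` returns false and the server notices the problem.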
FetchRequest Metadata meaning
Request SessionId | Request SessionEpoch | Meaning |
---|---|---|
0 | -1 | Make a full FetchRequest that does not use or create a session. This is the session ID used by pre-KIP-227 FetchRequests. |
0 | 0 | Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
$ID | 0 | Close the incremental fetch session identified by $ID. Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
$ID | $EPOCH | If the ID and EPOCH are correct, make an incremental fetch request. |
$ID | -1 | Close the incremental fetch session identified by $ID. Make a full FetchRequest. |
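The table can be read as a small decision function over the two request fields. The sketch below is illustrative only: the class, enum, and constant names are hypothetical, and treating combinations that the table does not list (for example, session ID 0 with a positive epoch) as invalid is an assumption.

```java
// Hypothetical classifier for the (session ID, session epoch) pair; not the Kafka implementation.
final class FetchMetadataSketch {
    enum Kind {
        FULL_NO_SESSION,            // 0, -1
        FULL_NEW_SESSION,           // 0, 0
        CLOSE_AND_FULL_NEW_SESSION, // $ID, 0
        INCREMENTAL,                // $ID, $EPOCH
        CLOSE_AND_FULL_NO_SESSION,  // $ID, -1
        INVALID                     // anything the table does not list (assumption)
    }

    static final int NO_SESSION_ID = 0;
    static final int INITIAL_EPOCH = 0;
    static final int FINAL_EPOCH = -1;

    static Kind classify(int sessionId, int epoch) {
        boolean hasSession = sessionId != NO_SESSION_ID;
        if (epoch == FINAL_EPOCH) {
            return hasSession ? Kind.CLOSE_AND_FULL_NO_SESSION : Kind.FULL_NO_SESSION;
        }
        if (epoch == INITIAL_EPOCH) {
            return hasSession ? Kind.CLOSE_AND_FULL_NEW_SESSION : Kind.FULL_NEW_SESSION;
        }
        if (hasSession && epoch > 0) {
            // The server must still verify that the ID exists and the epoch is the expected one.
            return Kind.INCREMENTAL;
        }
        return Kind.INVALID;
    }
}
```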
Incremental Fetch Requests
Incremental fetch requests have a positive fetch session ID.
A partition is only included in an incremental FetchRequest if:
- The client wants to notify the broker about a change to the partition's maxBytes, fetchOffset, or logStart
- The partition was not included in the incremental fetch session before, but the client wants to add it.
- The partition is in the incremental fetch session, but the client wants to remove it.
If the client doesn't want to change anything, the client does not need to include any partitions in the request at all.
If the client wants to remove a partition, the client will add the partition to the removed_partition list in the relevant removed_topics entry.
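One way a client could derive the contents of an incremental request is to compare what the session already knows with what the client now wants. The sketch below assumes hypothetical class names and identifies partitions by a plain string key such as "topic-0"; neither is taken from the Kafka code base.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition request state; not the Kafka implementation.
final class PartitionFetchState {
    final long fetchOffset;
    final long logStartOffset;
    final int maxBytes;

    PartitionFetchState(long fetchOffset, long logStartOffset, int maxBytes) {
        this.fetchOffset = fetchOffset;
        this.logStartOffset = logStartOffset;
        this.maxBytes = maxBytes;
    }

    boolean sameAs(PartitionFetchState other) {
        return other != null
                && fetchOffset == other.fetchOffset
                && logStartOffset == other.logStartOffset
                && maxBytes == other.maxBytes;
    }
}

final class IncrementalRequestDiff {
    // Partitions to list in the request: changed or newly added.
    final Map<String, PartitionFetchState> toSend = new LinkedHashMap<>();
    // Partitions to list as removed.
    final List<String> toRemove = new ArrayList<>();

    static IncrementalRequestDiff compute(Map<String, PartitionFetchState> inSession,
                                          Map<String, PartitionFetchState> wanted) {
        IncrementalRequestDiff diff = new IncrementalRequestDiff();
        for (Map.Entry<String, PartitionFetchState> e : wanted.entrySet()) {
            // A missing entry (new partition) or a differing entry (changed values) is sent.
            if (!e.getValue().sameAs(inSession.get(e.getKey()))) {
                diff.toSend.put(e.getKey(), e.getValue());
            }
        }
        for (String key : inSession.keySet()) {
            if (!wanted.containsKey(key)) {
                diff.toRemove.add(key);
            }
        }
        return diff;
    }
}
```

When nothing has changed, both collections are empty, which corresponds to an incremental request that contains no partitions at all.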
Schema
...
FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic] [removed_topic]
max_wait_time => INT32
replica_id => INT32
min_bytes => INT32
isolation_level => INT8
fetch_session_id => INT32
fetch_session_epoch => INT32
topic => topic_name [partition]
topic_name => STRING
partition => partition_id fetch_offset start_offset max_bytes
partition_id => INT32
fetch_offset => INT64
start_offset => INT64
max_bytes => INT32
removed_topic => removed_topic_name [removed_partition_id]
removed_topic_name => STRING
removed_partition_id => INT32
FetchResponse Changes
Top-level error code
Per-partition error codes are no longer sufficient to handle all response errors. For example, when an incremental fetch session encounters a FetchSessionIdNotFoundException, we do not know which partitions the client expected to fetch. Therefore, the FetchResponse now contains a top-level error code. This error code is set to indicate that the request as a whole cannot be processed.
When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
Fetch Session ID
The FetchResponse now contains a 32-bit fetch session ID.
FetchResponse Metadata meaning
Response SessionId | Meaning |
---|---|
0 | No fetch session was created. |
$ID | The next request can be an incremental fetch request with the given $ID. |
Incremental Fetch Responses
A partition is only included in an incremental FetchResponse if:
- The broker wants to notify the client about a change to the partition's highWatermark or broker logStartOffset
- There is new data available for a partition
If the broker has no new information to report, it does not need to include any partitions in the response at all.
The format of the partition data within FetchResponse is unchanged.
Handling Partition Size Limits in Incremental Fetch Responses
Sometimes, the per-fetch-request limit is too small to allow us to return information about every partition. In those cases, we will limit the number of partitions that we return information about, to avoid exceeding the per-request maximum. (As specified in KIP-74, the response will always return at least one message, though.)
If we always returned partition information in the same order, we might end up "starving" the partitions which came near the end of the order. With full fetch requests, the client can rotate the order in which it requests partitions in order to avoid this problem. However, incremental fetch requests need not explicitly specify all the partitions. Indeed, an incremental fetch request may contain no partitions at all.
In order to solve the starvation problem, the server must rotate the order in which it returns partition information. The server does this by maintaining a linked list of all partitions in the fetch session. When data is returned for a partition, that partition is moved to the end of the list. This ensures that we eventually return data about all partitions for which data is available.
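The rotation can be sketched with an insertion-ordered set standing in for the linked list. This is an illustrative sketch: the class name is hypothetical, partitions are identified by plain strings, and a simple cap on the number of partitions per response stands in for the real per-request byte limit.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of partition rotation within a fetch session; not the Kafka implementation.
final class PartitionRotationSketch {
    // Iteration order is the order in which partitions are considered for a response.
    private final LinkedHashSet<String> order = new LinkedHashSet<>();

    void add(String partition) {
        order.add(partition);
    }

    // Picks up to maxPartitions partitions that have data, in list order,
    // then moves each picked partition to the end of the list.
    List<String> select(Set<String> withData, int maxPartitions) {
        List<String> picked = new ArrayList<>();
        for (String partition : order) {
            if (picked.size() == maxPartitions) {
                break;
            }
            if (withData.contains(partition)) {
                picked.add(partition);
            }
        }
        for (String partition : picked) {
            // Re-adding an element already in a LinkedHashSet keeps its old position,
            // so it must be removed first to land at the end.
            order.remove(partition);
            order.add(partition);
        }
        return picked;
    }
}
```

With three partitions that always have data and room for two per response, successive calls return the first and second, then the third and first, then the second and third, so no partition is starved.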
Schema
FetchResponse => throttle_time_ms error_code error_string fetch_session_id [topic]
throttle_time_ms => INT32
error_code => INT16
error_string => STRING
fetch_session_id => INT32
topic => topic_name [partition]
topic_name => STRING
partition => partition_header records
partition_header => partition_id error_code high_watermark last_stable_offset log_start_offset [aborted_transaction]
partition_id => INT32
error_code => INT16
high_watermark => INT64
last_stable_offset => INT64
log_start_offset => INT64
aborted_transaction => producer_key first_offset
producer_key => INT64
first_offset => INT64
records => RECORDS
New
...
Metrics
The following new metrics will be added to track cache consumption:
NumIncrementalFetchSessions: Tracks the number of incremental fetch sessions which exist.
NumIncrementalFetchPartitionsCached: Tracks the total number of partitions cached by incremental fetch sessions.
IncrementalFetchSessionEvictionsPerSec: Tracks the number of incremental fetch sessions that were evicted from the cache per second. This metric is not increased when a client closes its own incremental fetch session.
min.fetch.session.size
This is a broker configuration which sets the minimum size of a fetch session. The broker will not create incremental fetch sessions for fetch requests smaller than this.
min.fetch.session.eviction.timeout.ms
This is a broker configuration. Fetch sessions which are not used within this timeout are eligible for eviction.
request.incremental.fetch.session
This is a configuration for brokers or Kafka consumers. If it is set to true, we will request an incremental fetch session when fetching partition data from other brokers. If it is set to false, we will not. The default is true.
Compatibility, Deprecation, and Migration Plan
...