Status
Current state: Under Discussion
Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg83115.html
JIRA: KAFKA-6254
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following. These periodic FetchRequests must enumerate all the partitions which the follower is interested in. The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.
The frequency at which the leader responds to the follower's fetch requests is controlled by several configuration tunables, including replica.fetch.wait.max.ms, replica.fetch.min.bytes, replica.fetch.max.bytes, and replica.fetch.response.max.bytes. Broadly speaking, the system can be tuned for lower latency, by having more frequent, smaller fetch responses, or for reduced system load, by having fewer, larger fetch responses.
There are two major inefficiencies in the current FetchRequest paradigm. The first one is that the set of partitions which the follower is interested in changes only rarely. Yet each FetchRequest must enumerate the full set of partitions which the follower is interested in. The second inefficiency is that even when nothing has changed in a partition since the previous FetchRequest, we must still send back metadata about that partition.
These inefficiencies are linearly proportional to the number of extant partitions in the system. So, for example, imagine a Kafka installation with 100,000 partitions, most of which receive new messages only rarely. The brokers in that system will still send back and forth extremely large FetchRequests and FetchResponses, even though there is very little actual message data being added per second. As the number of partitions grows, Kafka uses more and more network bandwidth to pass back and forth these messages.
When Kafka is tuned for lower latency, these inefficiencies become worse. If we double the number of FetchRequests sent per second, we should expect there to be more partitions which haven't changed within the reduced polling interval. And we are less able to amortize the high fixed cost of sending metadata for each partition in every FetchRequest and FetchResponse. This again results in Kafka using more of the available network bandwidth.
Proposed Changes
We can solve the scalability and latency problems discussed above by creating "incremental" fetch requests that only include information about what has changed. In order to do this, we need to introduce the concept of "fetch sessions."
Fetch Sessions
A fetch session encapsulates the state of an individual fetcher. This allows us to avoid resending this state as part of each fetch request.
The Fetch Session includes:
- A randomly generated 64-bit session ID which is unique on the leader
- The numeric follower ID, if this fetch session belongs to a Kafka broker
- The client ID string, if this fetch session belongs to a Kafka consumer
- The incremental fetch sequence number
- For each partition which the fetcher is interested in:
  - The topic and partition ID which uniquely identify the partition within Kafka
  - The last fetch offset
  - The maximum number of bytes to fetch from this partition
  - The last dirty sequence number
- The time when the fetch session was last used
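The state above can be sketched as a small data structure. This is an illustrative assumption of how the leader might hold it, not Kafka's actual classes; all names here are hypothetical.

```python
import time

class PartitionFetchState:
    """Per-partition state tracked inside a fetch session (illustrative)."""
    def __init__(self, fetch_offset, max_bytes):
        self.fetch_offset = fetch_offset        # last fetch offset requested
        self.max_bytes = max_bytes              # per-partition byte limit
        self.last_dirty_sequence_number = 0     # highest sequence with pending data

class FetchSession:
    """Sketch of the per-fetcher state kept on the leader (illustrative)."""
    def __init__(self, session_id, follower_id=None, client_id=None):
        self.session_id = session_id        # random 64-bit ID, unique on the leader
        self.follower_id = follower_id      # set when the fetcher is a broker
        self.client_id = client_id          # set when the fetcher is a consumer
        self.sequence_number = 0            # incremental fetch sequence number
        self.partitions = {}                # (topic, partition) -> PartitionFetchState
        self.last_used_ms = int(time.time() * 1000)
```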
1. Fetch Session ID
The fetch session ID is a randomly generated 64-bit session ID. It is a unique, immutable identifier for the fetch session. Note that the fetch session ID may not be globally unique (although it's very likely to be so, since the space of 64-bit numbers is very large). It simply has to be unique on the leader.
Since the ID is randomly chosen, it cannot leak information to unprivileged clients. It is also very hard for a malicious client to guess the fetch session ID. (Of course, there are other defenses in place against malicious clients, but the randomness of the ID provides defense in depth.)
2. Client ID
The client ID describes who created this fetch session. Sessions may be created by both brokers and Kafka clients.
It is useful to retain this ID so that we can include it in log messages, for greater debuggability. It is also useful for cache management (more about that later.)
3. The Incremental Fetch Sequence Number
The incremental fetch sequence number is a monotonically incrementing 64-bit counter. When the leader receives a fetch request with sequence number N + 1, it knows that the data it sent back for the fetch request with sequence number N was successfully processed by the follower.
Incremental fetch requests always have a sequence number greater than 0.
4. Incremental Fetch Per-Partition Data
If the fetch session supports incremental fetches, the FetchSession will maintain information about each partition in the incremental fetch.
For each partition, the last fetch offset is the latest offset which the fetcher has requested. This is taken directly from the incremental FetchRequest RPC. It is not updated based on the partition data that we send back to the fetcher.
We also track the maximum number of bytes to send back to the fetcher. This value is taken directly from the last FetchRequest in the FetchSession which mentioned the partition.
A partition is considered "dirty" if it has changed and needs to be resent. The partition becomes dirty when:
- The LogCleaner deletes messages, and this changes the log start offset of the partition on the leader, or
- The leader advances the high water mark (which may also update the last stable offset), or
- New data is added, and the log end offset changes, or
- The leader changes the aborted transaction list for the partition
The last dirty sequence number for a partition is the highest sequence number for which there is information to send back about the partition. For example, let's say we have two partitions, P1 and P2. Let P1 have a last dirty sequence number of 100, and P2 have a last dirty sequence number of 101. An incremental fetch request with sequence number 100 will return information about both P1 and P2, whereas an incremental fetch request with sequence number 101 will return information only about P2.
The reason why this is necessary is to handle retransmissions. If the response to the fetch with sequence number 100 is lost, the follower will retransmit the request. The retransmitted request will also have sequence number 100, just like the initial request. We need to ensure that the response to the retransmission includes at least as many partitions as the response to the original request.
To mark the partition as dirty, we set lastDirtySequenceNumber to the sequence number of the upcoming fetch. The lastDirtySequenceNumber only ever grows; it never decreases. It is only ever modified when a partition becomes dirty, and the last update wins.
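Continuing the P1/P2 example above, the selection rule can be sketched as follows; the function and type names are illustrative, not Kafka's actual code.

```python
from collections import namedtuple

PartitionState = namedtuple("PartitionState", ["last_dirty_sequence_number"])

def partitions_to_send(partitions, request_sequence_number):
    # Include every partition whose lastDirtySequenceNumber is at least the
    # request's sequence number; a retransmission (same sequence number)
    # therefore receives at least the partitions the original response had.
    return sorted(tp for tp, state in partitions.items()
                  if state.last_dirty_sequence_number >= request_sequence_number)

partitions = {"P1": PartitionState(100), "P2": PartitionState(101)}
print(partitions_to_send(partitions, 100))  # ['P1', 'P2']
print(partitions_to_send(partitions, 101))  # ['P2']
```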
5. The time when the fetch session was last used
This is the time in wall-clock milliseconds when the fetch session was last used. This is used to expire fetch sessions after they have been inactive. See the next section for details.
Fetch Session Caching
Because fetch sessions use memory on the leader, we want to limit the number of them that exist at any given time. Therefore, each broker will create only a limited number of incremental fetch sessions.
There are two new configurations for fetch session caching:
- max.incremental.fetch.session.cache.slots, which sets the number of incremental fetch session cache slots on the broker
- min.incremental.fetch.session.eviction.ms, which sets the minimum amount of time we will wait before evicting an incremental fetch session from the cache.
When the server gets a new request to create an incremental fetch session, it will compare the proposed new session with its existing sessions. The new session will evict an existing session if and only if:
- The new session belongs to a follower, and the existing session belongs to a regular consumer, OR
- The existing session has been inactive for more than min.incremental.fetch.session.eviction.ms, OR
- The existing session has existed for more than min.incremental.fetch.session.eviction.ms, AND the new session has more partitions
This accomplishes a few different goals:
- Followers get priority over consumers
- Inactive sessions get replaced over time
- Bigger requests, which benefit more from being incremental, are prioritized
- Cache thrashing is limited, avoiding expensive session re-establishment when there are more fetchers than cache slots.
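The eviction rules above can be sketched as a single predicate. The SessionInfo type and its field names are assumptions made for illustration, not the broker's actual code.

```python
from dataclasses import dataclass

@dataclass
class SessionInfo:
    is_follower: bool       # does the session belong to a broker?
    last_used_ms: int       # wall-clock time of last use
    created_ms: int         # wall-clock time of creation
    num_partitions: int     # size of the session

def should_evict(new, existing, now_ms, min_eviction_ms):
    # Rule 1: followers get priority over regular consumers.
    if new.is_follower and not existing.is_follower:
        return True
    # Rule 2: sessions inactive for longer than the minimum may be replaced.
    if now_ms - existing.last_used_ms > min_eviction_ms:
        return True
    # Rule 3: sufficiently old sessions yield to larger new sessions.
    if (now_ms - existing.created_ms > min_eviction_ms
            and new.num_partitions > existing.num_partitions):
        return True
    return False
```

For example, a new follower session evicts an existing consumer session immediately (rule 1), while a new consumer session with more partitions evicts an existing consumer session only once the latter has existed for min.incremental.fetch.session.eviction.ms (rule 3).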
Public Interface Changes
New Error Codes
InvalidFetchSessionException. The server responds with this error code:
- When it is given a fetch session ID that is no longer valid, or which was never valid.
- When the given sequence number is not valid for the given session.
IncrementalFetchDenied: The server responds with this error code when the client attempts to make an incremental fetch, but the server does not want to support incremental fetches for the given client.
FetchRequest Changes
There are several changes to the FetchRequest API.
Fetch Session ID
The fetch session ID uniquely identifies the fetch session.
If this is set to 0, the server may create a new fetch session and return its ID in the FetchResponse.
If this is set to a non-zero value, it identifies an existing fetch session on the leader.
Fetch Type
There is now a fetch_type field with the following values:
FULL_FETCH_REQUEST(0): This is a full fetch request.
INCREMENTAL_FETCH_REQUEST(1): This is an incremental fetch request.
The response to a full fetch request is always a full fetch response. Similarly, the response to an incremental fetch request is always an incremental fetch response.
Incremental FetchRequests will only contain information about partitions which have changed on the follower. If a partition has not changed, it will not be included. Possible changes include:
- Needing to fetch from a new offset in the partition
- Changing the maximum number of bytes which should be fetched from the partition
- Changes in the logStartOffset of the partition on the follower
Note that the existing system by which the leader becomes aware of changes to the follower's fetch position is not changed. Every time the follower wants to update the fetch position, it must include the partition in the next incremental request. Also note that if no partitions have changed, the next incremental FetchRequest will not contain any partitions at all.
An incremental fetch request cannot be made with a fetch session ID of 0. An incremental fetch request should only be made if the server has returned INCREMENTAL_FETCH_OK in the response to a previous full fetch request (see FetchResponse Changes below).
Full FetchRequests and full FetchResponses will contain information about all partitions. This is unchanged from the current behavior. The client can alter the set of partitions in the fetch session by making a full fetch request with the designated session ID.
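The follower-side rule above (include a partition in an incremental request only when something about it changed) can be sketched as a diff against the state sent in the previous request. The function name and state representation here are illustrative assumptions.

```python
def incremental_request_partitions(desired, last_sent):
    """desired and last_sent map (topic, partition) to a
    (fetch_offset, max_bytes, log_start_offset) tuple (illustrative)."""
    # Only partitions whose fetch state changed since the last request are
    # included; if nothing changed, the request carries no partitions at all.
    return {tp: state for tp, state in desired.items()
            if last_sent.get(tp) != state}

last = {("topic-a", 0): (100, 1048576, 0),
        ("topic-b", 0): (200, 1048576, 0)}
now  = {("topic-a", 0): (150, 1048576, 0),   # fetch position advanced
        ("topic-b", 0): (200, 1048576, 0)}   # unchanged, so omitted
print(incremental_request_partitions(now, last))  # only ("topic-a", 0)
```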
Incremental Fetch Sequence Number
The incremental fetch sequence number is a monotonically increasing counter. It is used for several purposes:
- It determines the order in which partition data is returned within an incremental fetch response (see "Handling Partition Size Limits in Incremental Fetch Responses" below)
- It is used to confirm that the client received the previous incremental fetch response. This allows us to avoid retransmitting responses that were already sent in the previous incremental fetch response (see "Incremental fetch per-partition data")
- It is used to associate requests and responses in the logs. Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose, but they are less likely to be unique.
Schema
FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_type fetch_session_id fetch_session_sequence_number [topic]
max_wait_time => INT32
replica_id => INT32
min_bytes => INT32
isolation_level => INT8
fetch_type => INT8
fetch_session_id => INT64
fetch_session_sequence_number => INT64
topic => topic_name [partition]
topic_name => STRING
partition => partition_id fetch_offset start_offset max_bytes
partition_id => INT32
fetch_offset => INT64
start_offset => INT64
max_bytes => INT32
FetchResponse Changes
Top-level error code
Per-partition error codes are no longer sufficient to handle all response errors. For example, when an incremental fetch session encounters an InvalidFetchSessionException, we do not know which partitions the client expected to fetch. Therefore, the FetchResponse now contains a top-level error code. This error code is set to indicate that the request as a whole cannot be processed.
When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.
Fetch Session ID
The server uses this field to notify the client when it has created a new session ID.
This field will be 0 if the server has closed the fetch session, or declined to create it in the first place. Otherwise, it will be the session ID which the client should use for future requests.
Response Flags
response_flags is a bitfield.
INCREMENTAL_FETCH_RESPONSE(0): This bit is set if the response is an incremental fetch response. It is cleared if the response is a full fetch response.
INCREMENTAL_FETCH_OK(1): This bit is set if the client can make incremental fetch requests with this session in the future. It is cleared if the client should make full fetch requests in the future.
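Assuming the numbers in parentheses above are bit positions within response_flags, a client-side decode might look like this; the helper name is hypothetical.

```python
INCREMENTAL_FETCH_RESPONSE = 1 << 0   # bit 0: incremental (set) vs full (clear)
INCREMENTAL_FETCH_OK = 1 << 1         # bit 1: incremental fetches allowed next

def decode_response_flags(response_flags):
    return {
        "is_incremental": bool(response_flags & INCREMENTAL_FETCH_RESPONSE),
        "incremental_ok": bool(response_flags & INCREMENTAL_FETCH_OK),
    }

# A full fetch response that permits incremental fetches afterwards:
print(decode_response_flags(INCREMENTAL_FETCH_OK))
```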
Incremental Fetch Responses
Incremental fetch responses will only contain information about partitions for which:
- The logStartOffset has changed, or
- The highWaterMark has changed, or
- The lastStableOffset has changed, or
- The aborted transaction list has changed, or
- There is partition data available
The format of the partition data within FetchResponse is unchanged.
Handling Partition Size Limits in Incremental Fetch Responses
Sometimes, the per-fetch-request limit is too small to allow us to return information about every partition. In those cases, we will limit the number of partitions that we return information about, to avoid exceeding the per-request maximum. (As specified in KIP-74, the response will always return at least one message, though.)
If we always returned partition information in the same order, we might end up "starving" the partitions which came near the end of the order. With full fetch requests, the client can rotate the order in which it requests partitions in order to avoid this problem. However, incremental fetch requests need not explicitly specify all the partitions. Indeed, an incremental fetch request may contain no partitions at all.
In order to solve the starvation problem, the server must rotate the order in which it returns partition information. We do this by lexicographically ordering the partitions, and then starting at the partition whose index is the request sequence number modulo the number of partitions.
For example, suppose that the fetch session has partitions P1, P2, and P3. In that case, the fetch request with sequence number 1 will start at index 1 % 3 = 1. Therefore it will return information about P2 first. If any space is left over, we will return information about P3. And if any space is left over at the end, we will return information about P1. Similarly, for the request with sequence number 2, the order will be P3, P1, P2. For the request with sequence number 3, the order will be P1, P2, P3. And so on.
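The rotation described above can be sketched as follows; the function name is illustrative.

```python
def response_partition_order(partitions, sequence_number):
    # Lexicographically order the partitions, then start at the index given
    # by the sequence number modulo the partition count, wrapping around.
    ordered = sorted(partitions)
    start = sequence_number % len(ordered)
    return ordered[start:] + ordered[:start]

print(response_partition_order(["P1", "P2", "P3"], 1))  # ['P2', 'P3', 'P1']
print(response_partition_order(["P1", "P2", "P3"], 2))  # ['P3', 'P1', 'P2']
print(response_partition_order(["P1", "P2", "P3"], 3))  # ['P1', 'P2', 'P3']
```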
Schema
FetchResponse => throttle_time_ms error_code error_string fetch_session_id response_flags [topic]
throttle_time_ms => INT32
error_code => INT16
error_string => STRING
fetch_session_id => INT64
response_flags => INT8
topic => topic_name [partition]
topic_name => STRING
partition => partition_header records
partition_header => partition_id error_code high_watermark last_stable_offset log_start_offset [aborted_transaction]
partition_id => INT32
error_code => INT16
high_watermark => INT64
last_stable_offset => INT64
log_start_offset => INT64
aborted_transaction => producer_key first_offset
producer_key => INT64
first_offset => INT64
records => RECORDS
Compatibility, Deprecation, and Migration Plan
Although this change adds the concept of incremental fetch requests, existing brokers can continue to use the old FetchRequest as before. Therefore, there is no compatibility impact.
Rejected Alternatives
We considered several other approaches to minimizing the inefficiency of FetchRequests and FetchResponses.
- Reduce the number of temporary objects created during serialization and deserialization
- Assign each topic a unique ID, so that topic name strings do not have to be sent over the wire
Changing the way Kafka does serialization and deserialization would be a huge change, requiring rewrites of all the Request and Response classes. We could potentially gain some efficiency by getting rid of the Builder classes for messages, but we would have to find a way to solve the problems which the Builder classes currently solve, such as the delayed binding between message content and message version and format. Micro-optimizing the serialization and deserialization path might also involve making tradeoffs that decrease perceived code quality. For example, we might have to sometimes give up immutability, or use primitive types instead of class types.
Assigning unique IDs to topics is also a very big project, requiring cluster-wide coordination, many RPC format changes, and potentially on-disk format changes as well.
More fundamentally, while both of these approaches improve the constant factors associated with FetchRequest and FetchResponse, they do not change the computational complexity. In both cases, the complexity would remain O(num_extant_partitions). In contrast, the approach above makes the messages scale as O(num_partition_changes). Adding IncrementalFetchRequest and IncrementalFetchResponse is a smaller and more effective change.