This page is meant as a template for writing a KIP. To create a KIP choose Tools->Copy on this page and modify with your content and replace the heading with the next KIP number and a description of your issue. Replace anything in italics with your own description.
Status
Current state: Under Discussion
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following. These periodic FetchRequests must enumerate all the partitions which the follower is interested in. The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.
The frequency at which the leader responds to the follower's fetch requests is controlled by several configuration tunables, including replica.fetch.wait.max.ms, replica.fetch.min.bytes, replica.fetch.max.bytes, and replica.fetch.response.max.bytes. Broadly speaking, the system can be tuned for lower latency, by having more frequent, smaller fetch responses, or for reduced system load, having having fewer, larger fetch responses.
There are two major inefficiencies in the current FetchRequest paradigm. The first one is that the set of partitions which the follower is interested in changes only rarely. Yet each FetchRequest must enumerate the full set of partitions which the follower is interested in. The second inefficiency is that even when nothing has changed in a partition since the previous FetchRequest, we must still send back metadata about that partition.
These inefficiencies are linearly proportional to the number of extant partitions in the system. So, for example, imagine a Kafka installation with 100,000 partitions, most of which receive new messages only rarely. The brokers in that system will still send back and forth extremely large FetchRequests and FetchResponses, even though there is very little actual message data being added per second. As the number of partitions grows, Kafka uses more and more network bandwidth to pass back and forth these messages.
When Kafka is tuned for lower latency, these inefficiencies become worse. If we double the number of FetchRequests sent per second, we should expect there to be more partitions which haven't changed within the reduced polling interval. And we are less able to amortize the high fixed cost of sending metadata for each partition in every FetchRequest and FetchResponse. This again results in Kafka using more of the available network bandwidth.
Proposed Changes
We can solve both of these problems by adding a new RPC, the incremental fetch request.
An IncrementalFetchRequest does not enumerate the full set of partitions that the follower is interested in. Instead, it describes only the partitions that have been updated since the last request was sent. So, for example, in our hypothetical 100,000-partition cluster, if the follower has only updated the log end offset of a single partition, it would include only that single partition in its IncrementalFetchRequest message, rather than all of them. The corresponding IncrementalFetchResponse omits repeating metadata about partitions that haven't changed since the last IncrementalFetchRequest.
The follower registers the set of partitions which it is interested in by sending an ordinary FetchRequest. (Because the FetchRequest currently includes a broker id, there is no ambiguity about which follower has sent the request.) The leader then generates a random 64-bit UUID and returns it in the FetchResponse. The follower can then make IncrementalFetchRequests with that same UUID, and the leader will understand what set of partitions the follower is interested in. If the leader receives an IncrementalFetchRequest with a UUID that does not match that of the latest FetchResponse, it will return an error. To resolve this error, the follower must fall back to making a full FetchRequest.
The purpose of the randomly generated UUID is to make it absolutely clear which full fetch request each incremental fetch request is based on. For example, imagine that a follower makes two slightly different FetchRequests, and the leader ends up processing the earlier one later for some reason. In that case, we do not want the follower and the leader to disagree about what the IncrementalFetchResponse is based on. The UUID prevents this. Similarly, if someone sends a full fetch request with a broker ID that is not theirs, either accidentally or maliciously, they cannot disrupt the functioning of the real broker. The UUID, being random, cannot be forged by an attacker or counterfeited by buggy code.
Public Interfaces
This change adds a new RPC, the IncrementalFetchRequest.
IncrementalFetchRequest => uuid replica_id max_wait_time min_bytes isolation_level [topic]
uuid => INT64
replica_id => INT32
max_wait_time => INT32
min_bytes => INT32
isolation_level => INT8
topic => topic_name [partition]
topic_name => STRING
partition => partition_id fetch_offset start_offset max_bytes
partition_id => STRING
fetch_offset => INT64
start_offset => INT64
max_bytes => INT32
IncrementalFetchResponse => throttle_time_ms error_code uuid [topic]
throttle_time_ms => INT32
error_code => INT16
uuid => INT64
topic => topic_name [partition]
topic_name => STRING
partition => partition_header records
partition_header => partition_id error_code high_watermark last_stable_offset log_start_offset [aborted_transaction]
partition_id => INT32
error_code => INT16
high_watermark => INT64
last_stable_offset => INT64
log_start_offset => INT64
aborted_transaction => producer_key first_offset
producer_key => INT64
first_offset => INT64
records => RECORDS
This change also adds a 64-bit UUID to FetchResponse. There are no other changes to FetchResponse.
Compatibility, Deprecation, and Migration Plan
Although this change adds a new request type, existing brokers can continue to use the old FetchRequest as before. Therefore, there is no compatibility impact.
Rejected Alternatives
We considered several other approaches to minimizing the inefficiency of FetchRequests and FetchResponses.
- Reduce the number of temporary objects created during serialization and deserialization
- Assign each topic a unique ID, so that topic name strings do not have to be sent over the wire
Changing the way Kafka does serialization and deserialization would be a huge change, requiring rewrites of all the Request and Response classes. We could potentially gain some efficiency by getting rid of the Builder classes for messages, but we would have to find a way to solve the problems which the Builder classes currently solve, such as the delayed binding between message content and message version and format. Micro-optimizing the serialization and deserialization path might also involve making tradeoffs that decrease perceived code quality. For example, we might have to sometimes give up immutability, or use primitive types instead of class types.
Assigning unique IDs to topics is also a very big project, requiring cluster-wide coordination, many RPC format changes, and potentially on-disk format changes as well.
More fundamentally, while both of these approaches improve the constant factors associated with FetchRequest and FetchResponse, they do not change the computational complexity. In both cases, the complexity would remain O(num_extant_partitions). In contrast, the approach above makes the messages scale as O(num_partition_changes). Adding IncrementalFetchRequest and IncrementalFetchResponse is a smaller and more effective change.