
Status

Current state: Under Discussion

Discussion thread: https://www.mail-archive.com/dev@kafka.apache.org/msg83115.html

JIRA: KAFKA-6254

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following.  These periodic FetchRequests must enumerate all the partitions which the follower is interested in.  The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.

The frequency at which the leader responds to the follower's fetch requests is controlled by several configuration tunables, including replica.fetch.wait.max.ms, replica.fetch.min.bytes, replica.fetch.max.bytes, and replica.fetch.response.max.bytes.  Broadly speaking, the system can be tuned for lower latency, by having more frequent, smaller fetch responses, or for reduced system load, by having fewer, larger fetch responses.

There are two major inefficiencies in the current FetchRequest paradigm.  The first one is that the set of partitions which the follower is interested in changes only rarely.  Yet each FetchRequest must enumerate the full set of partitions which the follower is interested in.  The second inefficiency is that even when nothing has changed in a partition since the previous FetchRequest, we must still send back metadata about that partition.

These inefficiencies are linearly proportional to the number of extant partitions in the system.  So, for example, imagine a Kafka installation with 100,000 partitions, most of which receive new messages only rarely.  The brokers in that system will still send back and forth extremely large FetchRequests and FetchResponses, even though there is very little actual message data being added per second.  As the number of partitions grows, Kafka uses more and more network bandwidth to pass back and forth these messages.

When Kafka is tuned for lower latency, these inefficiencies become worse.  If we double the number of FetchRequests sent per second, we should expect there to be more partitions which haven't changed within the reduced polling interval.  And we are less able to amortize the high fixed cost of sending metadata for each partition in every FetchRequest and FetchResponse.  This again results in Kafka using more of the available network bandwidth.

Proposed Changes

We can solve the scalability and latency problems discussed above by creating "incremental" fetch requests that only include information about what has changed.  In order to do this, we need to introduce the concept of "fetch sessions."

Fetch Sessions

A fetch session encapsulates the state of an individual fetcher.  This allows us to avoid resending this state as part of each fetch request.

The Fetch Session includes:

  1. A randomly generated 64-bit session ID which is unique on the leader
  2. The client ID
    1. The numeric follower ID, if this fetch session belongs to a Kafka broker
    2. The client ID string, if this fetch session belongs to a Kafka consumer
  3. The fetch epoch
  4. For each partition which the fetcher is interested in:
    1. The topic and partition ID which uniquely identify the partition within Kafka
    2. The last fetch offset
    3. The maximum number of bytes to fetch from this partition
    4. The last dirty epoch
  5. The time when the fetch session was last used
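
As a rough illustration, the state listed above could be held in a structure like the following.  This is a minimal sketch, not Kafka's actual classes: the class name, field names, and the string key used for partitions are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; these are not Kafka's actual classes. */
final class FetchSessionSketch {
    /** Per-partition state (item 4 in the list above). */
    static final class PartitionState {
        long lastFetchOffset;  // latest offset the fetcher has requested
        int maxBytes;          // maximum number of bytes to fetch from this partition
        long lastDirtyEpoch;   // epoch for which the partition was last marked dirty

        PartitionState(long lastFetchOffset, int maxBytes) {
            this.lastFetchOffset = lastFetchOffset;
            this.maxBytes = maxBytes;
        }
    }

    final long sessionId;      // random 64-bit ID, unique on the leader
    final String clientId;     // follower ID or consumer client ID, stored as text here
    long epoch = 0L;           // fetch epoch; 0 is the initial full fetch
    long lastUsedMs;           // wall-clock time when the session was last used
    // Keyed by topic + ":" + partition ID, purely for brevity in this sketch.
    final Map<String, PartitionState> partitions = new LinkedHashMap<>();

    FetchSessionSketch(long sessionId, String clientId, long nowMs) {
        this.sessionId = sessionId;
        this.clientId = clientId;
        this.lastUsedMs = nowMs;
    }

    void addPartition(String topic, int partitionId, long fetchOffset, int maxBytes) {
        partitions.put(topic + ":" + partitionId, new PartitionState(fetchOffset, maxBytes));
    }
}
```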

1. Fetch Session ID

The fetch session ID is a randomly generated 64-bit session ID.  It is a unique, immutable identifier for the fetch session.  Note that the fetch session ID need not be globally unique (although it is very likely to be, since the space of 64-bit numbers is very large).  It simply has to be unique on the leader.

Since the ID is randomly chosen, it cannot leak information to unprivileged clients.  It is also very hard for a malicious client to guess the fetch session ID.  (Of course, there are other defenses in place against malicious clients, but the randomness of the ID provides defense in depth.)
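
One way the leader might allocate such IDs is sketched below.  The class and method names are hypothetical, and the use of SecureRandom is an assumption; the proposal only requires that the ID be random and unique on the leader.  The sketch also never hands out 0, on the assumption that 0 is reserved to mean "no session" in requests and responses.

```java
import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch only; names and the choice of SecureRandom are assumptions. */
final class SessionIdAllocatorSketch {
    private final SecureRandom random = new SecureRandom();
    private final Set<Long> inUse = new HashSet<>();

    /** Returns a random non-zero ID that is not currently in use on this leader. */
    synchronized long allocate() {
        long id;
        do {
            id = random.nextLong();
            // Retry if the ID is the reserved value 0 or collides with a live session.
        } while (id == 0L || !inUse.add(id));
        return id;
    }

    /** Frees an ID when its session is closed or evicted. */
    synchronized void release(long id) {
        inUse.remove(id);
    }
}
```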

2. Client ID

The client ID describes who created this fetch session.  Sessions may be created by both brokers and Kafka clients.

It is useful to retain this ID so that we can include it in log messages, for greater debuggability.  It is also useful for cache management (more about that later).

3. The Fetch Epoch

The fetch epoch is a monotonically incrementing 64-bit counter.  When the leader receives a fetch request with epoch N + 1, it knows that the data it sent back for the fetch request with epoch N was successfully processed by the follower.

Epoch 0 is considered to be the full fetch request, which establishes the incremental fetch parameters.  Incremental fetch requests always have an epoch which is 1 or greater (but less than Long.MAX_VALUE).
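
The epoch rules above could be checked on the leader roughly as follows.  This is a sketch under one assumption that the text does not spell out: a request is accepted only if its epoch is exactly one greater than the last accepted epoch.  The class and method names are hypothetical.

```java
/** Illustrative sketch only; the strict "last + 1" rule is an assumption. */
final class EpochTrackerSketch {
    private long lastEpoch = 0L; // epoch 0 is the full fetch that created the session

    /** Returns true and advances if the request carries the next epoch; false otherwise. */
    boolean accept(long requestEpoch) {
        // Incremental epochs are 1 or greater, but less than Long.MAX_VALUE.
        if (requestEpoch < 1L || requestEpoch == Long.MAX_VALUE) {
            return false;
        }
        if (requestEpoch != lastEpoch + 1) {
            return false;
        }
        lastEpoch = requestEpoch;
        return true;
    }

    long lastEpoch() {
        return lastEpoch;
    }
}
```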

4. Per-Partition Data

The FetchSession maintains information about a specific set of relevant partitions.  Note that the set of relevant partitions is established when the FetchSession is created.  It cannot be changed later.

For each partition, the last fetch offset is the latest offset which the fetcher has requested.  This is taken directly from the incremental FetchRequest RPC.  It is not updated based on the partition data that we send back to the fetcher.

We also track the maximum number of bytes to send back to the fetcher.  This value is taken directly from the last FetchRequest in the FetchSession which mentioned the partition.

A partition is considered "dirty" if it has changed and needs to be resent.  The partition becomes dirty when:

To mark the partition as dirty, we set lastDirtyEpoch to the number of the upcoming fetch epoch.
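
The dirty-marking rule can be sketched as below.  Only the assignment in markDirty comes from the text above; the comparison in shouldSend is an assumption about how the leader would consult lastDirtyEpoch when building a response, and all names are hypothetical.

```java
/** Illustrative sketch only; the check in shouldSend is an assumption. */
final class DirtyTrackingSketch {
    private long lastDirtyEpoch = 0L;

    /** Marks the partition dirty by recording the number of the upcoming fetch epoch. */
    void markDirty(long currentEpoch) {
        lastDirtyEpoch = currentEpoch + 1;
    }

    /** Assumed rule: resend if the partition was marked dirty for this epoch or a later one. */
    boolean shouldSend(long responseEpoch) {
        return lastDirtyEpoch >= responseEpoch;
    }
}
```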

5. The time when the fetch session was last used

This is the time in wall-clock milliseconds when the fetch session was last used.  This is used to expire fetch sessions after they have been inactive.  See the next section for details.

Fetch Session Caching

Because fetch sessions use memory on the leader, we want to limit the number of them that we have at any given time.  Therefore, the fetch session cache has a limited number of slots.

Each broker in the cluster gets its own set of cache slots.  It makes sense to reserve dedicated cache slots for brokers, because we know that they will be fetching lots of partitions.  They should get a lot of benefit out of incremental fetch requests.  The reason for giving each broker more than one cache slot is that brokers may have several fetcher threads which operate in parallel.  In this scenario, each fetcher thread should get its own FetchSession, following its own disjoint set of partitions.

The Kafka clients all share a common pool of cache slots.  Cache slots will be allocated on a first-come, first-served basis.  Since clients which fetch many partitions will get the most benefit out of incremental fetch requests, the server may refuse to create a FetchSession unless the client requests more than a configurable number of partitions.  This will allow clients which watch many partitions, like MirrorMaker, to get a cache slot even if there are many clients.

Fetch sessions become eligible for eviction if they are not used within a configurable time period.  Every time the fetch session is successfully used, the timer is reset.  In general, the time period can be fairly short, just a minute or two, because most clients send fetch requests frequently.  This is certainly true for followers loading new messages from the leader.

If users have Kafka clients which consume many partitions, but take a long time between fetch requests, they may want to increase the timeout.  Alternatively, they can bump up the minimum FetchSession size, so that fewer clients are competing for FetchSessions.

Clients can voluntarily give up their FetchSessions.  They may want to do this because they want to establish a new fetch session that is monitoring a different set of partitions, or because the client is going away.
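
The shared client pool described above might behave roughly like the sketch below.  It models only the shared pool (not the dedicated per-broker slots), and the class name, method names, and the policy of evicting an idle session only when a new one needs a slot are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Illustrative sketch only; names and the eviction policy details are assumptions. */
final class FetchSessionCacheSketch {
    private final int maxSlots;
    private final int minSessionSize;
    private final long evictionTimeoutMs;
    private final Map<Long, Long> lastUsedMsById = new HashMap<>();

    FetchSessionCacheSketch(int maxSlots, int minSessionSize, long evictionTimeoutMs) {
        this.maxSlots = maxSlots;
        this.minSessionSize = minSessionSize;
        this.evictionTimeoutMs = evictionTimeoutMs;
    }

    /** Tries to register a session; returns false if no session can be created. */
    boolean tryCreate(long sessionId, int numPartitions, long nowMs) {
        if (numPartitions < minSessionSize) {
            return false; // request is too small to be worth a cache slot
        }
        if (lastUsedMsById.size() >= maxSlots && !evictOneIdle(nowMs)) {
            return false; // cache is full and nothing is eligible for eviction
        }
        lastUsedMsById.put(sessionId, nowMs);
        return true;
    }

    /** Resets the idle timer; returns false if the session is unknown (e.g. evicted). */
    boolean touch(long sessionId, long nowMs) {
        if (!lastUsedMsById.containsKey(sessionId)) {
            return false;
        }
        lastUsedMsById.put(sessionId, nowMs);
        return true;
    }

    /** Called when a client voluntarily gives up its session. */
    void release(long sessionId) {
        lastUsedMsById.remove(sessionId);
    }

    /** Removes one session that has been idle longer than the timeout, if any. */
    private boolean evictOneIdle(long nowMs) {
        Iterator<Map.Entry<Long, Long>> it = lastUsedMsById.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().getValue() > evictionTimeoutMs) {
                it.remove();
                return true;
            }
        }
        return false;
    }
}
```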

Public Interface Changes 

Error Codes

There is a new error code, InvalidFetchSession.  The server responds with this error code when it is given a fetch session ID that is no longer valid.  It may also respond with this error code if the fetch session epoch is not valid for the given fetch session.

FetchRequest Changes

There are several changes to the FetchRequest API.

The FetchResponse now contains a top-level error code.  This error code is set to indicate that the request as a whole cannot be processed.  It will be set to InvalidFetchSession if the fetch session information in the request was invalid.  When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.

The FetchRequest now contains a fetch session ID and fetch session epoch, to be used as follows:

Goal | Fetch Session ID | Fetch Session Epoch
Make a full fetch request | 0 | Long.MAX_VALUE
Make a full fetch request and request a new incremental fetch session | 0 | 0
Make an incremental fetch request | The incremental fetch session ID | The incremental fetch session epoch
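
The combinations above could be decoded on the server along these lines.  This is a sketch with hypothetical names; treating every other combination as invalid is an assumption, as is reusing the epoch range (1 or greater, less than Long.MAX_VALUE) for the incremental case.

```java
/** Illustrative sketch only; the INVALID case is an assumption. */
final class FetchRequestKindSketch {
    enum Kind { FULL, FULL_AND_CREATE_SESSION, INCREMENTAL, INVALID }

    static Kind classify(long sessionId, long epoch) {
        if (sessionId == 0L) {
            if (epoch == Long.MAX_VALUE) {
                return Kind.FULL;                    // full fetch, no session wanted
            }
            if (epoch == 0L) {
                return Kind.FULL_AND_CREATE_SESSION; // full fetch that asks for a session
            }
            return Kind.INVALID;
        }
        // A non-zero session ID with an epoch in [1, Long.MAX_VALUE) is incremental.
        if (epoch >= 1L && epoch < Long.MAX_VALUE) {
            return Kind.INCREMENTAL;
        }
        return Kind.INVALID;
    }
}
```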

The FetchResponse now contains a fetch session ID.

Request Type | Response Fetch Session ID | Meaning
Full fetch request | 0 | This is a full fetch response.  No incremental fetch session was created.
Full fetch request | A non-zero fetch session ID | This is a full fetch response.  An incremental fetch session was created with the given ID.
Incremental fetch request | The fetch session ID | This is an incremental fetch response for the given incremental session ID.

The response to a full fetch request is always a full fetch response.  Similarly, the response to an incremental fetch request is always an incremental fetch response.
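
On the fetcher side, the session ID and epoch to send next could be tracked as sketched below.  The names are hypothetical, and the reaction to a failure (dropping back to a full fetch that requests a new session) is an assumption; the proposal only defines the InvalidFetchSession error code itself.

```java
/** Illustrative sketch only; the fallback behaviour on errors is an assumption. */
final class ClientSessionStateSketch {
    private long sessionId = 0L; // 0 means "no session yet"
    private long nextEpoch = 0L; // 0 asks the leader to create an incremental session

    long sessionIdForRequest() {
        return sessionId;
    }

    long epochForRequest() {
        return nextEpoch;
    }

    /** Updates local state from the session ID and top-level error of a response. */
    void onResponse(long responseSessionId, boolean invalidSessionError) {
        if (invalidSessionError || responseSessionId == 0L) {
            // No usable session: the next request is a full fetch asking for a new one.
            sessionId = 0L;
            nextEpoch = 0L;
        } else {
            sessionId = responseSessionId;
            nextEpoch++;
        }
    }
}
```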

Full FetchRequests and full FetchResponses will contain information about all partitions.  This is unchanged from the current behavior.

Incremental FetchRequests will only contain information about partitions which have changed on the follower.  If a partition has not changed, it will not be included. Possible changes include:

Note that the existing system by which the leader becomes aware of changes to the follower's fetch position is not changed.  Every time the follower wants to update the fetch position, it must include the partition in the next incremental request.  Also note that if no partitions have changed, the next incremental FetchRequest will not contain any partitions at all.
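
A follower could pick the partitions for its next incremental request roughly as follows.  This sketch considers only one kind of change, a moved fetch position, and uses hypothetical names and a plain string as the partition key.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative sketch only; tracks fetch-offset changes and nothing else. */
final class IncrementalRequestSketch {
    private final Map<String, Long> lastSentOffsets = new HashMap<>();

    /** Returns only the partitions whose fetch offset differs from what was last sent. */
    Map<String, Long> nextRequest(Map<String, Long> currentOffsets) {
        Map<String, Long> changed = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : currentOffsets.entrySet()) {
            Long previous = lastSentOffsets.get(e.getKey());
            if (previous == null || !previous.equals(e.getValue())) {
                changed.put(e.getKey(), e.getValue());
            }
        }
        // Remember what the leader has now been told.
        lastSentOffsets.putAll(changed);
        return changed;
    }
}
```

With this sketch, the first call returns every partition (matching a full fetch), and a later call with unchanged offsets returns an empty map, matching the case where the incremental FetchRequest contains no partitions at all.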

Similarly, incremental fetch responses from the server will only contain information about partitions for which:

FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic]
  max_wait_time => INT32
  replica_id => INT32
  min_bytes => INT32
  isolation_level => INT8
  fetch_session_id => INT64
  fetch_session_epoch => INT64
  topic => topic_name [partition]
  topic_name => STRING
  partition => partition_id fetch_offset start_offset max_bytes
  partition_id => INT32
  fetch_offset => INT64
  start_offset => INT64
  max_bytes => INT32

FetchResponse => throttle_time_ms error_code error_string fetch_session_id [topic]
  throttle_time_ms => INT32
  error_code => INT16
  error_string => STRING
  fetch_session_id => INT64
  topic => topic_name [partition]
  topic_name => STRING
  partition => partition_header records
  partition_header => partition_id error_code high_watermark last_stable_offset log_start_offset [aborted_transaction]
  partition_id => INT32
  error_code => INT16
  high_watermark => INT64
  last_stable_offset => INT64
  log_start_offset => INT64
  aborted_transaction => producer_key first_offset
  producer_key => INT64
  first_offset => INT64
  records => RECORDS
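
To make the layout concrete, the fixed-size leading fields of the FetchRequest body shown above could be encoded as in the sketch below.  It covers only those six fields, omitting the [topic] array and the common request header, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

/** Illustrative sketch only; encodes just the fixed-size leading FetchRequest fields. */
final class FetchRequestFieldsSketch {
    // max_wait_time, replica_id, min_bytes (INT32 each) + isolation_level (INT8)
    // + fetch_session_id, fetch_session_epoch (INT64 each)
    static final int FIXED_FIELDS_SIZE = 4 + 4 + 4 + 1 + 8 + 8;

    static ByteBuffer encode(int maxWaitTime, int replicaId, int minBytes,
                             byte isolationLevel, long sessionId, long sessionEpoch) {
        // ByteBuffer is big-endian by default, which is network byte order.
        ByteBuffer buf = ByteBuffer.allocate(FIXED_FIELDS_SIZE);
        buf.putInt(maxWaitTime)
           .putInt(replicaId)
           .putInt(minBytes)
           .put(isolationLevel)
           .putLong(sessionId)
           .putLong(sessionEpoch);
        buf.flip(); // switch from writing to reading
        return buf;
    }
}
```

The two new INT64 fields add 16 bytes to each request, independent of the number of partitions.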

New Configurations

min.fetch.session.size

This is a broker configuration which sets the minimum size of a fetch session.  The broker will not create incremental fetch sessions for fetch requests smaller than this.

min.fetch.session.eviction.timeout.ms

This is a broker configuration.  Fetch sessions which are not used within this timeout are eligible for eviction.

request.incremental.fetch.session

This is a configuration for brokers or Kafka consumers.  If it is set to true, we will request an incremental fetch session when fetching partition data from other brokers.  If it is set to false, we will not.  The default is true.

Compatibility, Deprecation, and Migration Plan

Although this change adds the concept of incremental fetch requests, existing brokers can continue to use the old FetchRequest as before.  Therefore, there is no compatibility impact.

Rejected Alternatives

We considered several other approaches to minimizing the inefficiency of FetchRequests and FetchResponses.

Changing the way Kafka does serialization and deserialization would be a huge change, requiring rewrites of all the Request and Response classes.  We could potentially gain some efficiency by getting rid of the Builder classes for messages, but we would have to find a way to solve the problems which the Builder classes currently solve, such as the delayed binding between message content and message version and format.  Micro-optimizing the serialization and deserialization path might also involve making tradeoffs that decrease perceived code quality.  For example, we might have to sometimes give up immutability, or use primitive types instead of class types.

Assigning unique IDs to topics is also a very big project, requiring cluster-wide coordination, many RPC format changes, and potentially on-disk format changes as well.

More fundamentally, while both of these approaches improve the constant factors associated with FetchRequest and FetchResponse, they do not change the computational complexity.  In both cases, the complexity would remain O(num_extant_partitions).  In contrast, the approach above makes the messages scale as O(num_partition_changes).  Adding IncrementalFetchRequest and IncrementalFetchResponse is a smaller and more effective change.