Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Apache Kafka Brokers make periodic FetchRequests to other brokers, in order to learn about updates to partitions they are following.  These periodic FetchRequests must enumerate all the partitions which the follower is interested in.  The responses also enumerate all the partitions, plus metadata (and potentially data) about each one.

...

When Kafka is tuned for lower latency, these inefficiencies become worse.  If we double the number of FetchRequests sent per second, we should expect there to be more partitions which haven't changed within the reduced polling interval.  And we are less able to amortize the high fixed cost of sending metadata for each partition in every FetchRequest and FetchResponse.  This again results in Kafka using more of the available network bandwidth.

Proposed Changes

We can solve the scalability and latency problems discussed above by creating "incremental" fetch requests and responses that only include information about what has changed.  In order to do this, we need to introduce the concept of "fetch sessions."

Fetch Sessions

A fetch session encapsulates the state of an individual fetcher.  This allows us to avoid resending this state as part of each fetch request.

...

  1. A randomly generated 32-bit session ID which is unique on the leader
  2. The 32-bit fetch epoch
  3. Cached data about each partition which the fetcher is interested in.
  4. The privileged bit
  5. The time when the fetch session was last used

Fetch Session ID

The fetch session ID is a randomly generated 32-bit session ID.  It is a unique, immutable identifier for the fetch session.  Note that the fetch session ID may not be globally unique (although it's very likely to be so.)  It simply has to be unique on the leader.

Since the ID is randomly chosen, it cannot leak information to unprivileged clients.  It is also very hard for a malicious client to guess the fetch session ID.  (Of course, there are other defenses in place against malicious clients, but the randomness of the ID provides defense in depth.)

Fetch Epoch

The fetch epoch is a monotonically incrementing 32-bit counter. After processing request N, the broker expects to receive request N+1.

The sequence number is always greater than 0.  After reaching MAX_INT, it wraps around to 1.

Cached data about each partition

If the fetch session supports incremental fetches, the FetchSession will maintain information about each partition in the incremental fetch.

...

The leader uses this cached information to decide which partitions to include in the FetchResponse.  Whenever any of these elements change, or if there is new data available for a partition, the partition will be included.

Privileged bit

The privileged bit is set if the fetch session was created by a follower.  It is cleared if the fetch session was created by a regular consumer.

This is retained in order to prioritize followers over consumers, when resources are low.  See the section on fetch session caching for details.

The time when the fetch session was last used

This is the time in wall-clock milliseconds when the fetch session was last used.  This is used to expire fetch sessions after they have been inactive.  See the section on fetch session caching for details.

Fetch Session Caching

Because fetch sessions use memory on the leader, we want to limit the amount of them that we have at any given time.  Therefore, each broker will create only a limited number of incremental fetch sessions.

...

  • Followers get priority over consumers
  • Inactive session get replaced over time
  • Bigger requests, which benefit more from being incremental, are prioritized
  • Cache thrashing is limited, avoiding expensive session re-establishment when there are more fetchers than cache slots.

Public Interface Changes 

New Error Codes

FetchSessionIdNotFound: The server responds with this error code when the client request refers to a fetch session that the server does not know about.  This may happen if there was a client error, or if the fetch session was evicted by the server.

InvalidFetchSessionEpochException.  The server responds with this error code when the fetch session epoch of a request is different than what it expected.

FetchRequest Changes

There are several changes to the FetchRequest API.

Fetch Session ID

A 32-bit number which identifies the current fetch session.  If this is set to 0, there is no current fetch session.

Fetch Session Epoch

A 32-bit number which identifies the current fetch session epoch.  Valid session epochs are always positive-- they are never 0 or negative numbers.

...

The fetch epoch keeps the state on the leader and the follower synchronized.  It ensures that if a message is duplicated or lost, the server will always notice.  It is also used to associate requests and responses in the logs. Other numbers, such as IP addresses and ports, or NetworkClient sequence numbers, can also be helpful for this purpose-- but they are less likely to be unique.

Fetch Type

The FetchType is an 8-bit number with the following values:

  • 0: SESSIONLESS
  • 1: FULL
  • 2: INCREMENTAL

FetchRequest Metadata meaning

Request

FetchType

Request

SessionId

Request

SessionEpoch

Meaning
SESSIONLESSignoredignored

Make a full FetchRequest that does not use or create a session.

This is the FetchType of pre-KIP-227 FetchRequests.

FULL00

Make a full FetchRequest.

Create a new incremental fetch session if possible.

FULL$ID0

Close the incremental fetch session identified by $ID.

Make a full FetchRequest.

Create a new incremental fetch session if possible.

INCREMENTAL$ID$EPOCHIf the ID and EPOCH are correct, make an incremental fetch request.

Incremental Fetch Requests

Incremental fetch requests have FetchType set to INCREMENTAL.

...

If the client wants to remove a partition, the client will set the partition's maxBytes to 0 in the request.

Schema

FetchRequest => max_wait_time replica_id min_bytes isolation_level fetch_session_id fetch_session_epoch [topic]

...

  start_offset => INT64

  max_bytes => INT32.

FetchResponse Changes

Top-level error code

Per-partition error codes are no longer sufficient to handle all response errors.  For example, when an incremental fetch session encounters an FetchSessionIdNotFoundException, we do not know which partitions the client expected to fetch.  Therefore, the FetchResponse now contains a top-level error code.  This error code is set to indicate that the request as a whole cannot be processed.

When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.

Fetch Type

The FetchResponse now contains an 8-bit fetch type.

Fetch Session ID

The FetchResponse now contains a 32-bit fetch session ID.

Fetch Session Epoch

The FetchResponse now contains a 32-bit fetch session epoch.  This is the epoch which the server expects to see in the next fetch request.

FetchResponse Metadata meaning

Request

FetchType

Request

SessionId

Request

SessionEpoch

Meaning
SESSIONLESSignoredignored

This is a response to a SESSIONLESS fetch request.

The broker also uses this FetchType when there  was a fetch session error.  The error will be described by the top-level response error field.

FULL00No fetch session was created.
INCREMENTAL$ID$EPOCH

The next request can be an incremental fetch request with the given $ID and $EPOCH.

Note that the response to a FULL FetchRequest may have an INCREMENTAL FetchType.  This response contains information about all partitions, but also indicates that the next request which the client makes can be an incremental FetchRequest.

Incremental Fetch Responses

A partition is only included in an incremental FetchResponse if:

...

The format of the partition data within FetchResponse is unchanged.

Handling Partition Size Limits in Incremental Fetch Responses

Sometimes, the per-fetch-request limit is too small to allow us to return information about every partition.  In those cases, we will limit the number of partitions that we return information about, to avoid exceeding the per-request maximum.  (As specified in KIP-74, the response will always return at least one message, though.)

...

In order to solve the starvation problem, the server must rotate the order in which it returns partition information.  The server does this by maintaining a linked list of all partitions in the fetch session.  When data is returned for a partition, that partition is moved to the end of the list.  This ensures that we eventually return data about all partitions for which data is available.

Schema

FetchResponse => throttle_time_ms error_code error_string fetch_session_id [topic]

...

  first_offset => INT64

  records => RECORDS

New Metrics

The following new metrics will be added to track cache consumption:

TotalIncrementalFetchSessions: Tracks the number of incremental fetch sessions which exist.

TotalIncrementalFetchSessionsEvicted: Tracks the number of incremental fetch sessions that were evicted from the cache.  This metric is not increased when a client closes its own incremental fetch session

TotalIncrementalFetchPartitionsCached: Tracks the total number of partitions cached by incremental fetch sessions.

Compatibility, Deprecation, and Migration Plan

Although this change adds the concept of incremental fetch requests, existing brokers can continue to use the old FetchRequest as before.  Therefore, there is no compatibility impact.

Rejected Alternatives

We considered several other approaches to minimizing the inefficiency of FetchRequests and FetchResponses.

...