...

Because fetch sessions use memory on the leader, we want to limit the number that exist at any given time.  Therefore, each broker will create only a limited number of incremental fetch sessions.

There is one new public configuration for fetch session caching:

  • max.incremental.fetch.session.cache.slots, which sets the number of incremental fetch session cache slots on the broker. Default value: 10,000

There is one new constant for fetch session caching:

  • min.incremental.fetch.session.eviction.ms, which sets the minimum amount of time we will wait before evicting an incremental fetch session from the cache. Value: 120,000
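As a rough illustration of how the slot limit and the minimum eviction time interact, here is a minimal sketch of a bounded session cache. The class name, its methods, and the choice of the least recently used session as the eviction candidate are assumptions made for this example only; the actual eviction conditions are the ones listed in this document, not the ones coded here.

```python
import time
from collections import OrderedDict

MAX_SLOTS = 10_000         # max.incremental.fetch.session.cache.slots
MIN_EVICTION_MS = 120_000  # min.incremental.fetch.session.eviction.ms


class FetchSessionCache:
    """Hypothetical bounded cache of session IDs; not broker code."""

    def __init__(self, max_slots=MAX_SLOTS, min_eviction_ms=MIN_EVICTION_MS,
                 clock_ms=lambda: int(time.monotonic() * 1000)):
        self.max_slots = max_slots
        self.min_eviction_ms = min_eviction_ms
        self.clock_ms = clock_ms
        # session_id -> last-used timestamp in ms, least recently used first
        self.last_used = OrderedDict()

    def touch(self, session_id):
        """Record that a cached session was just used."""
        if session_id in self.last_used:
            self.last_used[session_id] = self.clock_ms()
            self.last_used.move_to_end(session_id)

    def try_add(self, session_id):
        """Cache a new session; return False if no slot can be freed."""
        now = self.clock_ms()
        if len(self.last_used) >= self.max_slots:
            if not self.last_used:
                return False  # a cache with zero slots never admits anything
            oldest_id, oldest_ms = next(iter(self.last_used.items()))
            # ASSUMPTION: the least recently used session is the candidate.
            # It may only be evicted once it has been idle long enough.
            if now - oldest_ms < self.min_eviction_ms:
                return False
            del self.last_used[oldest_id]
        self.last_used[session_id] = now
        return True
```

With a fake clock, a full cache rejects a new session until the oldest entry has been idle for at least the minimum eviction time.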

When the server gets a new request to create an incremental fetch session, it will compare the proposed new session with its existing sessions.  The new session will evict an existing session if and only if:

...

The fetch epoch keeps the state on the leader and the follower synchronized.  It ensures that if a message is duplicated or lost, the server will always notice.  It is also used to associate requests and responses in the logs. Other numbers, such as IP addresses and ports or NetworkClient sequence numbers, can also be helpful for this purpose, but they are less likely to be unique.
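The following minimal sketch shows how an epoch check can detect a duplicated or lost request. The class and method names are hypothetical, and incrementing the expected epoch by one per accepted request is an assumption of this example; the document only states that new sessions start at epoch 1 and that the server tells the client which epoch it expects next.

```python
class FetchSessionEpochTracker:
    """Hypothetical server-side view of one fetch session's epoch."""

    def __init__(self, session_id):
        self.session_id = session_id
        self.expected_epoch = 1  # a new fetch session starts at epoch 1

    def accept(self, request_epoch):
        """Return True if the request carries the epoch the server expects."""
        if request_epoch != self.expected_epoch:
            # A duplicate carries an old epoch; a lost request leaves a gap.
            return False
        # ASSUMPTION: the expected epoch advances by one per accepted request.
        self.expected_epoch += 1
        return True
```

A replayed request (same epoch twice) and a request that skips an epoch are both rejected, which is how the server "notices" duplication or loss in this sketch.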

Fetch Type

The FetchType is an 8-bit number with the following values:

  • 0: SESSIONLESS
  • 1: FULL
  • 2: INCREMENTAL
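For illustration, the three values can be modeled as an enumeration that is serialized as a single signed byte. This is only a sketch: the helper names are hypothetical, and treating the 8-bit number as a signed INT8 is an assumption based on the `INT8` type used in the schema.

```python
import struct
from enum import IntEnum


class FetchType(IntEnum):
    SESSIONLESS = 0
    FULL = 1
    INCREMENTAL = 2


def encode_fetch_type(fetch_type):
    """Pack a FetchType into one signed byte."""
    return struct.pack(">b", fetch_type)


def decode_fetch_type(data):
    """Unpack one signed byte; raises ValueError for an unknown value."""
    (value,) = struct.unpack(">b", data)
    return FetchType(value)
```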

 

FetchRequest Metadata meaning

| Request FetchType | Request SessionId | Request SessionEpoch | Meaning |
|---|---|---|---|
| SESSIONLESS | ignored | ignored | Make a full FetchRequest that does not use or create a session. This is the FetchType used by pre-KIP-227 FetchRequests. |
| FULL | 0 | 0 | Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
| FULL | $ID | 0 | Close the incremental fetch session identified by $ID. Make a full FetchRequest. Create a new incremental fetch session if possible. If a new fetch session is created, it will start at epoch 1. |
| INCREMENTAL | $ID | $EPOCH | If the ID and EPOCH are correct, make an incremental fetch request. |
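The request metadata combinations above can be read as a small decision procedure. The sketch below is a hypothetical helper, not broker code: the function name and the action strings are invented for this example, and it covers only the combinations shown here, raising an error for anything else.

```python
SESSIONLESS, FULL, INCREMENTAL = 0, 1, 2  # FetchType values from this document


def interpret_fetch_request(fetch_type, session_id, session_epoch):
    """Return the actions assigned to one request (illustrative labels only)."""
    if fetch_type == SESSIONLESS:
        # SessionId and SessionEpoch are ignored for sessionless requests.
        return ["full_fetch_without_session"]
    if fetch_type == FULL and session_epoch == 0:
        actions = []
        if session_id != 0:
            # A non-zero ID names an existing session that is closed first.
            actions.append("close_session")
        actions += ["full_fetch", "create_session_if_possible"]
        return actions
    if fetch_type == INCREMENTAL:
        # Checking that the ID and epoch are correct is left to the caller.
        return ["incremental_fetch_if_id_and_epoch_correct"]
    raise ValueError("combination not covered by the rows shown")
```

For example, `interpret_fetch_request(FULL, 42, 0)` yields the close, full fetch, and create steps in that order.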

...

Incremental fetch requests have FetchType set to INCREMENTAL and a positive fetch session ID.

A partition is only included in an incremental FetchRequest if:

...

  isolation_level => INT8

  fetch_type => INT8

  fetch_session_id => INT32

  fetch_session_epoch => INT32

...

When the top-level error code is set, the caller should assume that all the partitions in the fetch received the given error.

Fetch Type

The FetchResponse now contains an 8-bit fetch type.

 

Fetch Session ID

The FetchResponse now contains a 32-bit fetch session ID.

Fetch Session Epoch

The FetchResponse now contains a 32-bit fetch session epoch.  This is the epoch which the server expects to see in the next fetch request.

FetchResponse Metadata meaning

| Request FetchType | Request SessionId | Request SessionEpoch | Meaning |
|---|---|---|---|
| SESSIONLESS | ignored | ignored | This is a response to a SESSIONLESS fetch request. The broker also uses this FetchType when there was a fetch session error. The error will be described by the top-level response error field. |
| FULL | 0 | 0 | No fetch session was created. |
| INCREMENTAL | $ID | $EPOCH | The next request can be an incremental fetch request with the given $ID and $EPOCH. |

...

FetchResponse Metadata meaning

| Request SessionId | Meaning |
|---|---|
| 0 | No fetch session was created. |
| $ID | The next request can be an incremental fetch request with the given $ID. |
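On the client side, the session ID in each response determines what kind of request may follow. The sketch below is an assumption-laden illustration: the class and method names are hypothetical, and it treats any positive ID as an open session, following the statement that incremental fetch requests carry a positive fetch session ID.

```python
class FetcherSessionState:
    """Hypothetical client-side record of the current fetch session."""

    def __init__(self):
        self.session_id = 0  # 0 means no fetch session exists

    def on_response(self, response_session_id):
        """Remember the session ID carried by the latest FetchResponse."""
        self.session_id = response_session_id

    def may_send_incremental(self):
        """True if the next request can be an incremental fetch request."""
        return self.session_id > 0
```

A response with session ID 0 leaves the client sending full fetch requests; a response with a positive ID allows, but does not require, the next request to be incremental.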

Incremental Fetch Responses

...

  throttle_time_ms => INT32

  error_code => INT16

  error_string => STRING

  fetch_type => INT8

  fetch_session_id => INT32

  fetch_session_epoch => INT32

  topic => topic_name [partition]

...