...

Current State: "Discussion"

Discussion Thread: here, here

JIRA: KAFKA-8733

...

Follower replicas fetch data from the leader replica continuously to catch up with the leader. A follower replica is considered out of sync if it lags behind the leader replica for more than replica.lag.time.max.ms, which happens in either of the following cases (see the sketch after this list):

  • If a follower has not sent any fetch requests within the configured value of replica.lag.time.max.ms. This can happen when the follower is down, is stuck due to GC pauses, or is unable to communicate with the leader replica within that duration.
  • If a follower has not consumed up to the leader’s log-end-offset as of the earlier fetch request within replica.lag.time.max.ms. This can happen with new replicas that start fetching messages from the leader, or with replicas that are slow in processing the fetched messages.
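
As a rough illustration, the check the leader performs can be sketched as below. The class and field names are assumptions for readability, not the actual broker code:

```java
// Hypothetical sketch of the two out-of-sync conditions above; all names
// here are illustrative assumptions, not the broker implementation.
public class IsrCheck {

    static class ReplicaState {
        long logEndOffset;        // follower's LEO as known to the leader
        long lastCaughtUpTimeMs;  // last time this follower was fully caught up
    }

    // Both bullets collapse into one condition: lastCaughtUpTimeMs only
    // advances when the follower fetches and reaches the leader's LEO, so a
    // silent follower (case 1) and a slow follower (case 2) both trip it.
    static boolean isOutOfSync(ReplicaState follower, long leaderLogEndOffset,
                               long nowMs, long replicaLagTimeMaxMs) {
        boolean caughtUp = follower.logEndOffset >= leaderLogEndOffset;
        return !caughtUp && (nowMs - follower.lastCaughtUpTimeMs) > replicaLagTimeMaxMs;
    }
}
```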

But we found an issue where partitions go offline even though follower replicas try their best to fetch from the leader replica. Sometimes the leader replica may take a long time to process fetch requests, for example because of memory or disk issues while reading the respective segment data, or because of intermittent CPU issues. This makes follower replicas go out of sync even though they send fetch requests within the replica.lag.time.max.ms duration. We observed this behavior multiple times in our clusters.

...

Follower replicas send fetch requests to the leader replica to replicate the messages for topic partitions. They send fetch requests continuously and always try to catch up to the leader’s log-end-offset.

The replica.fetch.wait.max.ms property represents the maximum wait time for each fetch request issued by follower replicas. Its value should be less than replica.lag.time.max.ms so that a follower can send the next fetch request before it is considered out of sync; this avoids frequent shrinking of ISRs. It allows a follower replica to always try its best to avoid going out of sync by sending frequent fetch requests within replica.lag.time.max.ms.
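
For illustration only, the relationship between the two properties can be expressed as a simple check like the following; this helper is an assumption, not an existing Kafka API:

```java
// Assumed helper sketching the constraint described above.
public class FetchConfigValidator {
    static void validate(long replicaFetchWaitMaxMs, long replicaLagTimeMaxMs) {
        if (replicaFetchWaitMaxMs >= replicaLagTimeMaxMs) {
            throw new IllegalArgumentException(
                "replica.fetch.wait.max.ms (" + replicaFetchWaitMaxMs + ") should be less than "
                + "replica.lag.time.max.ms (" + replicaLagTimeMaxMs + ") so the follower can send "
                + "the next fetch request before being considered out of sync");
        }
    }
}
```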


Each broker runs an ISR update task ("isr-expiration") at regular intervals to update the metadata in ZK (directly, or via the controller if the IBP version is >= 2.7) about the in-sync replicas of all the leader partitions on this broker. Its interval is 0.5 * replica.lag.time.max.ms. This allows an in-sync follower replica to lag behind the leader replica by up to 1.5 * replica.lag.time.max.ms.
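
A minimal sketch of this scheduling, assuming an illustrative shrink task (the names are not the actual broker code), shows where the 1.5x bound comes from:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IsrExpirationScheduler {
    public static void main(String[] args) {
        long replicaLagTimeMaxMs = 10_000;
        long intervalMs = replicaLagTimeMaxMs / 2; // 0.5 * replica.lag.time.max.ms

        // A follower may be just under replica.lag.time.max.ms behind when one
        // check runs, and a full interval (0.5x) can pass before the next check,
        // so the worst-case lag before the ISR shrinks is 1.5x.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> System.out.println("isr-expiration: shrink ISRs for lagging followers"),
            intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }
}
```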

The leader replica maintains the state of each follower replica, which includes

...

This feature can be enabled by setting the property “follower.fetch.pending.reads.insync.enable” to true. The default value is false, for backward compatibility.
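
A hedged sketch of the in-sync check under this flag might look like the following; field names such as pendingFetchOffset and leoAtEarlierFetch are assumptions for illustration:

```java
// Illustrative sketch of the Solution 1 check; all names are assumptions,
// not the actual broker implementation.
public class PendingReadsInsyncCheck {

    static class FollowerState {
        long lastCaughtUpTimeMs;   // last time the follower was caught up
        boolean hasPendingFetch;   // a fetch request is queued or in flight
        long pendingFetchOffset;   // fetch offset of that pending request
        long leoAtEarlierFetch;    // leader's LEO at the earlier fetch request
    }

    static boolean isInSync(FollowerState f, long nowMs, long replicaLagTimeMaxMs,
                            boolean pendingReadsInsyncEnable) {
        if (nowMs - f.lastCaughtUpTimeMs <= replicaLagTimeMaxMs) {
            return true; // caught up recently enough: the existing behavior
        }
        // New behavior: a pending fetch whose offset is >= the LEO at the
        // earlier fetch request keeps the follower in sync while the leader
        // is slow to process it.
        return pendingReadsInsyncEnable
                && f.hasPendingFetch
                && f.pendingFetchOffset >= f.leoAtEarlierFetch;
    }
}
```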

This approach will reduce the occurrence of offline partitions. But the main disadvantage is that they can still happen when requests are queued up and the I/O threads handling existing fetch requests take longer. The subsequent requests may get stuck in the queue and may not be served before the ISR expiration task considers those followers out of sync, eventually causing offline partitions.

Solution 2

This is an extension of Solution 1, with the leader relinquishing the leadership if a fetch request takes longer than expected. It will also move its broker id to the end of the ISR sequence while sending the AlterISRRequest to ZooKeeper or the controller. That avoids the controller immediately choosing this broker as the leader again. This approach mitigates the case of requests piling up in the request queue as mentioned earlier. We can introduce a respective config for the timeout with a default value.
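
A minimal sketch of that step, assuming helpers such as alterIsr() and resign() (illustrative names, not existing broker APIs), could look like this:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of Solution 2; alterIsr() and resign() are assumed
// placeholders for the AlterISR path and the leadership hand-off.
public class LeadershipRelinquisher {
    private final int localBrokerId;
    private final long followerFetchProcessTimeMaxMs;

    LeadershipRelinquisher(int localBrokerId, long followerFetchProcessTimeMaxMs) {
        this.localBrokerId = localBrokerId;
        this.followerFetchProcessTimeMaxMs = followerFetchProcessTimeMaxMs;
    }

    void maybeRelinquish(long fetchProcessTimeMs, List<Integer> currentIsr) {
        if (fetchProcessTimeMs > followerFetchProcessTimeMaxMs) {
            List<Integer> newIsr = new ArrayList<>(currentIsr);
            newIsr.remove(Integer.valueOf(localBrokerId));
            newIsr.add(localBrokerId);   // move self to the end of the ISR
            alterIsr(newIsr);            // send AlterISRRequest to ZK/controller (assumed helper)
            resign();                    // relinquish partition leadership (assumed helper)
        }
    }

    private void alterIsr(List<Integer> isr) { /* stubbed for illustration */ }
    private void resign() { /* stubbed for illustration */ }
}
```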

The main disadvantage of this approach is that ISR thrashing may occur when there are intermittent issues across brokers.

Example : 

<Topic, partition>: <events, 0>

replica.lag.time.max.ms = 10_000

follower.fetch.process.time.max.ms = 500

Assigned replicas: 1001, 1002, 1003, isr: 1001,1002,1003, leader: 1001

...

While the fetch request is being processed, the in-sync check for the follower replica (id: 1002) on the leader replica (id: 1001) returns true, as there are pending fetch requests with fetch offset >= the LEO at the earlier fetch request.

Once the fetch request is served, lastCaughtupTime is set as ty+25. Subsequent in-sync checks would return true as long as the follower replica sends fetch requests at regular intervals according to replica.lag.time.max.ms and replica.fetch.wait.max.ms. The leader will also relinquish the leadership, and one of the in-sync replicas becomes the new leader.

Public Interfaces

The following properties are added to the broker configuration.

Configuration Name: follower.fetch.pending.reads.insync.enable
Description: This property allows a follower replica to remain in sync if it has pending fetch requests with fetch offset >= the log-end-offset of the earlier fetch request.
Valid Values: true or false
Default Value: false

Configuration Name: follower.fetch.process.time.max.ms
Description: This property represents the maximum time in milliseconds that a leader can take to process a follower fetch request. If processing takes longer than this configured time, the leader relinquishes its leadership. This property is applicable only if `follower.fetch.pending.reads.insync.enable` is set to true.
Valid Values: > 0
Default Value: 500
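
For illustration, enabling the proposed behavior on a broker could look like the following sketch; these properties exist only under this proposal:

```java
import java.util.Properties;

// Sketch: setting the proposed properties in a broker config map.
public class ProposedConfigExample {
    public static void main(String[] args) {
        Properties brokerProps = new Properties();
        brokerProps.put("follower.fetch.pending.reads.insync.enable", "true"); // opt in to Solution 1
        brokerProps.put("follower.fetch.process.time.max.ms", "500");          // Solution 2 threshold
        brokerProps.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```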

Compatibility, Deprecation, and Migration Plan

This change is backward compatible with previous versions.

Rejected Alternatives

Whenever the leader partition throws errors or is not able to process the requests within a specific time (at least replica.lag.time.max.ms), it should relinquish its leadership and allow another in-sync replica to become the leader. But this may cause ISR thrashing when there are intermittent issues in processing the follower fetch requests.