Status
Current state: Under Discussion
Discussion thread:
JIRA: KAFKA-1546
Motivation
Currently, replica lag configuration cannot be tuned automatically for high-volume and low-volume topics on the same cluster, because lag is computed as the difference in log end offset between the leader and its replicas, i.e. as a number of messages. The default is 4000 messages. For high-volume topics, producing even a single large batch can cause replicas to fall out of ISR, while for low-volume topics, detecting a lagging replica takes a very long time. We need a consistent way to measure replica lag in terms of time.
Public Interfaces
This proposal removes one config and changes the meaning of another.
replica.lag.max.messages - This config is deleted, since this proposal no longer measures replica lag in terms of number of messages.
replica.lag.time.max.ms - This config now means two things. If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead. In addition, if a replica has not read from the log end offset for this window of time, it is deemed to not be in ISR because it is not caught up.
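The two meanings of replica.lag.time.max.ms can be written as a single predicate. The following is a minimal sketch, not code from the Kafka code base: the object IsrCheck, its method names, and the parameter lastFetchTimeMs are hypothetical, and the convention that a negative lag begin time means "caught up" is borrowed from the proposal below.

```scala
// Hypothetical sketch; names are illustrative and not taken from the Kafka code base.
object IsrCheck {
  /** Dead follower: no fetch request has arrived within the window. */
  def isStuck(lastFetchTimeMs: Long, nowMs: Long, maxLagMs: Long): Boolean =
    nowMs - lastFetchTimeMs > maxLagMs

  /** Slow follower: it is fetching, but has been behind the log end offset
    * for longer than the window. A non-positive lagBeginTimeMs means "caught up". */
  def isSlow(lagBeginTimeMs: Long, nowMs: Long, maxLagMs: Long): Boolean =
    lagBeginTimeMs > 0 && nowMs - lagBeginTimeMs > maxLagMs

  /** Either condition is enough for the leader to drop the follower from ISR. */
  def shouldRemoveFromIsr(lastFetchTimeMs: Long, lagBeginTimeMs: Long,
                          nowMs: Long, maxLagMs: Long): Boolean =
    isStuck(lastFetchTimeMs, nowMs, maxLagMs) || isSlow(lagBeginTimeMs, nowMs, maxLagMs)
}
```

Both checks share the same window, so a single setting bounds how long a follower may be either silent or behind before it leaves ISR.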
Proposed Changes
The proposal is to calculate replica lag as the amount of time a replica has not been caught up to the leader. Any time a replica makes a fetch request, the broker will calculate whether the request read from the log end offset, i.e. the most recent data that the broker has. Each Replica object will maintain a lagBegin metric, which is a timestamp corresponding to when the replica first started falling behind. Here's how the code is structured:
// Replica.scala
// True when the fetch started at (or beyond) the leader's log end offset
val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0
if (!readToEndOfLog) {
  // Record the lag start time only if the replica was not already lagging
  lagBeginValue.compareAndSet(-1, time.milliseconds)
} else {
  // Caught up: clear the lag start time
  lagBeginValue.set(-1)
}

def lagBeginTimeMs = lagBeginValue.get()

// Partition.scala - This is how we calculate slow replicas
val slowReplicas = candidateReplicas.filter(r =>
  r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs)
lagBeginValue < 0 means that the replica is caught up and should be in ISR. A replica is removed from ISR if "current time - lag begin time > lag max ms". A replica is back in ISR once lagBeginValue is < 0 again, i.e. once one of its fetch requests reads from the log end offset.
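The state transitions described above can be exercised in isolation. The following is a self-contained sketch under simplifying assumptions: ReplicaLag is a stand-in for the Replica object rather than the actual Kafka class, offsets are plain Long values instead of LogOffsetMetadata, and the current time is passed in explicitly so the behaviour is deterministic.

```scala
import java.util.concurrent.atomic.AtomicLong

object LagSketch {
  // Simplified stand-in for the Replica object; not the actual Kafka class.
  final class ReplicaLag(val brokerId: Int) {
    // -1 means "caught up"; otherwise the time at which the replica began to lag
    private val lagBeginValue = new AtomicLong(-1L)

    /** Update the lag state for one fetch request from this follower. */
    def onFetch(fetchOffset: Long, leaderLogEndOffset: Long, nowMs: Long): Unit = {
      val readToEndOfLog = leaderLogEndOffset - fetchOffset <= 0
      if (!readToEndOfLog) lagBeginValue.compareAndSet(-1L, nowMs) // keep the earliest timestamp
      else lagBeginValue.set(-1L)                                  // caught up: reset
    }

    def lagBeginTimeMs: Long = lagBeginValue.get()
  }

  /** Replicas that have been lagging for longer than keepInSyncTimeMs. */
  def slowReplicas(candidates: Seq[ReplicaLag], nowMs: Long, keepInSyncTimeMs: Long): Seq[ReplicaLag] =
    candidates.filter(r => r.lagBeginTimeMs > 0 && (nowMs - r.lagBeginTimeMs) > keepInSyncTimeMs)
}
```

The compareAndSet call is what keeps the timestamp pinned to the first fetch that fell behind: later fetches that are still behind leave it unchanged, so the elapsed lag keeps growing until a fetch reaches the log end offset.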
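There is a subtle case where data is read up to the log end offset and an append occurs immediately afterwards, but before the lag calculation is complete. If the calculation used the log end offset as it stands after the append, a replica that had in fact read to the end of the log would appear to be lagging. To make this more robust, the LogReadResult should return the LogOffsetMetadata corresponding to the LEO just before the read started.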
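The effect of capturing the LEO before the read can be shown with a toy log. This is only a sketch: ToyLog and ReadResult are hypothetical stand-ins for the broker's log and LogReadResult, offsets are plain Long values, and the race is simulated by appending between the read and the lag check.

```scala
object LeoSnapshotSketch {
  // Hypothetical single-partition toy log; not the Kafka Log class.
  final class ToyLog {
    private var messages = Vector.empty[String]
    def logEndOffset: Long = synchronized { messages.size.toLong }
    def append(m: String): Unit = synchronized { messages = messages :+ m }
    def readFrom(offset: Long): Vector[String] = synchronized { messages.drop(offset.toInt) }
  }

  // Carries the LEO observed just before the read, alongside the data.
  final case class ReadResult(initialLogEndOffset: Long, fetchOffset: Long, data: Vector[String]) {
    def readToEndOfLog: Boolean = initialLogEndOffset - fetchOffset <= 0
  }

  def read(log: ToyLog, fetchOffset: Long): ReadResult = {
    val leoBeforeRead = log.logEndOffset // snapshot taken before the read starts
    ReadResult(leoBeforeRead, fetchOffset, log.readFrom(fetchOffset))
  }
}
```

A follower that fetches from offset 2 when the log holds two messages is judged caught up by readToEndOfLog, even if a third message is appended before the lag check runs. Comparing against the live logEndOffset at that point would instead report a lag of one message.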
Compatibility, Deprecation, and Migration Plan
This change is fully backward compatible. The only difference is that customers will no longer have to set the replica.lag.max.messages config.
Rejected Alternatives
Time-based replica lag estimation - In this approach, we calculate the replica lag as the estimated amount of time the replica will need to catch up to the leader. However, this requires us to associate a commit timestamp with each message on the leader. Such metadata does not exist and is out of scope of this proposal. We can also attempt to calculate the same estimate using the current per-partition message throughput. This approach is also flawed, because a replica of a low-volume topic can be thrown out of ISR by a burst of traffic even if the replica is not falling behind (see the JIRA for more details).
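The flaw in the throughput-based estimate can be seen with a small calculation. This is one possible reading of the rejected approach, not code from the JIRA: the function name, the formula (messages behind divided by recent throughput), and all numbers below are illustrative assumptions.

```scala
object ThroughputEstimateSketch {
  /** Estimated catch-up time in ms = messages behind / recent throughput (messages per second). */
  def estimatedCatchUpMs(messagesBehind: Long, messagesPerSec: Double): Double =
    messagesBehind / messagesPerSec * 1000.0
}
```

For a topic that has recently averaged 1 message per second, a burst of 5000 messages gives an estimate of 5,000,000 ms. That is far above a window of, say, 10,000 ms, even though the follower may well pick up the whole burst in its next fetch request.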