...

replica.lag.time.max.ms - This config now means two things. If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from the ISR (in-sync replicas) and treat it as dead. In addition, if a replica has not read up to the log end offset for this amount of time, it is deemed not to be in the ISR because it is not caught up.
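
To make the two cases concrete, here is a rough sketch (not the actual broker code; isOutOfSync, lastFetchTimeMs and lagBeginTimeMs are hypothetical names) of how a leader could evaluate both conditions against the same replica.lag.time.max.ms window:

Code Block
// Hypothetical sketch of the two cases covered by replica.lag.time.max.ms.
// lastFetchTimeMs and lagBeginTimeMs are assumed per-replica bookkeeping values.
def isOutOfSync(now: Long,
                lastFetchTimeMs: Long,      // last time the follower sent any fetch request
                lagBeginTimeMs: Long,       // -1 if caught up, else when it first fell behind
                replicaLagTimeMaxMs: Long): Boolean = {
  val stuck = now - lastFetchTimeMs > replicaLagTimeMaxMs                        // case 1: no fetch requests at all
  val slow  = lagBeginTimeMs > 0 && now - lagBeginTimeMs > replicaLagTimeMaxMs   // case 2: fetching, but not caught up
  stuck || slow
}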

Proposed Changes

The proposal is to calculate replica lag as the amount of time a replica has not been caught up to the leader. Any time a replica makes a fetch request, the broker will determine whether the request read up to the log end offset, i.e. the most recent data that the broker has. Each Replica object will maintain a lagBegin value, a timestamp recording when the replica first started falling behind. Here's how the code is structured:

Code Block
// Replica.scala
// Determine whether this fetch read up to the log end offset (LEO) captured just
// before the read. If it did not, record when the replica first fell behind.
val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0
if (!readToEndOfLog) {
  // Only record the lag start time if it is not already set (-1 means caught up)
  lagBeginValue.compareAndSet(-1, time.milliseconds)
} else {
  // Caught up to the LEO again: clear the lag start time
  lagBeginValue.set(-1)
}

def lagBeginTimeMs = lagBeginValue.get()  // -1 means the replica is caught up

// Partition.scala - This is how we calculate slow replicas
val slowReplicas = candidateReplicas.filter(r =>
  r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs)

A lagBeginValue less than 0 (the initial value of -1) means the replica is caught up and should be in the ISR. A replica is removed from the ISR when "current time - lag begin time > lag max ms", and it becomes eligible to rejoin the ISR once its lagBeginValue is reset to -1, i.e. a fetch has read up to the log end offset again.
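
As an illustration of these rules, here is a hypothetical timeline (assuming replica.lag.time.max.ms = 10000 and the lagBeginValue semantics above; the timestamps are made up):

Code Block
// Hypothetical timeline, assuming replica.lag.time.max.ms = 10000 ms
val maxLagMs = 10000L
var lagBeginMs = -1L                      // caught up, replica is in the ISR

lagBeginMs = 100000L                      // t=100000: a fetch does not reach the LEO, lag start recorded
                                          // t=105000: another short fetch; compareAndSet(-1, ...) is a no-op
assert(110500L - lagBeginMs > maxLagMs)   // t=110500: shrink check fires, replica removed from the ISR

lagBeginMs = -1L                          // t=112000: a fetch reads up to the LEO, lag start cleared
assert(lagBeginMs < 0)                    // replica is eligible to rejoin the ISR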

There is a subtle case where a fetch reads from the log end offset but an append completes immediately afterwards, before the lag calculation has finished; the follower would then appear to be behind even though it read all of the data available when the fetch started. To make this more robust, the LogReadResult should return the LogOffsetMetadata corresponding to the LEO just before the read started.
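
A minimal sketch of what that could look like, assuming the field names used in the snippet above (the real LogReadResult carries more fields than shown here):

Code Block
// Sketch only: initialLogEndOffset is the LEO snapshot taken just before the read,
// so an append that races with the fetch cannot make a follower that read everything
// available at fetch time look like it is lagging.
case class LogReadResult(info: FetchDataInfo,                     // the data actually read
                         initialLogEndOffset: LogOffsetMetadata,  // LEO captured before the read started
                         readSize: Int)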

Compatibility, Deprecation, and Migration Plan

...