...

The proposal is to calculate replica lag as the amount of time not caught up to the leader. A replica is deemed to be "caught up" if its last fetch request read up to the log end offset of the broker at that time and if it made a fetch request within the last n units of time. A replica is only in ISR if it is caught up.

 

Anytime a replica makes a fetch request, the broker will calculate whether the request read from the log end offset, i.e. the most recent data that the broker has. Each Replica object maintains a lagBegin metric, which is a timestamp corresponding to when the replica first started "lagging". Here's how the code is structured:

Code Block
// Replica.scala
// True if the fetch offset had already reached the leader's log end offset.
val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0
if (!readToEndOfLog) {
  // Record when the lag began; a no-op if the replica is already marked as lagging.
  lagBeginValue.compareAndSet(-1, time.milliseconds)
} else {
  // The replica read to the end of the log: clear the lag marker.
  lagBeginValue.set(-1)
}

def lagBeginTimeMs = lagBeginValue.get()

// Partition.scala - This is how we calculate slow replicas
val slowReplicas = candidateReplicas.filter(r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs))

lagBeginValue < 0 means that the replica is not lagging and should be in ISR. A replica is removed from ISR if "current time - lag begin time > lag max ms". A replica is back in ISR once it is no longer lagging. There is a subtle case where data is read from the log end offset and an append occurs immediately afterwards but before the lag calculation is complete, which would make a caught-up replica look as if it were lagging. To make this more robust, the LogReadResult should return the LogOffsetMetadata corresponding to the LEO just before the read started.
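The lag bookkeeping and the append race described above can be sketched in isolation. This is a minimal model, not the actual Replica or Partition code: the LagTracker class, its method names, and the explicit nowMs and leoBeforeRead parameters are assumptions made so that the example runs without a broker.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the per-replica lag state; not the real Replica class.
final class LagTracker {
  // -1 means "not lagging"; any other value is the time (ms) at which lag began.
  private val lagBeginValue = new AtomicLong(-1L)

  // leoBeforeRead: leader's log end offset captured just before the read started.
  // fetchOffset:   offset the follower asked to read from.
  def onFetch(leoBeforeRead: Long, fetchOffset: Long, nowMs: Long): Unit = {
    val readToEndOfLog = leoBeforeRead - fetchOffset <= 0
    if (!readToEndOfLog) lagBeginValue.compareAndSet(-1L, nowMs) // keeps the earliest lag start
    else lagBeginValue.set(-1L)
  }

  def lagBeginTimeMs: Long = lagBeginValue.get()

  // Mirrors the slow-replica filter: lagging for longer than maxLagMs.
  def isSlow(nowMs: Long, maxLagMs: Long): Boolean = {
    val begin = lagBeginTimeMs
    begin > 0 && (nowMs - begin) > maxLagMs
  }
}

object LagTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new LagTracker
    // The follower fetches from offset 100 while the LEO is 100. An append then moves
    // the LEO to 105 before the lag calculation runs. Using the LEO captured before
    // the read (100) keeps the replica marked as caught up.
    tracker.onFetch(leoBeforeRead = 100L, fetchOffset = 100L, nowMs = 1000L)
    println(tracker.lagBeginTimeMs)
    // Sampling the LEO after the append (105) would instead start the lag clock.
    tracker.onFetch(leoBeforeRead = 105L, fetchOffset = 100L, nowMs = 2000L)
    println(tracker.lagBeginTimeMs)
  }
}
```

The compareAndSet call is what preserves the first moment of lag: later fetches that are still behind do not move the timestamp, so the elapsed lag keeps growing until a fetch reads to the end of the log.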

Expected Scenarios

1. Lagging follower - A follower is removed from ISR only if it cannot read from the log end offset for replica.lag.max.ms, i.e. it is genuinely not able to keep up. Previously it would fall out of ISR even if a single large batch was produced.

2. Stuck follower - As before, if a follower does not make a fetch request for a period of time, it shall be removed from ISR.

3. Bootstrapping follower - If a follower is down for an extended period of time, it will stay outside of the ISR until it is caught up.
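The three scenarios can be compared side by side with a small model of the previous message-count check and the proposed time-based check. This is a sketch under stated assumptions: FollowerView, IsrCriteria, and every field and threshold name below are hypothetical and exist only for illustration.

```scala
// Hypothetical snapshot of a follower as seen by the leader; field names are illustrative.
final case class FollowerView(
  messagesBehind: Long, // leader log end offset minus follower log end offset
  lagBeginMs: Long,     // -1 if the last fetch read to the log end offset
  lastFetchMs: Long     // time of the most recent fetch request
)

object IsrCriteria {
  // Previous criterion: out of ISR as soon as the follower trails by more than maxMessages.
  def outOfSyncByMessages(f: FollowerView, maxMessages: Long): Boolean =
    f.messagesBehind > maxMessages

  // Proposed criterion: out of ISR only after lagging, or not fetching, for longer than maxLagMs.
  def outOfSyncByTime(f: FollowerView, nowMs: Long, maxLagMs: Long): Boolean = {
    val lagging = f.lagBeginMs > 0 && (nowMs - f.lagBeginMs) > maxLagMs
    val stuck = (nowMs - f.lastFetchMs) > maxLagMs
    lagging || stuck
  }
}

object ScenarioDemo {
  def main(args: Array[String]): Unit = {
    val now = 100000L
    val maxLagMs = 10000L
    val maxMessages = 4000L

    // 1. A single large batch: 5000 messages behind, but lagging for only 500 ms.
    val burst = FollowerView(messagesBehind = 5000L, lagBeginMs = 99500L, lastFetchMs = 99900L)
    println(IsrCriteria.outOfSyncByMessages(burst, maxMessages)) // true
    println(IsrCriteria.outOfSyncByTime(burst, now, maxLagMs))   // false

    // 2. Stuck follower: no fetch request for 20 seconds.
    val stuck = FollowerView(messagesBehind = 0L, lagBeginMs = -1L, lastFetchMs = 80000L)
    println(IsrCriteria.outOfSyncByTime(stuck, now, maxLagMs))   // true

    // 3. Bootstrapping follower: fetching actively but behind for 50 seconds.
    val bootstrapping = FollowerView(messagesBehind = 1000000L, lagBeginMs = 50000L, lastFetchMs = 99900L)
    println(IsrCriteria.outOfSyncByTime(bootstrapping, now, maxLagMs)) // true
  }
}
```

In the first scenario only the message-count check ejects the follower, which is the behaviour the proposal is meant to remove.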

I plan to test all these scenarios using the PerformanceTest class.

Compatibility, Deprecation, and Migration Plan

This change is fully backward compatible. The only difference is that customers will no longer have to set the replica.lag.max.messages config.

...