Status

Current state: Under Discussion

Discussion thread:

JIRA: KAFKA-1546

Motivation

Currently, replica lag configuration cannot be tuned automatically for high-volume and low-volume topics on the same cluster, because lag is computed as the difference in log end offset between the leader and its replicas, i.e. as a number of messages. The default is 4000 messages. For high-volume topics, producing even a single large batch can cause replicas to fall out of the ISR (in-sync replicas), while for low-volume topics detecting a lagging replica can take a very long time. We need a consistent way to measure replica lag in terms of time.

Public Interfaces

This proposal removes one config and changes the meaning of another.

replica.lag.max.messages - This config is deleted, since this proposal no longer measures replica lag as a number of messages.

replica.lag.time.max.ms - The definition of this config changes. If a follower has not sent any fetch requests within this window of time, the leader removes the follower from the ISR (in-sync replicas) and treats it as dead. In addition, if a replica has not read up to the log end offset within this window, it is deemed not caught up and is removed from the ISR.

Proposed Changes

The proposal is to calculate replica lag as the amount of time the replica has not been caught up to the leader. Any time a replica makes a fetch request, the broker checks whether the request read from the log end offset, i.e. the most recent data that the broker has. Each Replica object maintains a lagBegin metric, a timestamp recording when the replica first started falling behind. Here's how the code is structured:

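// Replica.scala
// True when the fetch started at (or beyond) the leader's log end offset, i.e. the replica is caught up.
val readToEndOfLog = logReadResult.initialLogEndOffset.messageOffset - logReadResult.info.fetchOffset.messageOffset <= 0
if (!readToEndOfLog) {
  // Record when the replica first fell behind; a no-op if it is already marked as lagging.
  lagBeginValue.compareAndSet(-1, time.milliseconds)
} else {
  // The replica is caught up, so clear the marker.
  lagBeginValue.set(-1)
}

def lagBeginTimeMs = lagBeginValue.get()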

// Partition.scala - This is how we calculate slow replicas
val slowReplicas = candidateReplicas.filter(r => (r.lagBeginTimeMs > 0 && (time.milliseconds - r.lagBeginTimeMs) > keepInSyncTimeMs))

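lagBeginValue < 0 means that the replica is caught up and should be in the ISR. A replica is removed from the ISR when "current time - lag begin time > replica.lag.time.max.ms". A replica is eligible to rejoin the ISR once lagBeginValue is reset to a negative value, i.e. once a fetch has read up to the log end offset.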
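The rules above can be sketched as a small self-contained class. This is an illustrative model only, not the actual Replica implementation: ReplicaLagTracker, onFetch, isSlow and their parameters are hypothetical names, and the clock is passed in explicitly so the behaviour is easy to follow.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the lag-tracking state of a Replica (assumption, not Kafka code).
final class ReplicaLagTracker {
  // -1 means "caught up"; any other value is the time (ms) at which the replica began lagging.
  private val lagBeginValue = new AtomicLong(-1L)

  // Called for every fetch. leaderLogEndOffset is the leader's LEO when the read started.
  def onFetch(fetchOffset: Long, leaderLogEndOffset: Long, nowMs: Long): Unit = {
    val readToEndOfLog = leaderLogEndOffset - fetchOffset <= 0
    if (!readToEndOfLog) {
      // Only the first lagging fetch sets the timestamp; later ones leave it unchanged.
      lagBeginValue.compareAndSet(-1L, nowMs)
    } else {
      lagBeginValue.set(-1L)
    }
  }

  def lagBeginTimeMs: Long = lagBeginValue.get()

  // Mirrors the slowReplicas filter: lagging, and for longer than the allowed window.
  def isSlow(nowMs: Long, maxLagMs: Long): Boolean = {
    val begin = lagBeginTimeMs
    begin > 0 && (nowMs - begin) > maxLagMs
  }
}
```

With a 10000 ms window, a replica that first falls behind at t=1000 is still considered in sync at t=5000, is reported as slow at t=12000, and is cleared again by the first fetch that starts at the leader's log end offset.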

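There is a subtle case where data is read from the log end offset and an append occurs immediately afterwards, before the lag calculation is complete. If the lag calculation used the LEO (log end offset) after the append, the replica would appear to be lagging even though its read reached the end of the log. To make this more robust, the LogReadResult should return the LogOffsetMetadata corresponding to the LEO just before the read started.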
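A minimal sketch of that ordering, using a toy in-memory log rather than Kafka's Log and LogReadResult classes. ToyLog, ToyReadResult and ToyReader are hypothetical names introduced only for illustration; the point is that the log end offset is captured before the read, so a later append cannot change the caught-up decision for that read.

```scala
// Hypothetical in-memory log (assumption, not Kafka code); offsets are positions in a Vector.
final class ToyLog {
  private var messages = Vector.empty[String]
  def logEndOffset: Long = synchronized { messages.size.toLong }
  def append(m: String): Unit = synchronized { messages = messages :+ m }
  def read(fromOffset: Long): Vector[String] = synchronized { messages.drop(fromOffset.toInt) }
}

// Carries the LEO snapshot taken before the read, alongside the data that was read.
final case class ToyReadResult(fetchOffset: Long, initialLogEndOffset: Long, records: Vector[String]) {
  def readToEndOfLog: Boolean = initialLogEndOffset - fetchOffset <= 0
}

object ToyReader {
  def read(log: ToyLog, fetchOffset: Long): ToyReadResult = {
    val leoBeforeRead = log.logEndOffset // snapshot first, then read
    val records = log.read(fetchOffset)
    ToyReadResult(fetchOffset, leoBeforeRead, records)
  }
}
```

In this sketch, a follower that fetches from offset 2 on a two-message log is treated as caught up even if a third message is appended right after the read; its next fetch from offset 2 is then correctly classified as lagging.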

Compatibility, Deprecation, and Migration Plan

This change is fully backward compatible. The only difference is that users no longer need to set the replica.lag.max.messages config.

Rejected Alternatives

Time-based replica lag detection - In this approach, we calculate the replica lag as the estimated amount of time the replica will need to catch up to the leader. However, this requires us to associate a commit timestamp with each message on the leader. Such metadata does not exist and is out of scope for this proposal. We could also attempt to estimate the same value from the current per-partition message throughput. This approach is also flawed, because a replica for a low-volume topic can be thrown out of the ISR by a burst of traffic even if it is not falling behind (see the JIRA for more details).


