...

  • org/apache/kafka/connect/mirror/MirrorMetrics - new metric name templates for the value, max, min and avg of replication-offset-lag

    MirrorMetrics.java

     replicationOffsetLag = new MetricNameTemplate(
         "replication-offset-lag", SOURCE_CONNECTOR_GROUP,
         "Count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagMax = new MetricNameTemplate(
         "replication-offset-lag-max", SOURCE_CONNECTOR_GROUP,
         "Max count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagMin = new MetricNameTemplate(
         "replication-offset-lag-min", SOURCE_CONNECTOR_GROUP,
         "Min count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagAvg = new MetricNameTemplate(
         "replication-offset-lag-avg", SOURCE_CONNECTOR_GROUP,
         "Average count of records to be replicated from source to target cluster.", partitionTags);


  • org/apache/kafka/connect/mirror/MirrorSourceTaskConfig - a configuration to control the poll interval for the Consumer.endOffsets() call used to acquire the LEO, as described below.
    Proposed name: replication.offset.lag.metric.refresh.interval
    Proposed default: 15 seconds
  • org/apache/kafka/connect/mirror/MirrorSourceTaskConfig - a configuration to control the time-to-live of a stale partition LRO, so that the in-memory LRO "cache" does not cause memory overhead.
    Proposed name: replication.offset.lag.metric.last-replicated-offset.ttl
    Proposed default: 20 seconds

Proposed Changes

This proposal aims to enhance Kafka's monitoring capabilities by introducing a new metric that tracks the replication offset lag for a given topic-partition. The metric will be calculated as the difference between the partition's LEO (log end offset), which will be refreshed periodically by calling Consumer.endOffsets(), and the partition's LRO (last replicated offset), which will be stored in an in-memory "cache" and updated in the task's producer callback.
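To make the calculation concrete, here is a minimal, dependency-free sketch of the LEO minus LRO bookkeeping. The class name, the method names and the string partition keys are all hypothetical and are not part of MirrorMaker. Clamping the result at zero is also an assumption, made because the LEO is refreshed only periodically and can briefly trail the LRO.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not MirrorMaker code): tracks LEO and LRO per partition. */
final class ReplicationLagSketch {
    // Keyed by "topic-partition" strings to stay dependency-free;
    // real code would more likely key on Kafka's TopicPartition.
    private final Map<String, Long> logEndOffsets = new ConcurrentHashMap<>();
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();

    /** Would be called by the periodic job with a value from Consumer.endOffsets(). */
    void updateLogEndOffset(String partition, long leo) {
        logEndOffsets.put(partition, leo);
    }

    /** Would be called from the producer callback once a record is acknowledged. */
    void updateLastReplicatedOffset(String partition, long lro) {
        lastReplicatedOffsets.put(partition, lro);
    }

    /** Returns LEO - LRO, or empty if either side is not known yet. */
    OptionalLong lag(String partition) {
        Long leo = logEndOffsets.get(partition);
        Long lro = lastReplicatedOffsets.get(partition);
        if (leo == null || lro == null) {
            return OptionalLong.empty();
        }
        // Assumption: clamp at zero because a stale LEO can trail a fresher LRO.
        return OptionalLong.of(Math.max(0L, leo - lro));
    }

    public static void main(String[] args) {
        ReplicationLagSketch sketch = new ReplicationLagSketch();
        sketch.updateLogEndOffset("orders-0", 100L);
        sketch.updateLastReplicatedOffset("orders-0", 90L);
        System.out.println(sketch.lag("orders-0").getAsLong());
    }
}
```

Note that Consumer.endOffsets() returns the offset of the upcoming record (last available offset + 1). Whether the plain difference needs an off-by-one adjustment therefore depends on exactly how the LRO is defined, which this section does not specify.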

The in-memory "cache" entries will be governed by a TTL-based eviction policy, so that stale LRO entries do not accumulate and eventually cause an OOM error.
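One way such a TTL-based policy could look is sketched below. This is only an illustration under assumptions: the class and method names are hypothetical, timestamps are passed in explicitly to keep the example deterministic, and an entry is treated as stale once its age reaches the TTL. The proposal does not say whether eviction happens lazily, on a schedule, or both, so the sketch shows both options.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical LRO cache with TTL-based eviction (not MirrorMaker code). */
final class LroTtlCacheSketch {
    private static final class Entry {
        final long offset;
        final long updatedAtMs;

        Entry(long offset, long updatedAtMs) {
            this.offset = offset;
            this.updatedAtMs = updatedAtMs;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final long ttlMs;

    LroTtlCacheSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Records the latest replicated offset and refreshes the entry's timestamp. */
    void put(String partition, long offset, long nowMs) {
        entries.put(partition, new Entry(offset, nowMs));
    }

    /** Returns the cached LRO, or null if it is missing or older than the TTL. */
    Long get(String partition, long nowMs) {
        Entry e = entries.get(partition);
        if (e == null || isExpired(e, nowMs)) {
            return null;
        }
        return e.offset;
    }

    /** Drops every expired entry; could be run from a periodic housekeeping job. */
    void evictExpired(long nowMs) {
        entries.values().removeIf(e -> isExpired(e, nowMs));
    }

    int size() {
        return entries.size();
    }

    // Assumption: an entry counts as stale once its age reaches the TTL.
    private boolean isExpired(Entry e, long nowMs) {
        return nowMs - e.updatedAtMs >= ttlMs;
    }

    public static void main(String[] args) {
        LroTtlCacheSketch cache = new LroTtlCacheSketch(20_000L);
        cache.put("orders-0", 90L, 0L);
        System.out.println(cache.get("orders-0", 10_000L));
        cache.evictExpired(25_000L);
        System.out.println(cache.size());
    }
}
```

With the proposed 20 second default, a partition that stops receiving producer callbacks would drop out of the cache after roughly 20 seconds in this sketch, which bounds memory use by the number of recently active partitions.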

The replication.offset.lag.metric.refresh.interval configuration will also accept a value of -1 to indicate that the user wants to disable reporting for this metric. When this value is read, the periodic poll job for a partition's LEO will not be submitted, so metric reporting is skipped.
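The disable switch could be handled at the point where the poll job is submitted, roughly as in the sketch below. The class, method and constant names are hypothetical. The sketch also assumes the interval has already been parsed into milliseconds (the proposal states the default only as "15 seconds") and that any non-positive value other than -1 is rejected.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical scheduling helper (not MirrorMaker code). */
final class LagRefreshSchedulerSketch {
    /** Sentinel from the proposal: -1 disables the metric. */
    static final long DISABLED = -1L;

    /**
     * Submits the periodic LEO refresh job unless the interval is -1.
     * Returns an empty Optional when reporting is disabled.
     */
    static Optional<ScheduledFuture<?>> maybeSchedule(
            ScheduledExecutorService executor, long intervalMs, Runnable refreshLeo) {
        if (intervalMs == DISABLED) {
            // No job is submitted, so the LEO is never refreshed and the metric is skipped.
            return Optional.empty();
        }
        if (intervalMs <= 0) {
            // Assumption: other non-positive values are treated as configuration errors.
            throw new IllegalArgumentException("interval must be positive or -1, got " + intervalMs);
        }
        ScheduledFuture<?> future =
                executor.scheduleAtFixedRate(refreshLeo, 0L, intervalMs, TimeUnit.MILLISECONDS);
        return Optional.of(future);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            System.out.println(maybeSchedule(executor, DISABLED, () -> { }).isPresent());
            System.out.println(maybeSchedule(executor, 15_000L, () -> { }).isPresent());
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a real task the Runnable would wrap the Consumer.endOffsets() call, and the returned future would be cancelled when the task stops.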

...