...

  • org/apache/kafka/connect/mirror/MirrorMetrics - new metric templates for the value, max, min and avg of replication-offset-lag

    MirrorMetrics.java

     // New templates in the source connector group, tagged per partition.
     replicationOffsetLag = new MetricNameTemplate(
         "replication-offset-lag", SOURCE_CONNECTOR_GROUP,
         "Count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagMax = new MetricNameTemplate(
         "replication-offset-lag-max", SOURCE_CONNECTOR_GROUP,
         "Max count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagMin = new MetricNameTemplate(
         "replication-offset-lag-min", SOURCE_CONNECTOR_GROUP,
         "Min count of records to be replicated from source to target cluster.", partitionTags);

     replicationOffsetLagAvg = new MetricNameTemplate(
         "replication-offset-lag-avg", SOURCE_CONNECTOR_GROUP,
         "Average count of records to be replicated from source to target cluster.", partitionTags);


  • (Optional) MirrorConnectorConfig - a configuration to control the interval between the Consumer.endOffsets() calls used for LEO acquisition, described under Proposed Changes.
    

Proposed Changes

This proposal aims to enhance Kafka's monitoring capabilities by introducing a new metric that tracks the replication offset lag for a given topic-partition. The lag is the difference between two values: the partition's log end offset (LEO), which is refreshed periodically by calling Consumer.endOffsets(), and the partition's last replicated offset (LRO), which is held in an in-memory "cache" and updated in the task's producer callback.
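As a rough illustration of the calculation, the sketch below keeps both values in memory and derives the lag on demand. The class `ReplicationLagTracker`, its method names, and the use of plain strings as partition keys are all hypothetical and are not taken from the MirrorMaker code base. A real implementation would most likely key on `TopicPartition` and live inside the source task.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper (not part of MirrorMaker): tracks LEO and LRO per partition. */
final class ReplicationLagTracker {
    // Keyed by "topic-partition" strings for brevity; an assumption made for this sketch.
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();
    private final Map<String, Long> logEndOffsets = new ConcurrentHashMap<>();

    /** Intended to be called from the producer callback after a record is acknowledged. */
    void recordReplicated(String partition, long sourceOffset) {
        // Keep the highest offset seen so a late callback cannot move the LRO backwards.
        lastReplicatedOffsets.merge(partition, sourceOffset, Math::max);
    }

    /** Intended to be called with the result of a periodic end-offsets lookup. */
    void updateEndOffsets(Map<String, Long> endOffsets) {
        logEndOffsets.putAll(endOffsets);
    }

    /** Returns LEO - LRO, or empty if either value is not known yet. */
    OptionalLong lag(String partition) {
        Long leo = logEndOffsets.get(partition);
        Long lro = lastReplicatedOffsets.get(partition);
        if (leo == null || lro == null) {
            return OptionalLong.empty();
        }
        // Clamp at zero: a stale cached LEO can briefly trail a freshly updated LRO.
        return OptionalLong.of(Math.max(0L, leo - lro));
    }
}
```

Because the LEO is only refreshed periodically, the sketch clamps the result at zero instead of reporting a negative lag. Whether the cached LRO is the record's own offset or the next offset to replicate shifts the result by one; the sketch simply stores whatever the caller passes in.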

The proposed changes involve the following steps:

  1. LRO Tracking Mechanism:

    • Introduce a new in-memory "cache" to store the LRO for each topic-partition.
    • Update the producer callback logic to keep this LRO cache up-to-date.
  2. LEO Acquisition during Poll Loop: 

    • Introduce a periodic poll mechanism that keeps a fresh set of end offsets for all partitions.
      • The alternative, calling Consumer.endOffsets() on every poll, would be inefficient and expensive.
  3. Calculate the replication offset lag as LEO - LRO.
  4. Expose Replication Offset Lag Metric:

    • Introduce a new MirrorMaker metric, named "replication-offset-lag" (with -max, -min and -avg variants), which reports the calculated replication offset lag.
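
Step 2 above can be sketched as a time-gated refresh that is invoked on every poll iteration but only performs the expensive lookup once the configured interval has elapsed. The class `PeriodicEndOffsetRefresher`, its constructor parameters, and the string partition keys are hypothetical. In a real task the supplier would presumably wrap a Consumer.endOffsets() call, and the interval would come from the optional MirrorConnectorConfig setting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: refreshes cached end offsets at most once per interval. */
final class PeriodicEndOffsetRefresher {
    private final long refreshIntervalMs;
    // Stand-in for the end-offsets lookup; an assumption made for this sketch.
    private final Supplier<Map<String, Long>> endOffsetsLookup;
    private Map<String, Long> cachedEndOffsets = new HashMap<>();
    private long lastRefreshMs;
    private boolean refreshedOnce = false;

    PeriodicEndOffsetRefresher(long refreshIntervalMs,
                               Supplier<Map<String, Long>> endOffsetsLookup) {
        this.refreshIntervalMs = refreshIntervalMs;
        this.endOffsetsLookup = endOffsetsLookup;
    }

    /** Call on every poll iteration; the lookup runs only when the interval has elapsed. */
    Map<String, Long> maybeRefresh(long nowMs) {
        if (!refreshedOnce || nowMs - lastRefreshMs >= refreshIntervalMs) {
            // Copy the result so later changes by the caller cannot alter the cache.
            cachedEndOffsets = new HashMap<>(endOffsetsLookup.get());
            lastRefreshMs = nowMs;
            refreshedOnce = true;
        }
        return cachedEndOffsets;
    }
}
```

Passing the current time in as a parameter keeps the sketch easy to test without a real clock. A longer interval reduces the number of lookups at the cost of a staler LEO, and therefore a less precise lag value.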

...