...
- org/apache/kafka/connect/mirror/MirrorMetrics - new metric templates for max, min, avg and value of replication-record-lag
MirrorMetrics.java

    replicationRecordLag = new MetricNameTemplate(
            "replication-record-lag", SOURCE_CONNECTOR_GROUP,
            "Count of records to be replicated from source to target cluster.", partitionTags);
    replicationRecordLagMax = new MetricNameTemplate(
            "replication-record-lag-max", SOURCE_CONNECTOR_GROUP,
            "Max count of records to be replicated from source to target cluster.", partitionTags);
    replicationRecordLagMin = new MetricNameTemplate(
            "replication-record-lag-min", SOURCE_CONNECTOR_GROUP,
            "Min count of records to be replicated from source to target cluster.", partitionTags);
    replicationRecordLagAvg = new MetricNameTemplate(
            "replication-record-lag-avg", SOURCE_CONNECTOR_GROUP,
            "Average count of records to be replicated from source to target cluster.", partitionTags);
- org/apache/kafka/connect/mirror/MirrorSourceTaskConfig - a configuration that controls how often Consumer.endOffsets() is called to refresh the LEO of each partition, as described below.
- Proposed name: replication.record.lag.metric.refresh.interval
- Proposed default: 15 seconds
- org/apache/kafka/connect/mirror/MirrorSourceTaskConfig - a configuration that sets the time-to-live of a stale partition's LRO, to keep the in-memory LRO "cache" from causing memory overhead.
- Proposed name: replication.record.lag.metric.last.replicated.offset.ttl
- Proposed default: 20 seconds
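To illustrate how a time-to-live could bound the in-memory LRO "cache", here is a minimal sketch. The class LroCache, its method names, and the use of plain String partition keys are all hypothetical and chosen only to keep the example self-contained; this is not MirrorMaker code, and the proposal does not prescribe this structure. Only the 20-second default is taken from the list above.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (an assumption for illustration, not part of MirrorMaker):
// an in-memory cache of last replicated offsets (LRO) with time-based eviction.
final class LroCache {
    // Proposed default for replication.record.lag.metric.last.replicated.offset.ttl.
    static final Duration DEFAULT_TTL = Duration.ofSeconds(20);

    private static final class Entry {
        final long offset;
        final long updatedAtMs;

        Entry(long offset, long updatedAtMs) {
            this.offset = offset;
            this.updatedAtMs = updatedAtMs;
        }
    }

    private final long ttlMs;
    // Concurrent map, because a producer callback may run on a different thread
    // than the code that reads or evicts entries.
    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    LroCache(Duration ttl) {
        this.ttlMs = ttl.toMillis();
    }

    // Record the LRO for a partition key such as "topic-0" at time nowMs.
    void update(String partition, long offset, long nowMs) {
        entries.put(partition, new Entry(offset, nowMs));
    }

    // Return the cached LRO, or null if the partition has no entry.
    Long get(String partition) {
        Entry e = entries.get(partition);
        return e == null ? null : e.offset;
    }

    // Remove entries that were not updated within the TTL; returns how many were removed.
    int evictStale(long nowMs) {
        int before = entries.size();
        entries.values().removeIf(e -> nowMs - e.updatedAtMs > ttlMs);
        return before - entries.size();
    }
}
```

Passing the current time in as a parameter, rather than reading the system clock inside the class, keeps the eviction rule easy to test.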
Proposed Changes
This proposal aims to enhance Kafka's monitoring capabilities by introducing a new metric that tracks the replication record lag for a given topic-partition. The metric is calculated as the difference between the partition's log end offset (LEO), which is refreshed periodically by calling Consumer.endOffsets(), and its last replicated offset (LRO), which is stored in an in-memory "cache" and updated in the task's producer callback.
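The calculation described above can be sketched as follows. This is an illustration under stated assumptions rather than the proposed implementation: ReplicationLagSketch and recordLag are hypothetical names, partitions are keyed by plain strings instead of TopicPartition so the example has no Kafka dependency, and clamping the result at zero and skipping partitions with a missing offset are choices made here, not details given in the proposal.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch (names are assumptions, not MirrorMaker code).
final class ReplicationLagSketch {

    // Lag for one partition as the difference LEO - LRO.
    // endOffsets stands in for the result of Consumer.endOffsets();
    // lastReplicatedOffsets stands in for the in-memory LRO "cache".
    static OptionalLong recordLag(String partition,
                                  Map<String, Long> endOffsets,
                                  Map<String, Long> lastReplicatedOffsets) {
        Long leo = endOffsets.get(partition);
        Long lro = lastReplicatedOffsets.get(partition);
        if (leo == null || lro == null) {
            // Assumption: without both offsets there is nothing meaningful to report.
            return OptionalLong.empty();
        }
        // Assumption: clamp at zero, since the cached LEO may be older than the LRO
        // between two refreshes.
        return OptionalLong.of(Math.max(0L, leo - lro));
    }
}
```

For example, with an LEO of 100 and an LRO of 90 for the same partition, this sketch reports a lag of 10 records.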
...