...
- org/apache/kafka/connect/mirror/MirrorMetrics - new metric name templates for the value, max, min and avg of replication-offset-lag
```java
// MirrorMetrics.java
replicationOffsetLag = new MetricNameTemplate(
        "replication-offset-lag", SOURCE_CONNECTOR_GROUP,
        "Count of records to be replicated from source to target cluster.", partitionTags);
replicationOffsetLagMax = new MetricNameTemplate(
        "replication-offset-lag-max", SOURCE_CONNECTOR_GROUP,
        "Max count of records to be replicated from source to target cluster.", partitionTags);
replicationOffsetLagMin = new MetricNameTemplate(
        "replication-offset-lag-min", SOURCE_CONNECTOR_GROUP,
        "Min count of records to be replicated from source to target cluster.", partitionTags);
replicationOffsetLagAvg = new MetricNameTemplate(
        "replication-offset-lag-avg", SOURCE_CONNECTOR_GROUP,
        "Average count of records to be replicated from source to target cluster.", partitionTags);
```
- (Optional) MirrorConnectorConfig: a configuration property controlling the interval at which Consumer.endOffsets() is called during the LEO acquisition described below.
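To make the four templates more concrete, here is a minimal, JDK-only sketch of what they would report for one topic-partition: the latest lag plus its max, min and average. It is a simplification under stated assumptions: the class LagStats is hypothetical, the plain metric is assumed to report the most recently recorded sample, and every sample is kept, whereas Kafka's own Max, Min and Avg stats are computed over sampled time windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified view of the four replication-offset-lag metrics for a
// single topic-partition. Kafka's windowed Max/Min/Avg stats would age samples out;
// this sketch keeps every sample, for clarity.
class LagStats {
    private final List<Long> samples = new ArrayList<>();

    // Record one lag observation (LEO - LRO) for the partition.
    void record(long lag) {
        samples.add(lag);
    }

    // "replication-offset-lag": assumed here to be the most recent observation.
    long value() {
        if (samples.isEmpty()) {
            throw new IllegalStateException("no lag recorded yet");
        }
        return samples.get(samples.size() - 1);
    }

    // "replication-offset-lag-max"
    long max() {
        return samples.stream().mapToLong(Long::longValue).max().orElseThrow();
    }

    // "replication-offset-lag-min"
    long min() {
        return samples.stream().mapToLong(Long::longValue).min().orElseThrow();
    }

    // "replication-offset-lag-avg"
    double avg() {
        return samples.stream().mapToLong(Long::longValue).average().orElseThrow();
    }

    public static void main(String[] args) {
        LagStats stats = new LagStats();
        stats.record(120);
        stats.record(40);
        stats.record(80);
        // prints "80 120 40 80.0"
        System.out.println(stats.value() + " " + stats.max() + " " + stats.min() + " " + stats.avg());
    }
}
```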
Proposed Changes
This proposal aims to enhance Kafka's monitoring capabilities by introducing a new metric that tracks the replication offset lag of a given topic-partition. The metric will be calculated as the difference between the LEO (log end offset) of the partition, which is periodically refreshed by calling Consumer.endOffsets(), and the LRO (last replicated offset) of the partition, which is stored in an in-memory "cache" and updated in the task's producer callback.
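A minimal, JDK-only sketch of the LRO cache and the lag formula follows. The class LroCache, its methods and the string "topic-partition" keys are hypothetical stand-ins; real code would presumably key on org.apache.kafka.common.TopicPartition and call onRecordReplicated from the producer callback. One convention is worth noting: Consumer.endOffsets() returns the offset of the next record to be written (last offset + 1), so if the LRO is stored as the offset of the last replicated record, LEO - LRO is one higher than the number of records still to replicate. The sketch applies the formula exactly as stated.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory LRO cache. A ConcurrentHashMap is used because producer
// callbacks run on the producer's I/O thread, while the lag is read elsewhere.
class LroCache {
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Intended to be invoked from the producer callback once the target cluster
    // has acknowledged the copy of the record at sourceOffset.
    void onRecordReplicated(String topic, int partition, long sourceOffset) {
        // Defensively keep the highest offset seen for the partition.
        lastReplicatedOffsets.merge(key(topic, partition), sourceOffset, Math::max);
    }

    // Replication offset lag = LEO - LRO; empty if no record of this partition
    // has been replicated yet.
    OptionalLong lag(String topic, int partition, long logEndOffset) {
        Long lro = lastReplicatedOffsets.get(key(topic, partition));
        return lro == null ? OptionalLong.empty() : OptionalLong.of(logEndOffset - lro);
    }

    public static void main(String[] args) {
        LroCache cache = new LroCache();
        cache.onRecordReplicated("orders", 0, 41);
        cache.onRecordReplicated("orders", 0, 42);
        // With a log end offset of 100 on the source partition:
        System.out.println(cache.lag("orders", 0, 100)); // OptionalLong[58]
        System.out.println(cache.lag("orders", 1, 100)); // OptionalLong.empty
    }
}
```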
The proposed changes involve the following steps:
LRO Tracking Mechanism:
- Introduce a new in-memory "cache" to store the LRO for each topic-partition.
- Update the producer callback logic to keep this LRO cache up-to-date.
LEO Acquisition during Poll Loop:
- Introduce a periodic poll mechanism that keeps a fresh set of end offsets for all partitions by calling Consumer.endOffsets().
- The alternative, calling Consumer.endOffsets() on every poll, would be inefficient and expensive.
- Calculate the replication offset lag as LEO - LRO.
Expose Replication Offset Lag Metric:
- Introduce a new MirrorMaker metric, named "replication-offset-lag" (with "-max", "-min" and "-avg" variants), which exposes the calculated replication offset lag.
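The periodic LEO acquisition can be sketched without a background thread: the poll loop asks for a refresh on every iteration, but end offsets are refetched only once the refresh interval has elapsed. This is one possible design under stated assumptions, not the proposed implementation: EndOffsetRefresher is hypothetical, the supplier stands in for Consumer.endOffsets(), the interval stands in for the optional poll-interval configuration, and the clock is injected so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: end offsets (LEO) are refetched at most once per
// refreshIntervalMs instead of on every poll. The fetcher stands in for
// Consumer.endOffsets(partitions); keys are "topic-partition" strings.
class EndOffsetRefresher {
    private final Supplier<Map<String, Long>> fetcher;
    private final LongSupplier clockMs;
    private final long refreshIntervalMs;
    private Map<String, Long> snapshot = new HashMap<>();
    private long lastRefreshMs;
    private boolean fetchedOnce;

    EndOffsetRefresher(Supplier<Map<String, Long>> fetcher, LongSupplier clockMs, long refreshIntervalMs) {
        this.fetcher = fetcher;
        this.clockMs = clockMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Called from the task's poll loop; refetches only when the snapshot is stale.
    void maybeRefresh() {
        long now = clockMs.getAsLong();
        if (!fetchedOnce || now - lastRefreshMs >= refreshIntervalMs) {
            snapshot = new HashMap<>(fetcher.get());
            lastRefreshMs = now;
            fetchedOnce = true;
        }
    }

    // Replication offset lag = LEO - LRO using the latest (possibly slightly stale)
    // snapshot; empty if the partition's end offset has not been fetched yet.
    OptionalLong lag(String partitionKey, long lastReplicatedOffset) {
        Long leo = snapshot.get(partitionKey);
        return leo == null ? OptionalLong.empty() : OptionalLong.of(leo - lastReplicatedOffset);
    }

    public static void main(String[] args) {
        long[] now = {0};
        int[] fetches = {0};
        EndOffsetRefresher refresher = new EndOffsetRefresher(() -> {
            fetches[0]++;
            return Map.of("orders-0", 100L + 10 * fetches[0]);
        }, () -> now[0], 5_000);

        refresher.maybeRefresh(); // first poll: fetch #1 (LEO 110)
        now[0] = 2_000;
        refresher.maybeRefresh(); // still within the interval: no fetch
        now[0] = 6_000;
        refresher.maybeRefresh(); // interval elapsed: fetch #2 (LEO 120)

        System.out.println(fetches[0]);                    // 2
        System.out.println(refresher.lag("orders-0", 90)); // OptionalLong[30]
    }
}
```

Checking staleness inside the poll loop keeps all consumer access on the task thread, which matters because KafkaConsumer is not safe for multi-threaded access.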
...
The metrics will be covered by unit tests in org.apache.kafka.connect.mirror.MirrorSourceTaskTest. I could not find an existing system test in which this change could be exercised; I am happy to add coverage there if a suitable one exists and I have missed it, or to implement a new system test.
Rejected Alternatives
It might be argued that the existing replication-latency-ms metric already provides satisfactory information about the replication lag.
replication-latency-ms is calculated in the producer callback: it is the difference between the system time at the moment of callback invocation and the timestamp of the original source record.
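For comparison, the calculation described above reduces to a single subtraction in the callback. The helper below is a hypothetical restatement with both times passed in explicitly rather than read from the system clock. Its result describes only the record that was just acknowledged and carries no count of records still waiting on the source.

```java
// Hypothetical restatement of replication-latency-ms as described above:
// time at which the producer callback runs minus the source record's timestamp.
class ReplicationLatency {
    static long replicationLatencyMs(long callbackTimeMs, long sourceRecordTimestampMs) {
        return callbackTimeMs - sourceRecordTimestampMs;
    }

    public static void main(String[] args) {
        // A source record stamped at t=1_000 ms whose copy is acknowledged by the
        // target at t=1_250 ms yields a replication latency of 250 ms.
        System.out.println(replicationLatencyMs(1_250, 1_000)); // 250
    }
}
```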
...