
Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-14112

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka currently lacks a built-in mechanism to track replication offset lag directly, i.e., the number of records still to be replicated. This metric is important for monitoring the health and performance of data replication. Replication offset lag is defined as the difference between the log end offset of the source partition (LEO) and the last replicated source offset (LRO). It shows how far replication has progressed and helps identify bottlenecks.

Public Interfaces

This proposal adds the following public metrics. No existing interfaces are removed or changed.

  • org/apache/kafka/connect/mirror/MirrorMetrics - new metric name templates for the current value, max, min, and avg of replication-offset-lag

    MirrorMetrics.java
    replicationOffsetLag = new MetricNameTemplate(
        "replication-offset-lag", SOURCE_CONNECTOR_GROUP,
        "Count of records to be replicated from source to target cluster.", partitionTags);
    replicationOffsetLagMax = new MetricNameTemplate(
        "replication-offset-lag-max", SOURCE_CONNECTOR_GROUP,
        "Max count of records to be replicated from source to target cluster.", partitionTags);
    replicationOffsetLagMin = new MetricNameTemplate(
        "replication-offset-lag-min", SOURCE_CONNECTOR_GROUP,
        "Min count of records to be replicated from source to target cluster.", partitionTags);
    replicationOffsetLagAvg = new MetricNameTemplate(
        "replication-offset-lag-avg", SOURCE_CONNECTOR_GROUP,
        "Average count of records to be replicated from source to target cluster.", partitionTags);

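As a rough illustration of what the four metrics would report for one partition, the sketch below aggregates a series of lag samples in plain Java. The class `LagSummary` and its methods are hypothetical and exist only for this example. The real metrics would presumably be computed by Kafka's metrics library over a sample window, whereas this sketch aggregates over every sample it has seen.

```java
import java.util.LongSummaryStatistics;

// Hypothetical illustration only, not MirrorMaker code: shows what the
// value, max, min, and avg metrics would report for a series of lag samples.
final class LagSummary {
    // Running max, min, and average over all recorded samples (no windowing).
    private final LongSummaryStatistics stats = new LongSummaryStatistics();
    private long latest;

    // Record one lag sample, e.g. one per poll() of the source task.
    void record(long lag) {
        latest = lag;
        stats.accept(lag);
    }

    long value() { return latest; }              // replication-offset-lag
    long max() { return stats.getMax(); }        // replication-offset-lag-max
    long min() { return stats.getMin(); }        // replication-offset-lag-min
    double avg() { return stats.getAverage(); }  // replication-offset-lag-avg
}
```

For the samples 10, 30, 20 recorded in that order, the value is 20, the max is 30, the min is 10, and the average is 20.0.
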
Proposed Changes

This proposal aims to enhance Kafka's monitoring capabilities by introducing a new metric to track the replication offset lag for a given topic-partition. The metric will be calculated by taking the difference between the LEO, which will be fetched during the source task's poll loop, and the LRO, which will be stored in an in-memory "cache" and updated during the task's producer callback.

The proposed changes involve the following steps:

  1. LRO Tracking Mechanism:

    • Introduce a new in-memory "cache" to store the LRO for each topic-partition.
    • Update the producer callback logic to keep this LRO cache up-to-date.
  2. LEO Acquisition during Poll Loop:

    • Modify the source task's poll loop to retrieve the LEO from the consumer.
    • Calculate the replication offset lag as LEO - LRO.
  3. Expose Replication Offset Lag Metric:

    • Introduce a new MirrorMaker metric, named "replication-offset-lag", which represents the calculated replication offset lag, together with its "-max", "-min", and "-avg" variants listed under Public Interfaces.
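
The steps above can be sketched in plain Java as follows. This is a minimal illustration under stated assumptions, not the MirrorMaker implementation: the class `ReplicationLagTracker`, its method names, the string cache key, the treatment of a missing LRO as 0, and the clamp at zero are all hypothetical choices made for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of MirrorMaker: keeps the last replicated
// source offset (LRO) per topic-partition and derives the lag from an LEO.
final class ReplicationLagTracker {

    // LRO cache keyed by "topic-partition". A concurrent map is used because
    // the producer callback and the poll loop may run on different threads.
    private final Map<String, Long> lastReplicatedOffsets = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    // Step 1: called from the producer callback once a record is acknowledged.
    // Long::max keeps the cached LRO from moving backwards if callbacks
    // arrive out of order.
    void recordReplicated(String topic, int partition, long sourceOffset) {
        lastReplicatedOffsets.merge(key(topic, partition), sourceOffset, Long::max);
    }

    // Step 2: called from the poll loop with the LEO obtained from the consumer.
    // Returns LEO - LRO, clamped at zero (the clamp is an assumption of this
    // sketch). A partition with no replicated record yet is treated as LRO = 0.
    long lag(String topic, int partition, long logEndOffset) {
        long lro = lastReplicatedOffsets.getOrDefault(key(topic, partition), 0L);
        return Math.max(0L, logEndOffset - lro);
    }
}
```

For example, after `recordReplicated("orders", 0, 40L)`, calling `lag("orders", 0, 100L)` returns 60. Step 3 would then record that value against the new metric on each poll.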

Compatibility, Deprecation, and Migration Plan

  • This proposal is backward compatible. It only introduces new metrics and modifies the existing poll loop and callback mechanisms; it does not impact existing Kafka features, APIs, or configurations.

Test Plan

Describe in few sentences how the KIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
