...

This FLIP requires changes to SplitEnumeratorMetricGroup and SourceReaderMetricGroup, adding methods to set two metrics: pendingRecords and assignedSplits.

...

Code Block
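import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorCoordinatorMetricGroup;

@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
	Counter getNumRecordsInErrorsCounter();
	void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
	void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

	/** New addition: registers a gauge reporting the number of assigned splits. */
	void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}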
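A minimal sketch of how a split enumerator might register the two gauges through the proposed setters. It avoids any Flink dependency: `Gauge` is a stand-in mirroring `org.apache.flink.metrics.Gauge`, `EnumeratorMetricGroup` is a hypothetical reduced form of the interface above, and `MapMetricGroup`, `SketchEnumerator`, `addSplit` and `assignNext` are illustrative names only. It assumes pendingRecords counts records in splits not yet handed to readers, and assignedSplits counts splits already handed out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Stand-in mirroring org.apache.flink.metrics.Gauge (single method: getValue()).
@FunctionalInterface
interface Gauge<T> {
    T getValue();
}

// Hypothetical reduced form of the proposed SplitEnumeratorMetricGroup setters.
interface EnumeratorMetricGroup {
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}

// In-memory metric group that only remembers registered gauges by name.
class MapMetricGroup implements EnumeratorMetricGroup {
    final Map<String, Gauge<Long>> gauges = new HashMap<>();

    @Override
    public void setPendingRecordsGauge(Gauge<Long> g) { gauges.put("pendingRecords", g); }

    @Override
    public void setAssignedSplitsGauge(Gauge<Long> g) { gauges.put("assignedSplits", g); }

    // Reads the current value of a registered gauge.
    long read(String name) { return gauges.get(name).getValue(); }
}

// Hypothetical enumerator whose splits are described only by their record counts.
class SketchEnumerator {
    private final Deque<Long> unassignedSplitSizes = new ArrayDeque<>();
    private long assignedSplits = 0;

    SketchEnumerator(EnumeratorMetricGroup metrics) {
        // Gauges are evaluated on every read, so these lambdas always see the current state.
        metrics.setPendingRecordsGauge(this::pendingRecords);
        metrics.setAssignedSplitsGauge(() -> assignedSplits);
    }

    void addSplit(long recordCount) { unassignedSplitSizes.addLast(recordCount); }

    // Hands the next split to a reader; returns its record count, or -1 if none is left.
    long assignNext() {
        Long size = unassignedSplitSizes.pollFirst();
        if (size == null) return -1;
        assignedSplits++;
        return size;
    }

    // Sum of records in splits that no reader has received yet.
    private long pendingRecords() {
        long sum = 0;
        for (long size : unassignedSplitSizes) sum += size;
        return sum;
    }
}
```

Registering lambdas rather than pushing values keeps the enumerator free of reporting logic: the metric system pulls the current value whenever it reports.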

Proposed Changes

Core Flink Connector Framework

  1. Add the setters setPendingRecordsGauge and setAssignedSplitsGauge to SplitEnumeratorMetricGroup and SourceReaderMetricGroup.
  2. Implement the setters in InternalSplitEnumeratorMetricGroup.
  3. Implement the setters in InternalSourceReaderMetricGroup.
  4. Add functionality to SourceReaderBase to report the assignedSplits gauge; the underlying value is already available via getNumberOfCurrentlyAssignedSplits.
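Steps 3 and 4 can be sketched as follows, under stated assumptions. The internal group's setters simply register the gauge under a metric name on a parent group, and the reader base class wires its existing split count into the new gauge. `ParentGroup`, `ReaderMetricGroupSketch` and `ReaderBaseSketch` are hypothetical reduced stand-ins for Flink's operator metric group, InternalSourceReaderMetricGroup and SourceReaderBase, and the metric name `assignedSplits` is assumed, not taken from Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in mirroring org.apache.flink.metrics.Gauge.
@FunctionalInterface
interface Gauge<T> {
    T getValue();
}

// Stand-in for the parent operator metric group: only gauge registration by name.
class ParentGroup {
    final Map<String, Gauge<?>> registered = new HashMap<>();

    <T> void gauge(String name, Gauge<T> gauge) { registered.put(name, gauge); }
}

// Hypothetical reduced InternalSourceReaderMetricGroup: setters delegate to the parent group.
class ReaderMetricGroupSketch {
    static final String PENDING_RECORDS = "pendingRecords";
    static final String ASSIGNED_SPLITS = "assignedSplits"; // assumed metric name

    private final ParentGroup parent;

    ReaderMetricGroupSketch(ParentGroup parent) { this.parent = parent; }

    void setPendingRecordsGauge(Gauge<Long> g) { parent.gauge(PENDING_RECORDS, g); }

    void setAssignedSplitsGauge(Gauge<Long> g) { parent.gauge(ASSIGNED_SPLITS, g); }
}

// Hypothetical reduced SourceReaderBase: tracks split state and exposes the count as a gauge.
class ReaderBaseSketch {
    private final Map<String, Long> splitStates = new HashMap<>();

    ReaderBaseSketch(ReaderMetricGroupSketch metrics) {
        // Reuse the existing split count as the gauge source instead of a separate counter.
        metrics.setAssignedSplitsGauge(() -> (long) getNumberOfCurrentlyAssignedSplits());
    }

    void addSplit(String splitId) { splitStates.put(splitId, 0L); }

    void finishSplit(String splitId) { splitStates.remove(splitId); }

    int getNumberOfCurrentlyAssignedSplits() { return splitStates.size(); }
}
```

Because the gauge reads the live split map, adding or finishing a split is reflected on the next metric report without extra bookkeeping.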

Flink Kubernetes Operator

  1. Support retrieving the pendingRecords metric reported by the enumerator.
  2. Generalize the logic to determine the upper limit for source parallelism.
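A hedged sketch of how the operator side might consume these metrics. Two assumptions are made that the FLIP does not spell out: the enumerator's pendingRecords covers records not yet handed to any reader, so it is added to the readers' backlog; and the split count (for example from assignedSplits) caps useful source parallelism, because a subtask without a split stays idle. `SourceScalingInputs` and its methods are hypothetical names.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper illustrating how an autoscaler might use the new source metrics.
final class SourceScalingInputs {
    private SourceScalingInputs() {}

    // Assumption: enumerator-level pendingRecords counts records not yet assigned to readers,
    // so it is added to the readers' own backlog rather than replacing it.
    static long totalPendingRecords(OptionalLong enumeratorPending, List<Long> readerPending) {
        long total = enumeratorPending.orElse(0L);
        for (long p : readerPending) total += p;
        return total;
    }

    // Assumption: each split is read by at most one subtask, so parallelism above the split
    // count leaves subtasks idle. Without the metric, fall back to the max parallelism.
    static int parallelismUpperBound(OptionalLong splitCount, int maxParallelism) {
        if (!splitCount.isPresent()) return maxParallelism;
        long bound = Math.max(1L, Math.min(splitCount.getAsLong(), maxParallelism));
        return (int) bound;
    }
}
```

Using `OptionalLong` makes the opt-in nature explicit: an absent metric changes the result through a defined fallback instead of being read as zero splits.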

Flink Core Connectors

  1. Implement the metrics in:
    1. FileSource
    2. HybridSource
    3. NumberSequenceSource
    4. DataGeneratorSource
    5. HiveSource
    6. KafkaSource
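For a bounded source such as NumberSequenceSource, the enumerator can compute pendingRecords exactly, because each unassigned split covers a known inclusive range of numbers. The sketch below assumes that shape. `RangeSplit` and `RangeSplitMetrics` are hypothetical, and the even-splitting routine is illustrative, not necessarily how NumberSequenceSource divides its range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a number-sequence split covering the inclusive range [from, to].
final class RangeSplit {
    final long from;
    final long to;

    RangeSplit(long from, long to) {
        if (to < from) throw new IllegalArgumentException("to < from");
        this.from = from;
        this.to = to;
    }

    // Inclusive range, hence the +1. Assumes the range size fits in a long.
    long recordCount() { return to - from + 1; }
}

final class RangeSplitMetrics {
    private RangeSplitMetrics() {}

    // Divides [from, to] into at most n contiguous splits whose sizes differ by at most one.
    static List<RangeSplit> split(long from, long to, int n) {
        if (n <= 0) throw new IllegalArgumentException("n must be positive");
        List<RangeSplit> splits = new ArrayList<>();
        long total = to - from + 1;
        long start = from;
        for (int i = 0; i < n; i++) {
            long size = total / n + (i < total % n ? 1 : 0);
            if (size == 0) continue; // fewer numbers than requested splits
            splits.add(new RangeSplit(start, start + size - 1));
            start += size;
        }
        return splits;
    }

    // Enumerator-level pendingRecords: records in splits not yet assigned to any reader.
    static long pendingRecords(List<RangeSplit> unassigned) {
        long sum = 0;
        for (RangeSplit s : unassigned) sum += s.recordCount();
        return sum;
    }
}
```

For example, splitting 1..10 into three parts yields [1, 4], [5, 7] and [8, 10]; once the first split is assigned, the remaining two account for 6 pending records.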

...

Like the other standardized connector metrics, these metrics are opt-in and do not introduce any backward-incompatible behavior. Autoscaling still works for sources that do not report them, by relying on the busyness metrics, but it is less efficient until those sources opt in to reporting these metrics.
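The difference between the two modes can be illustrated with a deliberately simplified scaling rule; this is not the Flink Kubernetes Operator's actual algorithm. Busyness gives the capacity of one subtask (observed rate divided by busy ratio). With pendingRecords, the target rate also includes draining the backlog within a catch-up window; without it, only the current input rate can be targeted. `SourceParallelismEstimate` and all parameter names are hypothetical.

```java
import java.util.OptionalLong;

// Simplified, hypothetical scaling rule; not the Flink Kubernetes Operator's actual algorithm.
final class SourceParallelismEstimate {
    private SourceParallelismEstimate() {}

    /**
     * @param observedRatePerSubtask records/s one subtask currently processes
     * @param busyRatio              fraction of time a subtask is busy, in (0, 1]
     * @param inputRate              records/s arriving at the source overall
     * @param pendingRecords         backlog, present only if the source opts in to reporting it
     * @param catchUpSeconds         time budget for draining the backlog
     * @param upperBound             maximum useful parallelism
     */
    static int targetParallelism(double observedRatePerSubtask, double busyRatio, double inputRate,
                                 OptionalLong pendingRecords, double catchUpSeconds, int upperBound) {
        // Busyness-based capacity: what one subtask could process if it were busy all the time.
        double capacityPerSubtask = observedRatePerSubtask / busyRatio;
        // Without pendingRecords the backlog is invisible, so only the input rate is targeted.
        double targetRate = inputRate + pendingRecords.orElse(0L) / catchUpSeconds;
        int needed = (int) Math.ceil(targetRate / capacityPerSubtask);
        return Math.max(1, Math.min(needed, upperBound));
    }
}
```

With a capacity of 200 records/s per subtask and an input rate of 800 records/s, the busyness-only path yields 4 subtasks; reporting a backlog of 60,000 records with a 300 s catch-up window raises the target to 5.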

We will implement this only for FLIP-27 sources.

Test Plan

Unit tests in the core Flink and Flink Kubernetes Operator repositories.

...