...
This FLIP requires changes to SplitEnumeratorMetricGroup and SourceReaderMetricGroup
, adding methods to set two metrics for pendingRecords
and assignedSplits
.
...
Code Block |
---|
@PublicEvolving public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup { Counter getNumRecordsInErrorsCounter(); void setPendingBytesGauge(Gauge<Long> pendingBytesGauge); void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge); /** new addition */ void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge); } |
Proposed Changes
Core Flink Connector Framework
- Add the setters
setPendingRecordsGauge
andsetAssignedSplitsGauge
in theSplitEnumeratorMetricGroup and SourceReaderMetricGroup
. - Implement setter API in the InternalSplitEnumeratorMetricGroup.
- Implement setter API in the InternalSplitEnumeratorMetricGroupInternalSourceReaderMetricGroup.
- Add functionality to SourceReaderBase to report assignedSplitsGauge, this is already possible via
getNumberOfCurrentlyAssignedSplits
.
Flink Kubernetes Operator
- Support retrieving pendingRecords metrics that comes from the enumerator.
- Generalize the logic to determine the upper limit for source parallelism.
Flink Core Connectors
- Implement metrics in
- FileSource
- HybridSource
- NumberSequenceSource
- DataGeneratorSource
- HiveSource
- KafkaSource
...
Just like the other standardized connector metrics, these are opt-in and will not cause any backward incompatible behavior as such. Autoscaling will still work for sources without these metrics by relying on the busyness metrics and will not work as efficiently until they opt-in to reporting these metrics.
We will only implement this for FLIP 27 sources.
Test Plan
Unit tests in Core Flink and Flink Kubernetes Operator repos.
...