...
```java
@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {

    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);

    /** New addition. */
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}
```
```java
@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {

    Counter getNumRecordsInErrorsCounter();

    void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);

    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

    /** New addition. */
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}
```
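For illustration, an enumerator would typically wire these gauges to its live internal state so the values are fresh on every metric report. Below is a minimal self-contained sketch; the `Gauge` and `MetricGroup` types are simplified stand-ins for Flink's real interfaces, and the 100-records-per-split backlog estimate is purely hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class EnumeratorMetricsSketch {
    // Simplified stand-in for org.apache.flink.metrics.Gauge.
    interface Gauge<T> { T getValue(); }

    // Simplified stand-in for SplitEnumeratorMetricGroup; only the setters
    // discussed in this proposal are modeled.
    static class MetricGroup {
        Gauge<Long> unassignedSplits;
        Gauge<Long> pendingRecords;
        <G extends Gauge<Long>> G setUnassignedSplitsGauge(G g) { unassignedSplits = g; return g; }
        void setPendingRecordsGauge(Gauge<Long> g) { pendingRecords = g; }
    }

    static long[] demo() {
        // Enumerator-side state: splits not yet handed to readers.
        Deque<String> unassigned = new ArrayDeque<>();
        unassigned.add("split-0");
        unassigned.add("split-1");

        MetricGroup group = new MetricGroup();
        // Gauges capture live state; they are evaluated on each metric report.
        group.setUnassignedSplitsGauge(() -> (long) unassigned.size());
        // Hypothetical backlog estimate: assume 100 records per split.
        group.setPendingRecordsGauge(() -> 100L * unassigned.size());

        long before = group.unassignedSplits.getValue();
        long pending = group.pendingRecords.getValue();
        unassigned.poll(); // one split gets assigned to a reader
        long after = group.unassignedSplits.getValue();
        return new long[] {before, pending, after};
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 2 200 1
    }
}
```

Because the gauges are lambdas over mutable enumerator state, no explicit metric update calls are needed when splits are assigned.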
Proposed Changes
Core Flink
- Add the setters setPendingRecordsGauge and setAssignedSplitsGauge to the SplitEnumeratorMetricGroup interface.
- Implement those setters in the InternalSplitEnumeratorMetricGroup.
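The internal implementation would presumably register each supplied gauge under a well-known metric name, mirroring how the existing setters work. A hypothetical sketch follows; the registry map and the metric-name strings are illustrative stand-ins, not Flink's actual constants:

```java
import java.util.HashMap;
import java.util.Map;

public class InternalMetricGroupSketch {
    interface Gauge<T> { T getValue(); }

    // Hypothetical stand-in for InternalSplitEnumeratorMetricGroup: a plain map
    // replaces the real MetricGroup registration machinery.
    static class InternalSplitEnumeratorMetricGroup {
        final Map<String, Gauge<Long>> registered = new HashMap<>();

        <G extends Gauge<Long>> G setUnassignedSplitsGauge(G g) {
            registered.put("unassignedSplits", g);
            return g;
        }

        void setPendingRecordsGauge(Gauge<Long> g) {
            registered.put("pendingRecords", g);
        }

        void setAssignedSplitsGauge(Gauge<Long> g) {
            registered.put("assignedSplits", g);
        }
    }

    static long demo() {
        InternalSplitEnumeratorMetricGroup mg = new InternalSplitEnumeratorMetricGroup();
        mg.setPendingRecordsGauge(() -> 42L);
        mg.setAssignedSplitsGauge(() -> 3L);
        // Reporters would later look the gauge up by name and poll it.
        return mg.registered.get("pendingRecords").getValue();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 42
    }
}
```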
Flink Kubernetes Operator
- Support retrieving the pendingRecords metric reported by the enumerator.
- Generalize the logic to determine the upper limit for source parallelism.
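One plausible way to generalize the parallelism cap is to bound source parallelism by the total number of splits the enumerator reports (assigned plus unassigned), since extra subtasks beyond that would sit idle, and to fall back to the configured maximum when the metrics are unavailable. The helper below is a hypothetical sketch, not the operator's actual logic:

```java
public class SourceParallelismCapSketch {
    // Hypothetical helper; the real logic lives in the operator's autoscaler module.
    static int upperLimit(Long assignedSplits, Long unassignedSplits, int configuredMax) {
        if (assignedSplits == null || unassignedSplits == null) {
            return configuredMax; // metrics unavailable: fall back to configuration
        }
        long totalSplits = assignedSplits + unassignedSplits;
        // More parallel subtasks than splits would leave readers idle.
        return (int) Math.min(configuredMax, Math.max(1, totalSplits));
    }

    public static void main(String[] args) {
        System.out.println(upperLimit(3L, 5L, 32));     // 8
        System.out.println(upperLimit(null, null, 32)); // 32
    }
}
```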
Flink Core Connectors
- Implement metrics in
- FileSource
- HybridSource
- NumberSequenceSource
- DataGeneratorSource
- HiveSource
- KafkaSource
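As an example of what a per-connector implementation could look like, a sequence-style source (in the spirit of NumberSequenceSource) knows exactly how many records remain unassigned, so its enumerator can back the pendingRecords gauge directly from its checkpointed range. This is a simplified sketch with a stand-in `Gauge` type, not the connector's actual code:

```java
public class SequenceEnumeratorSketch {
    // Simplified stand-in for org.apache.flink.metrics.Gauge.
    interface Gauge<T> { T getValue(); }

    // Hypothetical enumerator state: the inclusive range [from, to] of
    // records that have not yet been handed out as splits.
    static final class State {
        long from;
        final long to;
        State(long from, long to) { this.from = from; this.to = to; }
        long remaining() { return Math.max(0, to - from + 1); }
    }

    static Gauge<Long> pendingRecordsGauge(State s) {
        // The gauge reads the live range on every report.
        return s::remaining;
    }

    static long[] demo() {
        State state = new State(0, 99); // 100 records total, none assigned yet
        Gauge<Long> pending = pendingRecordsGauge(state);
        long before = pending.getValue();
        state.from = 40; // first 40 records handed out as splits
        long after = pending.getValue();
        return new long[] {before, after};
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1]); // 100 60
    }
}
```

For unbounded connectors like KafkaSource the analogous value is instead derived on the reader side, as discussed under the migration plan below.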
Compatibility, Deprecation, and Migration Plan
...
- Unify where the pendingRecords metric is reported (enumerator vs. reader).
For unbounded splits such as Kafka/Pulsar partitions, it would be difficult to calculate the backlog outside of the readers; unifying on the enumerator would therefore require additional RPCs to communicate the values, plus new configuration to emit those events periodically, complicating the design.
For bounded splits such as File/Iceberg splits, the individual readers have no context about what remains, so only the enumerator can report the metric.
Thus, I propose to report this metric from either the enumerator or the reader, whichever is easiest for the specific connector, and to handle both metric groups in the autoscaling implementation.
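Handling both metric groups on the autoscaler side could then reduce to a simple preference rule: take the enumerator-scoped value when it is present, otherwise aggregate the per-reader values. The merge function below is a hypothetical sketch of that rule, not the autoscaler's actual code:

```java
public class PendingRecordsMergeSketch {
    // Hypothetical merge: prefer the enumerator-scoped metric when present
    // (bounded sources like File/Iceberg), otherwise sum the per-reader
    // values (unbounded sources like Kafka/Pulsar).
    static Long pendingRecords(Long enumeratorValue, long[] readerValues) {
        if (enumeratorValue != null) {
            return enumeratorValue;
        }
        if (readerValues == null || readerValues.length == 0) {
            return null; // metric not reported on either side
        }
        long sum = 0;
        for (long v : readerValues) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(pendingRecords(500L, new long[] {1, 2}));    // 500
        System.out.println(pendingRecords(null, new long[] {100, 250})); // 350
    }
}
```

Returning null when neither side reports lets the caller distinguish "no backlog" from "metric unavailable", which matters for the parallelism fallback discussed above.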