...

Code Block
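package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Gauge;

@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {

	<G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);

	/** New addition: gauge for the number of records still pending in the enumerator. */
	void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}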
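A minimal sketch of how a connector's enumerator might use the proposed setPendingRecordsGauge: it registers a gauge once, and the gauge is evaluated lazily on each metric report by summing the records in splits not yet handed to a reader. To keep it compilable without Flink, SketchGauge and SketchEnumeratorMetricGroup are hypothetical stand-ins for Gauge and SplitEnumeratorMetricGroup, and CountedSplit and PendingRecordsEnumerator are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.flink.metrics.Gauge.
interface SketchGauge<T> {
    T getValue();
}

// Hypothetical stand-in exposing only the proposed setter.
interface SketchEnumeratorMetricGroup {
    void setPendingRecordsGauge(SketchGauge<Long> pendingRecordsGauge);
}

// Hypothetical split that knows how many records it holds.
final class CountedSplit {
    final long records;

    CountedSplit(long records) {
        this.records = records;
    }
}

// Enumerator sketch: pending records = records in splits that are still unassigned.
final class PendingRecordsEnumerator {
    private final List<CountedSplit> unassigned = new ArrayList<>();

    PendingRecordsEnumerator(List<CountedSplit> initialSplits, SketchEnumeratorMetricGroup metrics) {
        unassigned.addAll(initialSplits);
        // Register once; the metric system calls the gauge whenever it reports.
        metrics.setPendingRecordsGauge(this::pendingRecords);
    }

    long pendingRecords() {
        long total = 0;
        for (CountedSplit split : unassigned) {
            total += split.records;
        }
        return total;
    }

    // Simulates handing the next split to a reader; returns null when none are left.
    CountedSplit assignNext() {
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }
}
```

Because the gauge reads live enumerator state, assigning a split lowers the reported value without any explicit update call.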


Code Block
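package org.apache.flink.metrics.groups;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;

@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
	Counter getNumRecordsInErrorsCounter();
	void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
	void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

	/** New addition: gauge for the number of splits currently assigned to readers. */
	void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}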
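The assignedSplits gauge could be backed by the enumerator's own bookkeeping of which reader owns which split. The sketch below assumes that when a reader fails its splits go back to the unassigned pool, so the gauge drops accordingly. AssignedGauge and AssignedSplitsMetricGroup are hypothetical stand-ins for Flink's Gauge and SplitEnumeratorMetricGroup, and handleReaderFailure is a simplified, hypothetical hook rather than the real SplitEnumerator callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for org.apache.flink.metrics.Gauge.
interface AssignedGauge<T> {
    T getValue();
}

// Hypothetical stand-in exposing only the proposed setter.
interface AssignedSplitsMetricGroup {
    void setAssignedSplitsGauge(AssignedGauge<Long> assignedSplitsGauge);
}

final class AssignmentTrackingEnumerator {
    private final Deque<String> unassigned = new ArrayDeque<>();
    // Reader subtask id -> split ids currently owned by that reader.
    private final Map<Integer, List<String>> assigned = new HashMap<>();

    AssignmentTrackingEnumerator(List<String> splitIds, AssignedSplitsMetricGroup metrics) {
        unassigned.addAll(splitIds);
        metrics.setAssignedSplitsGauge(this::assignedSplitCount);
    }

    long assignedSplitCount() {
        long count = 0;
        for (List<String> splits : assigned.values()) {
            count += splits.size();
        }
        return count;
    }

    // Gives the next unassigned split (if any) to the requesting reader.
    void handleSplitRequest(int subtaskId) {
        String next = unassigned.poll();
        if (next != null) {
            assigned.computeIfAbsent(subtaskId, id -> new ArrayList<>()).add(next);
        }
    }

    // Hypothetical failure hook: the reader's splits return to the unassigned pool.
    void handleReaderFailure(int subtaskId) {
        List<String> returned = assigned.remove(subtaskId);
        if (returned != null) {
            unassigned.addAll(returned);
        }
    }
}
```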

Proposed Changes

Core Flink

  1. Add the setters setPendingRecordsGauge and setAssignedSplitsGauge to the SplitEnumeratorMetricGroup.
  2. Implement those setters in the InternalSplitEnumeratorMetricGroup.
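One plausible shape for the InternalSplitEnumeratorMetricGroup setters is to forward each gauge to the parent metric group under a fixed metric name, as sketched below. GaugeRegistry is a hypothetical stand-in for the parent MetricGroup's gauge registration, and the metric names "pendingRecords" and "assignedSplits" are assumptions for illustration, not confirmed constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for org.apache.flink.metrics.Gauge.
interface MetricGauge<T> {
    T getValue();
}

// Hypothetical stand-in for the parent MetricGroup's gauge registration.
final class GaugeRegistry {
    private final Map<String, MetricGauge<?>> gauges = new HashMap<>();

    <G extends MetricGauge<?>> G gauge(String name, G gauge) {
        gauges.put(name, gauge);
        return gauge;
    }

    // Reads the current value of a registered gauge, or null if none is registered.
    Object read(String name) {
        MetricGauge<?> gauge = gauges.get(name);
        return gauge == null ? null : gauge.getValue();
    }
}

// Sketch of the internal group: each setter only registers the gauge under its name.
final class SketchInternalSplitEnumeratorMetricGroup {
    // Assumed metric names, for illustration only.
    static final String PENDING_RECORDS = "pendingRecords";
    static final String ASSIGNED_SPLITS = "assignedSplits";

    private final GaugeRegistry parent;

    SketchInternalSplitEnumeratorMetricGroup(GaugeRegistry parent) {
        this.parent = parent;
    }

    void setPendingRecordsGauge(MetricGauge<Long> pendingRecordsGauge) {
        parent.gauge(PENDING_RECORDS, pendingRecordsGauge);
    }

    void setAssignedSplitsGauge(MetricGauge<Long> assignedSplitsGauge) {
        parent.gauge(ASSIGNED_SPLITS, assignedSplitsGauge);
    }
}
```

Keeping the setters as thin registrations means the connector, not the metric group, owns the logic that computes each value.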

Flink Kubernetes Operator

  1. Support retrieving the pendingRecords metric reported by the enumerator.
  2. Generalize the logic to determine the upper limit for source parallelism.
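One way the generalized upper limit for source parallelism might look: when the enumerator reports split counts, readers beyond the total number of splits would sit idle, so the limit becomes the smaller of the configured maximum and assigned plus unassigned splits. This is a hypothetical sketch; the method name and the fallback behavior when no split metrics are available are assumptions, not the operator's actual logic.

```java
import java.util.OptionalLong;

final class SourceParallelismLimit {
    // Hypothetical helper: caps source parallelism at the total number of known splits.
    static int upperBound(int configuredMax, OptionalLong assignedSplits, OptionalLong unassignedSplits) {
        if (!assignedSplits.isPresent() && !unassignedSplits.isPresent()) {
            // No split metrics reported: keep the configured limit.
            return configuredMax;
        }
        long totalSplits = assignedSplits.orElse(0L) + unassignedSplits.orElse(0L);
        if (totalSplits <= 0) {
            // No splits discovered yet: treat the count as unknown rather than capping at zero.
            return configuredMax;
        }
        return (int) Math.min(configuredMax, totalSplits);
    }
}
```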

Flink Core Connectors

  1. Implement the new enumerator metrics in:
    1. FileSource
    2. HybridSource
    3. NumberSequenceSource
    4. DataGeneratorSource
    5. HiveSource
    6. KafkaSource
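For a bounded source such as NumberSequenceSource, the enumerator knows exactly how many values each unassigned split covers, so pending records is a simple sum. The sketch assumes each split covers an inclusive from/to range; RangeSplit and SequencePendingRecords are hypothetical names, not the connector's classes.

```java
import java.util.Collection;

// Hypothetical split covering the inclusive range [from, to].
final class RangeSplit {
    final long from;
    final long to;

    RangeSplit(long from, long to) {
        this.from = from;
        this.to = to;
    }

    long size() {
        return to - from + 1;
    }
}

final class SequencePendingRecords {
    // Pending records = total number of values in splits not yet assigned to a reader.
    static long pendingRecords(Collection<RangeSplit> unassignedSplits) {
        long total = 0;
        for (RangeSplit split : unassignedSplits) {
            total += split.size();
        }
        return total;
    }
}
```

An enumerator would pass a method reference to this computation over its unassigned-split collection to setPendingRecordsGauge.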

Compatibility, Deprecation, and Migration Plan

...

  1. Unify where the pendingRecords metric is reported (enumerator vs reader).

    For unbounded splits such as Kafka/Pulsar partitions, it would be difficult to calculate pending records outside of the readers. Unifying on the enumerator would therefore require additional RPCs to communicate this information to the enumerator, as well as new configuration to emit these events periodically, which complicates the design.

    For bounded splits such as File/Iceberg splits, individual readers have no context about the splits that remain to be assigned.

    Thus, I propose reporting this metric from the enumerator and/or the reader, in whichever way is easiest for the specific connector, and handling both metric groups in the autoscaling implementation.
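Handling both metric groups in the autoscaler could be as simple as summing whatever is reported, as in the sketch below. It assumes there is no double counting (the enumerator counts only records in unassigned splits and each reader counts only its own assigned splits), and it treats "nothing reported anywhere" as unknown rather than zero. PendingRecordsAggregator is a hypothetical name.

```java
import java.util.List;
import java.util.OptionalLong;

final class PendingRecordsAggregator {
    // Combines enumerator-level and reader-level pendingRecords; empty if no source reports it.
    static OptionalLong totalPendingRecords(OptionalLong enumeratorPending, List<OptionalLong> readerPending) {
        boolean reported = enumeratorPending.isPresent();
        long total = enumeratorPending.orElse(0L);
        for (OptionalLong reader : readerPending) {
            if (reader.isPresent()) {
                reported = true;
                total += reader.getAsLong();
            }
        }
        return reported ? OptionalLong.of(total) : OptionalLong.empty();
    }
}
```

Returning an empty result instead of zero lets the autoscaler distinguish "no backlog" from "connector does not report the metric".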