Discussion thread: https://lists.apache.org/thread/xyldlpft7p6sqn65t4qwr5rx778fxgvz
Vote thread: https://lists.apache.org/list.html?dev@flink.apache.org
JIRA


Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink Autoscaling with the Kubernetes Operator relies on a set of metrics to scale sources up and down. The algorithm currently works well with Kafka sources, since the Kafka connector provides metrics that track the backlog and the maximum source parallelism.

Users also need efficient autoscaling for other sources, such as Iceberg. However, pendingRecords is currently only reported by the SourceReader, which does not cover sources where only the SplitEnumerator can calculate the metric, e.g. bounded split implementations like Iceberg.

Another challenge is obtaining the maximum source parallelism. For Kafka sources, the autoscaling implementation uses a Kafka-specific metric to determine the upper limit for scaling. Flink already provides an unassignedSplits metric that signals how many splits remain to be assigned, but the number of currently assigned splits is missing, so the total number of splits cannot be derived.

Public Interfaces

This FLIP requires changes to SplitEnumeratorMetricGroup and SourceReaderMetricGroup, adding setter methods for two metrics: pendingRecords and assignedSplits.


How to determine the maximum number of splits:

  - numAssigned
  - numSplitsProcessed
  - numSplitsProcessedPerSecond



@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {
	<G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
	
	/** new addition */
	void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
}
@PublicEvolving
public interface SourceReaderMetricGroup extends OperatorMetricGroup {
	Counter getNumRecordsInErrorsCounter();
	void setPendingBytesGauge(Gauge<Long> pendingBytesGauge);
	void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
	
	/** new addition */
	void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}
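As an illustration, the snippet below sketches how a bounded-split enumerator might use the proposed setter. The Gauge interface and InMemoryEnumeratorMetricGroup here are simplified stand-ins for Flink's org.apache.flink.metrics.Gauge and SplitEnumeratorMetricGroup, and the backlog bookkeeping is hypothetical:

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for org.apache.flink.metrics.Gauge.
interface Gauge<T> {
    T getValue();
}

// Simplified stand-in for SplitEnumeratorMetricGroup with the proposed setter.
class InMemoryEnumeratorMetricGroup {
    private Gauge<Long> pendingRecordsGauge;

    void setPendingRecordsGauge(Gauge<Long> gauge) {
        this.pendingRecordsGauge = gauge;
    }

    Long currentPendingRecords() {
        return pendingRecordsGauge == null ? null : pendingRecordsGauge.getValue();
    }
}

public class PendingRecordsExample {
    public static void main(String[] args) {
        // The enumerator tracks the record count remaining across unassigned splits.
        AtomicLong remainingRecords = new AtomicLong(1000L);

        InMemoryEnumeratorMetricGroup metricGroup = new InMemoryEnumeratorMetricGroup();
        // Register the gauge once at enumerator start-up; the metric system
        // polls it lazily, so it always reflects the current backlog.
        metricGroup.setPendingRecordsGauge(remainingRecords::get);

        // Simulate assigning a split that contains 400 records.
        remainingRecords.addAndGet(-400L);
        System.out.println(metricGroup.currentPendingRecords()); // prints 600
    }
}
```

Because the gauge is a lazily polled callback, the enumerator only needs to keep its own counter up to date; no push-based reporting is required.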

Proposed Changes

Flink Connector Framework

  1. Add the setters setPendingRecordsGauge and setAssignedSplitsGauge to SplitEnumeratorMetricGroup and SourceReaderMetricGroup, respectively.
  2. Implement setter API in the InternalSplitEnumeratorMetricGroup.
  3. Implement setter API in the InternalSourceReaderMetricGroup.
  4. Add functionality to SourceReaderBase to report the assignedSplits gauge; this is already possible via getNumberOfCurrentlyAssignedSplits.
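A rough sketch of step 4, using java.util.function.Supplier as a stand-in for Flink's Gauge<Long>; the reader class below is a toy model, not the real SourceReaderBase:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy stand-in for SourceReaderMetricGroup with the proposed setter.
class SimpleReaderMetricGroup {
    private Supplier<Long> assignedSplitsGauge;

    void setAssignedSplitsGauge(Supplier<Long> gauge) {
        this.assignedSplitsGauge = gauge;
    }

    long currentAssignedSplits() {
        return assignedSplitsGauge.get();
    }
}

// Toy model of SourceReaderBase: it already tracks per-split state, so the
// gauge can simply expose the size of that map (mirroring what
// getNumberOfCurrentlyAssignedSplits returns).
public class SimpleSourceReader {
    private final Map<String, Object> splitStates = new HashMap<>();
    final SimpleReaderMetricGroup metrics = new SimpleReaderMetricGroup();

    public SimpleSourceReader() {
        metrics.setAssignedSplitsGauge(() -> (long) splitStates.size());
    }

    void addSplit(String splitId) {
        splitStates.put(splitId, new Object());
    }

    void finishSplit(String splitId) {
        splitStates.remove(splitId);
    }

    public static void main(String[] args) {
        SimpleSourceReader reader = new SimpleSourceReader();
        reader.addSplit("split-0");
        reader.addSplit("split-1");
        reader.finishSplit("split-0");
        System.out.println(reader.metrics.currentAssignedSplits()); // prints 1
    }
}
```

Since the per-split state map shrinks when a split finishes, the gauge reflects completed splits automatically, which is exactly the information the enumerator lacks.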

Flink Kubernetes Operator

  1. Support retrieving the pendingRecords metric reported by the enumerator.
  2. Generalize the logic to determine the upper limit for source parallelism.
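With both gauges available, the operator-side upper bound could be derived generically from the standardized metrics alone. The method name below is illustrative, not the operator's actual API:

```java
// Hypothetical sketch of step 2: combining the two standardized metrics to
// bound source parallelism without any connector-specific code.
public class SourceParallelismLimit {
    // Total splits = splits still waiting for assignment in the enumerator
    // + splits currently assigned to readers. A source cannot usefully run
    // with more parallel instances than it has splits.
    static long computeSourceParallelismLimit(long unassignedSplits, long assignedSplits) {
        return unassignedSplits + assignedSplits;
    }

    public static void main(String[] args) {
        // e.g. 8 splits still queued in the enumerator, 4 currently being read
        System.out.println(computeSourceParallelismLimit(8, 4)); // prints 12
    }
}
```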

Flink Connectors

  1. Implement metrics in
    1. FileSource
    2. HybridSource
    3. NumberSequenceSource
    4. DataGeneratorSource
    5. KafkaSource

Compatibility, Deprecation, and Migration Plan

Just like the other standardized connector metrics, these are opt-in and do not introduce any backward-incompatible behavior. Autoscaling will still work for sources without these metrics by falling back to the busyness metrics, but it will not work as efficiently until those sources opt in to reporting these metrics.

We will only implement this for FLIP-27 sources.

Test Plan

Unit tests in Core Flink and Flink Kubernetes Operator repos.

Rejected Alternatives

  1. Unify where the pendingRecords metric is reported (enumerator vs reader).

    For unbounded splits like Kafka/Pulsar partitions, it would be difficult to calculate these metrics outside of the readers, so unifying on the enumerator would require additional RPCs to communicate the values from the readers. We would also need to introduce configuration to emit these events periodically, complicating the design.

    For bounded splits like File/Iceberg splits, the individual readers have no context about what remains.

    Thus, I propose to report this metric from the enumerator or the reader, whichever is easiest for the specific connector, and to handle both metric groups in the autoscaling implementation.

  2. Tracking assigned splits in the enumerator
    We need to track this in the SourceReaders, since splits can be "completed" and completion is only tracked by the readers. Some poll-based sources take only one split at a time and poll for another when it completes, but we cannot assume that behavior in general (i.e. that a reader requests a new split whenever one completes). So this needs to be tracked in the reader.