...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink Autoscaling with the Kubernetes Operator relies on a set of metrics to scale sources up and down. The algorithm currently works well with Kafka sources because they provide metrics that track the backlog and the maximum source parallelism.

...

Another challenge is obtaining the maximum parallelism. For Kafka sources, the autoscaling implementation uses a Kafka-specific metric to determine the upper limit for scaling. Flink already provides the unassignedSplits metric, which signals how many splits remain to be assigned, but there is no metric for the number of currently assigned splits, so the total number of splits cannot be derived.
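The relationship between the two split metrics can be sketched as follows. This is a minimal illustration, not the autoscaler's actual implementation: the class and method names are hypothetical, and capping parallelism at the total split count is an assumption drawn from the description above.

```java
/** Hypothetical helper; not part of Flink or the Flink Kubernetes Operator. */
final class SourceParallelismLimit {

    private SourceParallelismLimit() {}

    /** Total splits = splits already assigned to readers + splits still held by the enumerator. */
    static long totalSplits(long assignedSplits, long unassignedSplits) {
        if (assignedSplits < 0 || unassignedSplits < 0) {
            throw new IllegalArgumentException("split counts must be non-negative");
        }
        return Math.addExact(assignedSplits, unassignedSplits);
    }

    /**
     * Assumed rule: a source cannot usefully run more subtasks than it has splits,
     * so the upper limit is the smaller of the configured maximum and the total
     * split count. At least one subtask is always kept.
     */
    static int maxSourceParallelism(int configuredMax, long assignedSplits, long unassignedSplits) {
        if (configuredMax < 1) {
            throw new IllegalArgumentException("configuredMax must be at least 1");
        }
        long total = totalSplits(assignedSplits, unassignedSplits);
        // The result never exceeds configuredMax, so the narrowing cast is safe.
        return (int) Math.max(1L, Math.min((long) configuredMax, total));
    }
}
```

With 10 assigned and 6 unassigned splits, the total is 16, so a configured maximum of 128 would be capped at 16 under this assumed rule, while a configured maximum of 8 would stay at 8.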

Public Interfaces

This FLIP requires changes to SplitEnumeratorMetricGroup: two new methods that set the gauges for the pendingRecords and assignedSplits metrics.

...

@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {

    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);

    /** New addition: gauge for the pendingRecords (backlog) metric reported by the enumerator. */
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

    /** New addition: gauge for the number of currently assigned splits. */
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}
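As an illustration of how a connector might opt in, the sketch below registers the three gauges from a toy enumerator. It is self-contained so it compiles without Flink: Gauge and SplitEnumeratorMetricGroup here are reduced stand-ins for the real types, and ToyEnumerator and InMemoryMetricGroup are hypothetical names that exist only for this example.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Stand-in for org.apache.flink.metrics.Gauge so the sketch compiles without Flink. */
interface Gauge<T> {
    T getValue();
}

/** Stand-in for the proposed interface, reduced to the three setters shown above. */
interface SplitEnumeratorMetricGroup {
    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}

/** Hypothetical metric group that simply stores the registered gauges by name. */
final class InMemoryMetricGroup implements SplitEnumeratorMetricGroup {
    final Map<String, Gauge<Long>> gauges = new HashMap<>();

    @Override
    public <G extends Gauge<Long>> G setUnassignedSplitsGauge(G gauge) {
        gauges.put("unassignedSplits", gauge);
        return gauge;
    }

    @Override
    public void setPendingRecordsGauge(Gauge<Long> gauge) {
        gauges.put("pendingRecords", gauge);
    }

    @Override
    public void setAssignedSplitsGauge(Gauge<Long> gauge) {
        gauges.put("assignedSplits", gauge);
    }
}

/** Hypothetical enumerator that exposes its own bookkeeping through gauges. */
final class ToyEnumerator {
    private final Queue<String> unassigned = new ArrayDeque<>();
    private final Map<String, Integer> assigned = new HashMap<>(); // split id -> reader subtask
    private long pendingRecords;

    /** Registers gauges once; each gauge reads live state whenever it is polled. */
    void register(SplitEnumeratorMetricGroup group) {
        Gauge<Long> unassignedGauge = () -> (long) unassigned.size();
        group.setUnassignedSplitsGauge(unassignedGauge);
        group.setAssignedSplitsGauge(() -> (long) assigned.size());
        group.setPendingRecordsGauge(() -> pendingRecords);
    }

    /** Records a newly discovered split that has no reader yet. */
    void discover(String splitId) {
        unassigned.add(splitId);
    }

    /** Moves the oldest unassigned split, if any, to the given reader subtask. */
    void assignNext(int subtask) {
        String splitId = unassigned.poll();
        if (splitId != null) {
            assigned.put(splitId, subtask);
        }
    }

    /** Updates the backlog estimate; how it is computed is source-specific. */
    void setPendingRecords(long pendingRecords) {
        this.pendingRecords = pendingRecords;
    }
}
```

Because the gauges close over the enumerator's state, a consumer that polls assignedSplits and unassignedSplits can add them to obtain the total split count without any source-specific metric.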

Proposed Changes

Core Flink

  1. Add the setters setPendingRecordsGauge and setAssignedSplitsGauge to SplitEnumeratorMetricGroup.
  2. Implement those setters in InternalSplitEnumeratorMetricGroup.

...

  1. Support retrieving the pendingRecords metric when it is reported by the enumerator.
  2. Generalize the logic to determine the upper limit for source parallelism.

Compatibility, Deprecation, and Migration Plan

Like the other standardized connector metrics, these are opt-in and therefore do not introduce any backward-incompatible behavior. Autoscaling will still work for sources that do not report these metrics by relying on the busyness metrics, but it will be less efficient until those sources opt in.

Test Plan

Unit tests in the core Flink and Flink Kubernetes Operator repositories.

Rejected Alternatives

  1. Unify where the pendingRecords metric is reported (enumerator vs reader).

...