

Motivation

Flink Autoscaling with the Kubernetes Operator relies on a set of metrics to scale sources up and down. The algorithm currently works well with the Kafka source, since it provides metrics that track the backlog and the maximum source parallelism.

Users also need efficient autoscaling for other sources, such as Iceberg. However, pendingRecords is currently only reported by the SourceReader, which does not cover sources where only the SplitEnumerator can calculate this metric, e.g. bounded split implementations like Iceberg.

Another challenge is obtaining the maximum parallelism. For the Kafka sources, the autoscaling implementation uses a Kafka-specific metric to determine the upper limit for scaling. Flink already provides an unassignedSplits metric that gives a signal of the remaining splits to assign, but it lacks the number of currently assigned splits needed to compute the total number of splits.

Public Interfaces

This FLIP requires changes to SplitEnumeratorMetricGroup, adding setters for two new metrics, pendingRecords and assignedSplits:


Code Block

@PublicEvolving
public interface SplitEnumeratorMetricGroup extends OperatorCoordinatorMetricGroup {

    <G extends Gauge<Long>> G setUnassignedSplitsGauge(G unassignedSplitsGauge);

    /** New addition. */
    void setPendingRecordsGauge(Gauge<Long> pendingRecordsGauge);

    /** New addition. */
    void setAssignedSplitsGauge(Gauge<Long> assignedSplitsGauge);
}
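For illustration, here is a minimal, self-contained sketch of how a connector's enumerator might wire these gauges. The Gauge interface and MetricGroupStub below are stand-ins (the real types live in Flink's org.apache.flink.metrics and runtime packages), and the split bookkeeping is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class EnumeratorGaugeSketch {
    /** Stand-in for org.apache.flink.metrics.Gauge. */
    interface Gauge<T> { T getValue(); }

    /** Stand-in for the two proposed setters on SplitEnumeratorMetricGroup. */
    static class MetricGroupStub {
        Gauge<Long> pendingRecords;
        Gauge<Long> assignedSplits;
        void setPendingRecordsGauge(Gauge<Long> g) { this.pendingRecords = g; }
        void setAssignedSplitsGauge(Gauge<Long> g) { this.assignedSplits = g; }
    }

    /** Wires the gauges and returns {pendingRecords, assignedSplits} for inspection. */
    static long[] demo() {
        MetricGroupStub metricGroup = new MetricGroupStub();

        // Hypothetical enumerator bookkeeping: record counts of splits not yet
        // handed to readers, plus a count of splits already assigned.
        Queue<Long> pendingSplitRecordCounts = new ArrayDeque<>();
        pendingSplitRecordCounts.add(100L);
        pendingSplitRecordCounts.add(250L);
        long assignedSplitCount = 3L;

        // The enumerator registers gauges backed by its own state.
        metricGroup.setPendingRecordsGauge(
                () -> pendingSplitRecordCounts.stream().mapToLong(Long::longValue).sum());
        metricGroup.setAssignedSplitsGauge(() -> assignedSplitCount);

        return new long[] {
                metricGroup.pendingRecords.getValue(),
                metricGroup.assignedSplits.getValue()
        };
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("pendingRecords=" + r[0] + ", assignedSplits=" + r[1]);
    }
}
```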

Proposed Changes

Core Flink

  1. Add the setters setPendingRecordsGauge and setAssignedSplitsGauge in SplitEnumeratorMetricGroup.
  2. Implement those setters in the InternalSplitEnumeratorMetricGroup.
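A rough sketch of what the internal implementation could look like, assuming the setters delegate to a generic gauge registration on the parent metric group, as the existing setUnassignedSplitsGauge does. The ParentGroup type is a stand-in, and the "assignedSplits" metric name is an assumption ("pendingRecords" follows the existing standardized connector metric name):

```java
import java.util.HashMap;
import java.util.Map;

public class InternalMetricGroupSketch {
    /** Stand-in for org.apache.flink.metrics.Gauge. */
    interface Gauge<T> { T getValue(); }

    /** Stand-in for the parent metric group's generic gauge registration. */
    static class ParentGroup {
        final Map<String, Gauge<?>> gauges = new HashMap<>();
        <T> void gauge(String name, Gauge<T> gauge) { gauges.put(name, gauge); }
    }

    final ParentGroup parent = new ParentGroup();

    // The proposed setters simply register the gauges under standardized
    // metric names so that downstream consumers can discover them.
    void setPendingRecordsGauge(Gauge<Long> g) { parent.gauge("pendingRecords", g); }
    void setAssignedSplitsGauge(Gauge<Long> g) { parent.gauge("assignedSplits", g); }

    public static void main(String[] args) {
        InternalMetricGroupSketch group = new InternalMetricGroupSketch();
        group.setPendingRecordsGauge(() -> 42L);
        group.setAssignedSplitsGauge(() -> 7L);
        System.out.println(group.parent.gauges.keySet());
    }
}
```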

Flink Kubernetes Operator

  1. Support retrieving the pendingRecords metric reported by the enumerator.
  2. Generalize the logic to determine the upper limit for source parallelism.
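With both split gauges available, the operator could derive the upper bound for source parallelism from the splits themselves rather than from a Kafka-specific metric. A hypothetical sketch of that computation (the helper name is illustrative, not an existing operator API):

```java
public class SourceParallelismSketch {
    /**
     * Hypothetical autoscaler helper: the maximum useful source parallelism
     * is bounded by the total number of splits, i.e. splits already assigned
     * to readers plus splits still held by the enumerator.
     */
    static long maxSourceParallelism(long assignedSplits, long unassignedSplits) {
        return assignedSplits + unassignedSplits;
    }

    public static void main(String[] args) {
        // e.g. 3 splits currently being read and 5 still unassigned
        System.out.println(maxSourceParallelism(3, 5)); // prints 8
    }
}
```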

Compatibility, Deprecation, and Migration Plan




Just like the other standardized connector metrics, these are opt-in and will not cause any backward-incompatible behavior. Autoscaling will still work for sources without these metrics by relying on the busyness metrics, but it will not work as efficiently until connectors opt in to reporting them.

Test Plan

Unit tests in Core Flink and Flink Kubernetes Operator repos.

Rejected Alternatives

  1. Unify where the pendingRecords metric is reported (enumerator vs reader).

For unbounded splits like Kafka/Pulsar partitions, it would be difficult to calculate these metrics outside of the readers; unifying in the enumerator would therefore require additional RPCs to communicate the counts from the readers. We would also need to introduce configuration to emit these events periodically, complicating the design.

For bounded splits like File/Iceberg splits, the individual readers have no context about what remains.

Thus, I propose to report this metric from the enumerator or the reader, whichever is easiest for the specific connector, and to handle both metric groups in the autoscaling implementation.