Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

This FLIP was initially proposed by MengYue (272614347@qq.com) and completed by Hang Ruan (ruanhang1993@hotmail.com).

MengYue provided a design in FLINK-29801 that uses the JobManagerJobMetricGroup to add a new metric group for the OperatorCoordinator. Building on that PR, this FLIP proposes a generic design for the OperatorCoordinator metric group.

Motivation

The OperatorCoordinator is the coordinator for runtime operators and runs on the JobManager. It coordinates with its operators by exchanging operator events. This mechanism is increasingly used in Flink: for example, many sources and sinks depend on it to assign splits and to coordinate commits to external systems. The OperatorCoordinator is widely used in the Flink Kafka connector, the Flink Pulsar connector, the Flink CDC connector, the Flink Hudi connector, and so on.

However, there is no suitable metric group scope for the OperatorCoordinator, and no implementation of the OperatorCoordinatorMetricGroup interface. Useful OperatorCoordinator metrics include, for example, how many splits/partitions have been assigned to source readers or how many files have been written out by sink writers. Such metrics not only help users track job progress but also make large jobs easier to maintain. We therefore propose this FLIP to introduce a new metric group scope for the OperatorCoordinator and to provide an internal implementation of OperatorCoordinatorMetricGroup.

Public Interfaces

1) Add a new metric group scope (SCOPE_NAMING_JM_OPERATOR) to MetricOptions.

@PublicEvolving
public class MetricOptions {
    ......      

    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_OPERATOR =
            key("metrics.scope.jm-operator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to operator-related components on a JobManager, such as the OperatorCoordinator.");

    ......
}


Proposed Changes

1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.

/** The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}. */
public class JobManagerOperatorScopeFormat extends ScopeFormat {
    public JobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                        SCOPE_HOST,
                        SCOPE_JOB_ID,
                        SCOPE_JOB_NAME,
                        SCOPE_TASK_VERTEX_ID,
                        SCOPE_TASK_NAME,
                        SCOPE_OPERATOR_ID,
                        SCOPE_OPERATOR_NAME
                });
    }

    public String[] formatScope(
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {

        final String[] template = copyTemplate();
        final String[] values = {
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
            valueOrNull(parent.jobName()),
            valueOrNull(vertexId),
            valueOrNull(taskName),
            valueOrNull(operatorID),
            valueOrNull(operatorName)
        };
        return bindVariables(template, values);
    }
}
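To illustrate what formatScope does, here is a minimal, Flink-free sketch of the binding step. It does not use Flink's ScopeFormat; the class name ScopeBindingSketch, the literal variable spellings (such as `<task_id>`) and the sample values are assumptions for illustration. The point it shows is that the variable names passed to the constructor and the values passed to formatScope are parallel arrays, so they must be listed in the same order, and that only the variables that appear in the configured format end up in the scope.

```java
import java.util.Arrays;

/**
 * Hypothetical, Flink-free sketch of the binding done by JobManagerOperatorScopeFormat.
 * The variable names and the values passed to formatScope(...) are parallel arrays.
 */
final class ScopeBindingSketch {
    // Assumed variable spellings, in the same order as the proposal's super(...) call.
    static final String[] VARIABLES = {
        "<host>", "<job_id>", "<job_name>", "<task_id>",
        "<task_name>", "<operator_id>", "<operator_name>"
    };

    private final String[] template;

    ScopeBindingSketch(String format) {
        // Split the configured format string into its dot-separated components.
        this.template = format.split("\\.");
    }

    /** Values must follow the order of VARIABLES (host, job id, job name, ...). */
    String[] formatScope(String... values) {
        if (values.length != VARIABLES.length) {
            throw new IllegalArgumentException("expected " + VARIABLES.length + " values");
        }
        // Work on a copy so the template can be reused (the role of copyTemplate()).
        String[] scope = Arrays.copyOf(template, template.length);
        for (int i = 0; i < scope.length; i++) {
            for (int v = 0; v < VARIABLES.length; v++) {
                if (scope[i].equals(VARIABLES[v])) {
                    // Replace a variable component with the value at the same index
                    // (the role of bindVariables()).
                    scope[i] = values[v];
                    break;
                }
            }
        }
        return scope;
    }

    public static void main(String[] args) {
        ScopeBindingSketch defaultFormat =
                new ScopeBindingSketch("<host>.jobmanager.<job_name>.<operator_name>");
        String[] scope = defaultFormat.formatScope(
                "host-1", "a1b2", "WordCount", "v-7", "Source: Kafka", "op-42", "Source: Kafka");
        System.out.println(String.join(".", scope)); // host-1.jobmanager.WordCount.Source: Kafka
    }
}
```

With the default `metrics.scope.jm-operator` value, the job ID, vertex ID, task name and operator ID are supplied but simply not used; a user-defined format that includes `<operator_id>` would pick it up without any code change.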

2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.

public final class ScopeFormats {
    ......
    private final JobManagerOperatorScopeFormat jmOperatorFormat;

    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String jmOperatorFormat) {
        ......
        this.jmOperatorFormat = new JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
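    }

    /** Returns the scope format for operator-level metrics on the JobManager. */
    public JobManagerOperatorScopeFormat getJmOperatorFormat() {
        return jmOperatorFormat;
    }
}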

3) Add JobManagerOperatorMetricGroup, which uses the scope format JobManagerOperatorScopeFormat.

public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
	private final AbstractID vertexId;
	private final String taskName;
	private final String operatorName;
	private final OperatorID operatorID;

	public JobManagerOperatorMetricGroup(
			MetricRegistry registry,
			JobManagerJobMetricGroup parent,
			AbstractID vertexId,
			String taskName,
			OperatorID operatorID,
			String operatorName) {
		super(
				registry,
				registry.getScopeFormats()
						.getJmOperatorFormat()
						.formatScope(checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
				parent);
		this.vertexId = vertexId;
		this.taskName = taskName;
		this.operatorID = operatorID;
		this.operatorName = operatorName;
	}

	@Override
	protected String getGroupName(CharacterFilter filter) {
		return "operator";
	}

	@Override
	protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
		return new QueryScopeInfo.JobQueryScopeInfo(this.parent.jobId.toString());
	}

	@Override
	protected Iterable<? extends ComponentMetricGroup> subComponents() {
		return Collections.emptyList();
	}

	public AbstractID getVertexId() {
		return vertexId;
	}

	public String getTaskName() {
		return taskName;
	}

	public String getOperatorName() {
		return operatorName;
	}

	public OperatorID getOperatorID() {
		return operatorID;
	}
}

4) Modify JobManagerJobMetricGroup so that a JobManagerOperatorMetricGroup can be registered with, and retrieved from, a JobManagerJobMetricGroup.

@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator(
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        // operators
        final String key = operatorID + truncatedOperatorName;

        synchronized (this) {
            return operators.computeIfAbsent(
                    key,
                    operator ->
                            new JobManagerOperatorMetricGroup(
                                    this.registry, this, vertexId, taskName, operatorID, truncatedOperatorName));
        }
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return operators.values();
    }     

    ......
}
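The get-or-add behaviour of getOrAddOperator can be sketched without Flink as follows. OperatorGroupSketch and JobGroupSketch are hypothetical stand-ins, and the 80-character name limit is an assumption for illustration, since the proposal does not show getTruncatedOperatorName. The sketch shows that repeated calls with the same operator ID and (truncated) name return the same group instance, while a different ID creates a new one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for JobManagerOperatorMetricGroup; it only records its identity. */
final class OperatorGroupSketch {
    final String operatorId;
    final String operatorName;

    OperatorGroupSketch(String operatorId, String operatorName) {
        this.operatorId = operatorId;
        this.operatorName = operatorName;
    }
}

/** Hypothetical, Flink-free sketch of getOrAddOperator in JobManagerJobMetricGroup. */
final class JobGroupSketch {
    // Assumed limit for illustration; the real truncation helper is not shown in the proposal.
    static final int MAX_OPERATOR_NAME_LENGTH = 80;

    private final Map<String, OperatorGroupSketch> operators = new HashMap<>();

    static String truncate(String name) {
        return name.length() > MAX_OPERATOR_NAME_LENGTH
                ? name.substring(0, MAX_OPERATOR_NAME_LENGTH)
                : name;
    }

    OperatorGroupSketch getOrAddOperator(String operatorId, String operatorName) {
        final String truncatedName = truncate(operatorName);
        // Same key scheme as the proposal: operator ID followed by the truncated name.
        final String key = operatorId + truncatedName;
        // Lock the map so that concurrent callers with the same key share one group.
        synchronized (this) {
            return operators.computeIfAbsent(
                    key, k -> new OperatorGroupSketch(operatorId, truncatedName));
        }
    }

    /** Number of registered operator groups, read under the same lock. */
    synchronized int size() {
        return operators.size();
    }

    public static void main(String[] args) {
        JobGroupSketch job = new JobGroupSketch();
        OperatorGroupSketch first = job.getOrAddOperator("op-1", "Source: Kafka");
        OperatorGroupSketch second = job.getOrAddOperator("op-1", "Source: Kafka");
        System.out.println(first == second); // true
    }
}
```

Because the map is a plain HashMap guarded by the group's monitor, any code that iterates it (such as subComponents()) may also want to hold the same lock.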

5) Add InternalOperatorCoordinatorMetricGroup, an internal implementation of the OperatorCoordinatorMetricGroup interface that uses JobManagerOperatorMetricGroup as its parent metric group.

@Internal
public class InternalOperatorCoordinatorMetricGroup
        extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {

    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup parent) {
        super(parent.addGroup("coordinator"));
    }
}
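The proxy pattern used by InternalOperatorCoordinatorMetricGroup can be illustrated with a small Flink-free sketch. SimpleGroupSketch and CoordinatorGroupSketch are hypothetical types, and counters are modelled as AtomicLong instead of Flink's Counter. The sketch shows that every metric registered through the coordinator group lands in a "coordinator" subgroup of the operator group, so under the default scope format we would expect identifiers of the form `<host>.jobmanager.<job_name>.<operator_name>.coordinator.<metric_name>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical minimal metric group: scope components plus a registry shared by all groups. */
class SimpleGroupSketch {
    final List<String> scope;
    final Map<String, AtomicLong> registry;

    SimpleGroupSketch(List<String> scope, Map<String, AtomicLong> registry) {
        this.scope = scope;
        this.registry = registry;
    }

    /** Returns a child group whose scope is this scope plus one more component. */
    SimpleGroupSketch addGroup(String name) {
        List<String> child = new ArrayList<>(scope);
        child.add(name);
        return new SimpleGroupSketch(child, registry);
    }

    /** Registers (or reuses) a counter under "<scope>.<name>". */
    AtomicLong counter(String name) {
        String identifier = String.join(".", scope) + "." + name;
        return registry.computeIfAbsent(identifier, k -> new AtomicLong());
    }
}

/** Proxy in the spirit of InternalOperatorCoordinatorMetricGroup: forwards to a "coordinator" subgroup. */
final class CoordinatorGroupSketch {
    private final SimpleGroupSketch delegate;

    CoordinatorGroupSketch(SimpleGroupSketch operatorGroup) {
        this.delegate = operatorGroup.addGroup("coordinator");
    }

    AtomicLong counter(String name) {
        return delegate.counter(name);
    }

    public static void main(String[] args) {
        Map<String, AtomicLong> registry = new HashMap<>();
        SimpleGroupSketch operatorGroup = new SimpleGroupSketch(
                Arrays.asList("host-1", "jobmanager", "WordCount", "Source: Kafka"), registry);
        new CoordinatorGroupSketch(operatorGroup).counter("numEventsIn").incrementAndGet();
        System.out.println(registry); // {host-1.jobmanager.WordCount.Source: Kafka.coordinator.numEventsIn=1}
    }
}
```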

6) Introduce a metricGroup() method in OperatorCoordinator.Context, through which an OperatorCoordinator can get its metric group.

@Internal
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    interface Context {
        /** Gets the metric group of the operator coordinator. */
        OperatorCoordinatorMetricGroup metricGroup();
        
        ....
    }
}

7) Implementations of OperatorCoordinator register and report metrics through the OperatorCoordinatorMetricGroup.


For example, SourceCoordinator, an implementation of the OperatorCoordinator interface, could use the metric group like this:

@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
        implements OperatorCoordinator {
    ......
    private final Counter numEventsInCounter;

    public SourceCoordinator(
            String operatorName,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> context,
            CoordinatorStore coordinatorStore,
            WatermarkAlignmentParams watermarkAlignmentParams,
            @Nullable String coordinatorListeningID) {
        ......
        this.numEventsInCounter = context.metricGroup().counter("numEventsIn");
    	......
    }
}

The SourceCoordinator has a SplitEnumerator field named 'enumerator'. To register a metric for the enumerator, we can get the metric group from its SplitEnumeratorContext#metricGroup. The details of SplitEnumeratorMetricGroup will be introduced in a separate Jira issue.

For example, KafkaSourceEnumerator could register a metric for how many partitions have been assigned to source readers.

@Internal
public class KafkaSourceEnumerator
        implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions) {
        ......
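        // Read the enumerator's own assignedPartitions field (initialized in the elided code above),
        // not the constructor argument, so the gauge reflects later assignments.
        context.metricGroup().gauge("assignedPartitionsNum", this.assignedPartitions::size);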
    }
    
    ......
}
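A gauge is read each time a reporter polls it, so it should be bound to state that the enumerator keeps up to date. The following Flink-free sketch makes that concrete; it models a gauge as java.util.function.Supplier instead of Flink's Gauge, and EnumeratorGaugeSketch, its partition names and its assign method are hypothetical. It shows that a gauge bound to the enumerator's own (copied and mutated) set follows new assignments, whereas a gauge bound to the original constructor argument would keep reporting the initial size.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical enumerator sketch that exposes the number of assigned partitions as a gauge. */
final class EnumeratorGaugeSketch {
    private final Set<String> assignedPartitions;
    // Stand-in for a Flink Gauge: evaluated each time a reporter reads it.
    final Supplier<Integer> assignedPartitionsNum;

    EnumeratorGaugeSketch(Set<String> initiallyAssigned) {
        // Defensive copy: later assignments mutate this set, not the caller's.
        this.assignedPartitions = new HashSet<>(initiallyAssigned);
        // Bind the gauge to the live field rather than to the constructor argument.
        this.assignedPartitionsNum = this.assignedPartitions::size;
    }

    /** Records partitions handed out to source readers. */
    void assign(List<String> partitions) {
        assignedPartitions.addAll(partitions);
    }

    public static void main(String[] args) {
        EnumeratorGaugeSketch enumerator =
                new EnumeratorGaugeSketch(new HashSet<>(Arrays.asList("topic-0")));
        enumerator.assign(Arrays.asList("topic-1", "topic-2"));
        System.out.println(enumerator.assignedPartitionsNum.get()); // 3
    }
}
```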

Compatibility, Deprecation, and Migration Plan

This FLIP does not require any compatibility, deprecation, or migration considerations.

Test Plan

We will provide unit tests and integration tests (ITs) to validate the proposed changes.

Rejected Alternatives

Use JobManagerJobMetricGroup in OperatorCoordinator

This alternative passes the JobManagerJobMetricGroup to the OperatorCoordinator, which then uses it to add its own metric group. The design can be found in FLINK-29801.

This alternative has the following disadvantages:

1) Every OperatorCoordinator needs to add a new metric group on its own. This is a common operation for all OperatorCoordinators and should be done by the Flink framework.

2) Every OperatorCoordinator implementation would provide its own scope name, which makes the OperatorCoordinator metric groups difficult to manage and hard for users to understand.

POC

We've made a basic POC for the proposed change; see the following commit for more information:

https://github.com/ruanhang1993/flink/commit/2d373fe78cc7197d5f8e459e1645e9b3d54b3e92

This POC provides the new metric group scope and common metric names for the OperatorCoordinator. It also provides internal implementations of OperatorCoordinatorMetricGroup and SplitEnumeratorMetricGroup. In addition, the POC modifies SourceCoordinator and KafkaSourceEnumerator to show how to use the metric group.