

Discussion thread
Vote thread
JIRA: https://issues.apache.org/jira/browse/FLINK-29801
Release: 1.17

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

OperatorCoordinator is the coordinator for runtime operators and runs on the JobManager. Coordination works through operator events exchanged between an OperatorCoordinator and all of its operators. The mechanism is increasingly used in Flink: many sources and sinks depend on it to assign splits and to coordinate commits to external systems. OperatorCoordinator is widely used in the Flink Kafka, Pulsar, CDC, and Hudi connectors, among others.

However, there is no suitable metric group through which an OperatorCoordinator can report its own metrics. Such metrics could include how many splits or partitions have been assigned to source readers, or how many files have been written out by sinks. These metrics not only help users follow job progress but also make large jobs easier to maintain. We therefore propose this FLIP to introduce a metric group for OperatorCoordinator.

Public Interfaces

1) Add a new metric scope option (SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR) to MetricOptions.

@PublicEvolving
public class MetricOptions {
    ......

    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-job.operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to an operator coordinator on a JobManager.");
    ......
}

2) Add a new public interface OperatorCoordinatorMetricGroup to the package org.apache.flink.metrics.groups.

/**
 * Special {@link MetricGroup} representing an Operator coordinator.
 *
 * <p>You should only update the metrics in the coordinator thread.
 */
@PublicEvolving
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
    /**
     * The total number of events received since the operator coordinator started.
     */
    Counter getNumEventsInCounter();
}


Proposed Changes

1) Define a new scope format for the OperatorCoordinatorMetricGroup.

Add the scope format OperatorCoordinatorScopeFormat to the package org.apache.flink.runtime.metrics.scope as the scope format of OperatorCoordinatorMetricGroup.

public class OperatorCoordinatorScopeFormat extends ScopeFormat {
    public OperatorCoordinatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                    SCOPE_HOST, SCOPE_JOB_ID, SCOPE_JOB_NAME, SCOPE_OPERATOR_ID, SCOPE_OPERATOR_NAME
                });
    }

    public String[] formatScope(
            JobManagerJobMetricGroup parent, OperatorID operatorID, String operatorName) {

        final String[] template = copyTemplate();
        final String[] values = {
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
            valueOrNull(parent.jobName()),
            valueOrNull(operatorID),
            valueOrNull(operatorName)
        };
        return bindVariables(template, values);
    }
}
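
To make the scope format concrete, the following is a minimal sketch of how a template such as the default "<host>.jobmanager.<job_name>.<operator_name>.coordinator" could resolve into a metric scope. It is a simplified stand-in and not Flink's actual ScopeFormat code: the class ScopeTemplateSketch, the method bind, and the sample values are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified stand-in for scope-format resolution; not Flink's actual ScopeFormat code. */
final class ScopeTemplateSketch {

    /** Replaces each "<name>" component of a dot-separated template with its value, if known. */
    static String bind(String template, Map<String, String> variables) {
        String[] components = template.split("\\.");
        for (int i = 0; i < components.length; i++) {
            String value = variables.get(components[i]);
            if (value != null) {
                components[i] = value;
            }
            // Components without a bound value (literals or unknown variables) stay unchanged.
        }
        return String.join(".", components);
    }

    public static void main(String[] args) {
        // Hypothetical values; in Flink they come from the parent job metric group.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("<host>", "localhost");
        variables.put("<job_name>", "wordcount");
        variables.put("<operator_name>", "KafkaSource");

        String scope = bind("<host>.jobmanager.<job_name>.<operator_name>.coordinator", variables);
        System.out.println(scope); // prints localhost.jobmanager.wordcount.KafkaSource.coordinator
    }
}
```

With these sample values, a metric registered on the coordinator group would be reported under the scope localhost.jobmanager.wordcount.KafkaSource.coordinator.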

2) Modify ScopeFormats to provide the OperatorCoordinatorScopeFormat.

public final class ScopeFormats {
    ......
    private final OperatorCoordinatorScopeFormat operatorCoordinatorScopeFormat;

    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String operatorCoordinatorFormat) {
        ......
        this.operatorCoordinatorScopeFormat =
                new OperatorCoordinatorScopeFormat(
                        operatorCoordinatorFormat, this.jobManagerJobFormat);
    }

    public OperatorCoordinatorScopeFormat getOperatorCoordinatorScopeFormat() {
        return operatorCoordinatorScopeFormat;
    }
}

3) Add an internal implementation of the OperatorCoordinatorMetricGroup interface, InternalOperatorCoordinatorMetricGroup, which uses the OperatorCoordinatorScopeFormat.

public class InternalOperatorCoordinatorMetricGroup
        extends ComponentMetricGroup<JobManagerJobMetricGroup>
        implements OperatorCoordinatorMetricGroup {

    private final String operatorName;
    private final OperatorID operatorID;

    public InternalOperatorCoordinatorMetricGroup(
            MetricRegistry registry,
            JobManagerJobMetricGroup parent,
            OperatorID operatorID,
            String operatorName) {
        super(
                registry,
                registry.getScopeFormats()
                        .getOperatorCoordinatorScopeFormat()
                        .formatScope(checkNotNull(parent), operatorID, operatorName),
                parent);
        this.operatorID = operatorID;
        this.operatorName = operatorName;
    }

    ......
}

4) Modify the JobManagerJobMetricGroup so that an InternalOperatorCoordinatorMetricGroup can be registered with it and retrieved from it.

public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, InternalOperatorCoordinatorMetricGroup> operatorCoordinators =
            new HashMap<>();

    public InternalOperatorCoordinatorMetricGroup getOrAddOperatorCoordinator(
            OperatorID operatorID, String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        // operators
        final String key = operatorID + truncatedOperatorName;

        synchronized (this) {
            return operatorCoordinators.computeIfAbsent(
                    key,
                    operator ->
                            new InternalOperatorCoordinatorMetricGroup(
                                    this.registry, this, operatorID, truncatedOperatorName));
        }
    }

    private String getTruncatedOperatorName(String operatorName) {
        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
            LOG.warn(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                    operatorName,
                    METRICS_OPERATOR_NAME_MAX_LENGTH);
            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
        } else {
            return operatorName;
        }
    }
    ......
}
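
The get-or-create behaviour above can be sketched in isolation. The example below is a stand-in that models the metric group as a plain object: the names CoordinatorGroupRegistrySketch, Group, and getOrAdd are hypothetical, and the limit of 80 characters is an assumed value standing in for METRICS_OPERATOR_NAME_MAX_LENGTH.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the get-or-create pattern; the metric group is modelled as a plain object. */
final class CoordinatorGroupRegistrySketch {
    // Assumed limit for illustration only; the FLIP refers to METRICS_OPERATOR_NAME_MAX_LENGTH.
    static final int MAX_NAME_LENGTH = 80;

    /** Hypothetical placeholder for InternalOperatorCoordinatorMetricGroup. */
    static final class Group {
        final String operatorId;
        final String operatorName;

        Group(String operatorId, String operatorName) {
            this.operatorId = operatorId;
            this.operatorName = operatorName;
        }
    }

    private final Map<String, Group> groups = new HashMap<>();

    /** Returns the group cached for this operator, creating it on first access. */
    Group getOrAdd(String operatorId, String operatorName) {
        final String truncatedName = truncate(operatorName);
        // The key combines ID and name, mirroring the proposal's handling of batch operators.
        final String key = operatorId + truncatedName;
        synchronized (this) {
            return groups.computeIfAbsent(key, k -> new Group(operatorId, truncatedName));
        }
    }

    /** Cuts names that exceed the limit; null and short names pass through unchanged. */
    static String truncate(String name) {
        if (name != null && name.length() > MAX_NAME_LENGTH) {
            return name.substring(0, MAX_NAME_LENGTH);
        }
        return name;
    }

    public static void main(String[] args) {
        CoordinatorGroupRegistrySketch registry = new CoordinatorGroupRegistrySketch();
        Group first = registry.getOrAdd("op-1", "Source: Kafka");
        Group second = registry.getOrAdd("op-1", "Source: Kafka");
        Group other = registry.getOrAdd("op-2", "Source: Kafka");
        System.out.println(first == second); // prints true: same key, same cached group
        System.out.println(first == other);  // prints false: different operator ID
    }
}
```

Because the key is built from the truncated name, two lookups with the same operator ID and the same long name resolve to one group rather than creating a second one.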

5) Introduce a metricGroup() method on OperatorCoordinator.Context. An OperatorCoordinator obtains its metric group through this method.

@Internal
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    interface Context {
        /** Gets the metric group of the operator coordinator. */
        OperatorCoordinatorMetricGroup metricGroup();
        
        ....
    }
}
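
As an illustration of how a coordinator might use the new accessor, here is a minimal, self-contained sketch. All types in it are reduced stand-ins rather than the real Flink interfaces, and the coordinator class and the metric name numSplitsAssigned are hypothetical; only the call shape context.metricGroup() follows the proposal.

```java
import java.util.HashMap;
import java.util.Map;

final class CoordinatorMetricsUsageSketch {

    /** Minimal stand-in for org.apache.flink.metrics.Counter. */
    interface Counter {
        void inc();

        long getCount();
    }

    /** Minimal stand-in for the proposed OperatorCoordinatorMetricGroup. */
    interface CoordinatorMetricGroup {
        Counter counter(String name);
    }

    /** Minimal stand-in for OperatorCoordinator.Context, reduced to the new accessor. */
    interface Context {
        CoordinatorMetricGroup metricGroup();
    }

    /** In-memory group that hands out one counter per metric name (a simplification). */
    static final class InMemoryGroup implements CoordinatorMetricGroup {
        private final Map<String, Counter> counters = new HashMap<>();

        @Override
        public Counter counter(String name) {
            return counters.computeIfAbsent(name, n -> new Counter() {
                private long count;

                @Override
                public void inc() {
                    count++;
                }

                @Override
                public long getCount() {
                    return count;
                }
            });
        }
    }

    /** Hypothetical coordinator that counts the splits it hands out. */
    static final class SplitAssigningCoordinator {
        private final Counter numSplitsAssigned;

        SplitAssigningCoordinator(Context context) {
            // Register the metric once, when the coordinator is created.
            this.numSplitsAssigned = context.metricGroup().counter("numSplitsAssigned");
        }

        void assignSplit(String splitId) {
            // Real assignment logic is omitted; only the metric update is shown.
            numSplitsAssigned.inc();
        }
    }

    public static void main(String[] args) {
        InMemoryGroup group = new InMemoryGroup();
        SplitAssigningCoordinator coordinator = new SplitAssigningCoordinator(() -> group);
        coordinator.assignSplit("split-0");
        coordinator.assignSplit("split-1");
        System.out.println(group.counter("numSplitsAssigned").getCount()); // prints 2
    }
}
```

The coordinator never builds a metric group or a scope name itself; it only asks the context for one, which is the division of responsibility this FLIP argues for.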


Compatibility, Deprecation, and Migration Plan

No compatibility, deprecation, or migration concerns apply to this FLIP.

Test Plan

We will provide unit tests and integration tests to validate the proposed changes.

Rejected Alternatives

Use JobManagerJobMetricGroup in OperatorCoordinator

This alternative would pass the JobManagerJobMetricGroup to the OperatorCoordinator, which could then use it to add a new metric group on its own.

This alternative has the following disadvantages:

1) Every OperatorCoordinator would need to add its own metric group. This is a common operation for all OperatorCoordinators and should be handled by the Flink framework.

2) Every OperatorCoordinator implementation would choose its own scope name, which makes the OperatorCoordinator metric groups difficult to manage and hard for users to understand.

POC

We've made a basic POC of the proposed change. See the following commit for details.

https://github.com/ruanhang1993/flink/commit/3130f6821195fcc28e1c7a8350f24e4fc3d8461c

