Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

1) Add a new metric group scope option (SCOPE_NAMING_JM_OPERATOR_COORDINATOR) to MetricOptions.

Code Block
languagejava
@PublicEvolving
public class MetricOptions {
    ......

    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     *
     * <p>Configured under the key {@code metrics.scope.jm-operator-coordinator}. The default value
     * is built from the {@code <host>}, {@code <job_name>}, {@code <task_name>} and {@code
     * <operator_name>} scope variables.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<task_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to the components about Operator on a JobManager, like OperatorCoordinator.");

    ......
}


Proposed Changes

1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.

Code Block
languagejava
collapsetrue
/** The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}. */
public class OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat extends ScopeFormat {
    public OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                        SCOPE_HOST,
 SCOPE_JOB_ID, SCOPE_JOB_NAME, SCOPE_OPERATOR_ID, SCOPE_OPERATOR_NAME
                });
    }
SCOPE_JOB_ID,
    public String[] formatScope(
            JobManagerJobMetricGroup parent, OperatorID operatorID, String operatorName) {
 SCOPE_JOB_NAME,
        final String[] template = copyTemplate();
        final String[] values = { SCOPE_TASK_VERTEX_ID,
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
SCOPE_TASK_NAME,
               valueOrNull(parent.jobName())         SCOPE_OPERATOR_ID,
            valueOrNull(operatorID),
            valueOrNull(operatorName)
SCOPE_OPERATOR_NAME
                });
    }

    return bindVariables(template, values);public String[] formatScope(
    }
}

2) Modify ScopeFormats to provide the OperatorCoordinatorScopeFormat.

Code Block
languagejava
collapsetrue
public final class ScopeFormats {
         ......
JobManagerJobMetricGroup parent,
     private final OperatorCoordinatorScopeFormat operatorCoordinatorScopeFormat;

    privateAbstractID ScopeFormats(vertexId,
            String jobManagerFormattaskName,
            StringOperatorID jobManagerJobFormatoperatorID,
            String taskManagerFormat, operatorName) {

        final String[] template  String taskManagerJobFormat,= copyTemplate();
        final String[] values = String taskFormat,{
            String operatorFormatparent.parent().hostname(),
            String operatorCoordinatorFormat) {
valueOrNull(parent.jobId()),
         ......   valueOrNull(parent.jobName()),
        this.operatorCoordinatorScopeFormat =
   valueOrNull(vertexId),
             new OperatorCoordinatorScopeFormat(valueOrNull(taskName),
            valueOrNull(operatorID),
            operatorCoordinatorFormat, this.jobManagerJobFormat);
valueOrNull(operatorName)
        };

    public OperatorCoordinatorScopeFormat getOperatorCoordinatorScopeFormat() {
 return       return operatorCoordinatorScopeFormatbindVariables(template, values);
    }
}

2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.

Code Block
languagejava
collapsetrue
/** Holds the scope formats of the metric groups, now including the JobManager operator scope. */
public final class ScopeFormats {
    ......

    /** Scope format handed to {@code JobManagerOperatorMetricGroup} instances. */
    private final JobManagerOperatorScopeFormat jmOperatorFormat;

    /**
     * Creates the scope formats from their format strings.
     *
     * @param jmOperatorFormat the format string for the JobManager operator scope; it is combined
     *     with the JobManager job format to build the {@link JobManagerOperatorScopeFormat}
     */
    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String jmOperatorFormat) {
        ......
        this.jmOperatorFormat =
                new JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
    }
}

3) Add JobManagerOperatorMetricGroup and use the scope format JobManagerOperatorScopeFormat.

Code Block
languagejava
collapsetrue
public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
	private final AbstractID vertexId;
	private final String taskName;
	private final String operatorName;
	private final     .formatScope(checkNotNull(parent), operatorID, operatorName),
                parent);
        OperatorID operatorID;

	public JobManagerOperatorMetricGroup(
			MetricRegistry registry,
			JobManagerJobMetricGroup parent,
			AbstractID vertexId,
			String taskName,
			OperatorID operatorID,
			String operatorName) {
		super(
				registry,
				registry.getScopeFormats()
						.getJmOperatorFormat()
						.formatScope(checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
				parent);
		this.vertexId = vertexId;
		this.taskName = taskName;
		this.operatorID = operatorID;
        		this.operatorName = operatorName;
	}

	@Override
	protected String getGroupName(CharacterFilter filter) {
		return "operator";
	}

	@Override
	protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
		return new QueryScopeInfo.JobQueryScopeInfo(this.parent....
}

4) Modify the JobManagerJobMetricGroup. InternalOperatorCoordinatorMetricGroup could be registered and got from JobManagerJobMetricGroup.

jobId.toString());
	}

	@Override
	protected Iterable<? extends ComponentMetricGroup> subComponents() {
		return Collections.emptyList();
	}

	public AbstractID getVertexId() {
		return vertexId;
	}

	public String getTaskName() {
		return taskName;
	}

	public String getOperatorName() {
		return operatorName;
	}

	public OperatorID getOperatorID() {
		return operatorID;
	}
}

4) Modify the JobManagerJobMetricGroup. InternalOperatorCoordinatorMetricGroup could be registered and got from JobManagerJobMetricGroup.

Code Block
languagejava
collapsetrue
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator (
    
Code Block
languagejava
collapsetrue
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, InternalOperatorCoordinatorMetricGroup> operatorCoordinators =
            new HashMap<>();

    public InternalOperatorCoordinatorMetricGroup getOrAddOperatorCoordinator(
            OperatorID operatorID, String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        //AbstractID operatorsvertexId,
        final String key = operatorID + truncatedOperatorName;
String taskName,
        synchronized (this) {
  OperatorID operatorID,
         return operatorCoordinators.computeIfAbsent(
  String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

    key,
    // unique OperatorIDs only exist in streaming, so we  have to rely on the name operatorfor ->batch
        // operators
        final String key = operatorID + truncatedOperatorName;

     new InternalOperatorCoordinatorMetricGroup(
  synchronized (this) {
            return operators.computeIfAbsent(
                   this.registry, thiskey, operatorID, truncatedOperatorName));

         }
    }

    private String getTruncatedOperatorName(String operatorName)operator {->
        if  (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
           new LOG.warnJobManagerOperatorMetricGroup(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                     this.registry, this, vertexId, taskName, operatorID, truncatedOperatorName));
        }
    }

    operatorName, @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
           METRICS_OPERATOR_NAME_MAX_LENGTHreturn operators.values();
     }     

    return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
  ......
}

5) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup and use the JobManagerOperatorMetricGroup as its parent metric group.

Code Block
languagejava
collapsetrue
@Internal
public class InternalOperatorCoordinatorMetricGroup
      } else {
extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {

    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup   return operatorName;parent) {
        }super(parent.addGroup("coordinator"));
    }
    ......
}

6) Introduce a metricGroup() method on OperatorCoordinator.Context. An OperatorCoordinator can obtain its metric group from this method.

...