Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

1) Add a new metric group scope(SCOPE_NAMING_JM_OPERATOR_COORDINATOR) in the MetricOptions.

Code Block
languagejava
@PublicEvolving
public class MetricOptions {
    ......      

    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to the ancomponents about OperatorCoordinatorOperator on a JobManager, like OperatorCoordinator.");     

    ......
}


Proposed Changes

1) Define the Define the scope format OperatorCoordinatorScopeFormat JobManagerOperatorScopeFormat to the package  org.apache.flink.runtime.metrics.scope as the scope format of  OperatorCoordinatorMetricGroupof  JobManagerOperatorMetricGroup

Code Block
languagejava
collapsetrue
/** The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}. */
public class OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat extends ScopeFormat {
    public OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                        SCOPE_HOST,
 SCOPE_JOB_ID, SCOPE_JOB_NAME, SCOPE_OPERATOR_ID, SCOPE_OPERATOR_NAME
                });
    }
SCOPE_JOB_ID,
    public String[] formatScope(
            JobManagerJobMetricGroup parent, OperatorID operatorID, String operatorName) {
 SCOPE_JOB_NAME,
        final String[] template = copyTemplate();
        final String[] values = { SCOPE_TASK_VERTEX_ID,
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
SCOPE_TASK_NAME,
               valueOrNull(parent.jobName())         SCOPE_OPERATOR_ID,
            valueOrNull(operatorID),
            valueOrNull(operatorName)
SCOPE_OPERATOR_NAME
                });
    }

    return bindVariables(template, values);public String[] formatScope(
    }
}

2) Modify ScopeFormats to provide the OperatorCoordinatorScopeFormat.

Code Block
languagejava
collapsetrue
public final class ScopeFormats {
         ......
JobManagerJobMetricGroup parent,
     private final OperatorCoordinatorScopeFormat operatorCoordinatorScopeFormat;

    privateAbstractID ScopeFormats(vertexId,
            String jobManagerFormattaskName,
            StringOperatorID jobManagerJobFormatoperatorID,
            String taskManagerFormat, operatorName) {

        final String[] template  String taskManagerJobFormat,= copyTemplate();
        final String[] values = String taskFormat,{
            String operatorFormatparent.parent().hostname(),
            String operatorCoordinatorFormat) {
valueOrNull(parent.jobId()),
         ......   valueOrNull(parent.jobName()),
        this.operatorCoordinatorScopeFormat =
   valueOrNull(vertexId),
             new OperatorCoordinatorScopeFormat(valueOrNull(taskName),
            valueOrNull(operatorID),
            operatorCoordinatorFormat, this.jobManagerJobFormat);
valueOrNull(operatorName)
        };

    public OperatorCoordinatorScopeFormat getOperatorCoordinatorScopeFormat() {
 return       return operatorCoordinatorScopeFormatbindVariables(template, values);
    }
}

3) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup and use the OperatorCoordinatorScopeFormat2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.

Code Block
languagejava
collapsetrue
@Internal
public final class InternalOperatorCoordinatorMetricGroupScopeFormats {
    ......
    extendsprivate ComponentMetricGroup<JobManagerJobMetricGroup>
final JobManagerOperatorScopeFormat jmOperatorFormat;

    private ScopeFormats(
   implements  OperatorCoordinatorMetricGroup {

    private final String operatorName;jobManagerFormat,
    private  final OperatorID operatorID;

    publicString InternalOperatorCoordinatorMetricGroup(jobManagerJobFormat,
            MetricRegistryString registrytaskManagerFormat,
            JobManagerJobMetricGroupString parenttaskManagerJobFormat,
            OperatorIDString operatorIDtaskFormat,
            String operatorName) {operatorFormat,
        super(
    String            registry,jmOperatorFormat) {
                registry.getScopeFormats()......
        this.jmOperatorFormat = new              .getOperatorCoordinatorScopeFormat()JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
    }
}

3) Add JobManagerOperatorMetricGroup and use the scope format JobManagerOperatorScopeFormat.

Code Block
languagejava
collapsetrue
public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
	private final AbstractID vertexId;
	private final String taskName;
	private final String operatorName;
	private final     .formatScope(checkNotNull(parent), operatorID, operatorName),
                parent);
        OperatorID operatorID;

	public JobManagerOperatorMetricGroup(
			MetricRegistry registry,
			JobManagerJobMetricGroup parent,
			AbstractID vertexId,
			String taskName,
			OperatorID operatorID,
			String operatorName) {
		super(
				registry,
				registry.getScopeFormats()
						.getJmOperatorFormat()
						.formatScope(checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
				parent);
		this.vertexId = vertexId;
		this.taskName = taskName;
		this.operatorID = operatorID;
        		this.operatorName = operatorName;
	}

	@Override
	protected String getGroupName(CharacterFilter filter) {
		return "operator";
	}

	@Override
	protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
		return new QueryScopeInfo.JobQueryScopeInfo(this.parent....
}

4) Modify the JobManagerJobMetricGroup. InternalOperatorCoordinatorMetricGroup could be registered and got from JobManagerJobMetricGroup.

jobId.toString());
	}

	@Override
	protected Iterable<? extends ComponentMetricGroup> subComponents() {
		return Collections.emptyList();
	}

	public AbstractID getVertexId() {
		return vertexId;
	}

	public String getTaskName() {
		return taskName;
	}

	public String getOperatorName() {
		return operatorName;
	}

	public OperatorID getOperatorID() {
		return operatorID;
	}
}

4) Modify the JobManagerJobMetricGroup. JobManagerOperatorMetricGroup could be registered and got from JobManagerJobMetricGroup.

Code Block
languagejava
collapsetrue
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator (
    
Code Block
languagejava
collapsetrue
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, InternalOperatorCoordinatorMetricGroup> operatorCoordinators =
            new HashMap<>();

    public InternalOperatorCoordinatorMetricGroup getOrAddOperatorCoordinator(
            OperatorID operatorID, String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        //AbstractID operatorsvertexId,
        final String key = operatorID + truncatedOperatorName;
String taskName,
        synchronized (this) {
  OperatorID operatorID,
         return operatorCoordinators.computeIfAbsent(
  String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

    key,
    // unique OperatorIDs only exist in streaming, so we  have to rely on the name operatorfor ->batch
        // operators
        final String key = operatorID + truncatedOperatorName;

     new InternalOperatorCoordinatorMetricGroup(
  synchronized (this) {
            return operators.computeIfAbsent(
                   this.registry, thiskey, operatorID, truncatedOperatorName));

         }
    }

    private String getTruncatedOperatorName(String operatorName)operator {->
        if  (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
           new LOG.warnJobManagerOperatorMetricGroup(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                     this.registry, this, vertexId, taskName, operatorID, truncatedOperatorName));
        }
    }

    operatorName, @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
           METRICS_OPERATOR_NAME_MAX_LENGTHreturn operators.values();
     }     

    return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
  ......
}

5) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup and use the JobManagerOperatorMetricGroup as its parent metric group.

Code Block
languagejava
collapsetrue
@Internal
public class InternalOperatorCoordinatorMetricGroup
      } else {
extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {

    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup   return operatorName;parent) {
        }super(parent.addGroup("coordinator"));
    }
    ......
}

56) Introduce metricGroup() method for OperatorCoordinator.Context. OperatorCoordinator could get its metric group from this method.

...