...
1) Add a new metric group scope(SCOPE_NAMING_JM_OPERATOR_COORDINATOR) in the MetricOptions.
Code Block |
---|
|
@PublicEvolving
public class MetricOptions {
    ......
    /**
     * The scope format string that is applied to all metrics scoped to the operator-related
     * components on a JobManager, such as an operator coordinator.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-operator-coordinator")
                    .stringType()
                    .defaultValue(
                            "<host>.jobmanager.<job_name>.<task_name>.<operator_name>.coordinator")
                    .withDescription(
                            // Two revisions of this sentence had been fused together; this is
                            // the later wording with its grammar tidied.
                            "Defines the scope format string that is applied to all metrics scoped to the operator-related components on a JobManager, like OperatorCoordinator.");
    ......
} |
Proposed Changes
1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.
Code Block |
---|
|
/** The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}. */
public class OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat extends ScopeFormat {
public OperatorCoordinatorScopeFormatJobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
super(
format,
parentFormat,
new String[] {
SCOPE_HOST,
SCOPE_JOB_ID, SCOPE_JOB_NAME, SCOPE_OPERATOR_ID, SCOPE_OPERATOR_NAME
});
}
SCOPE_JOB_ID,
public String[] formatScope(
JobManagerJobMetricGroup parent, OperatorID operatorID, String operatorName) {
SCOPE_JOB_NAME,
final String[] template = copyTemplate();
final String[] values = { SCOPE_TASK_VERTEX_ID,
parent.parent().hostname(),
valueOrNull(parent.jobId()),
SCOPE_TASK_NAME,
valueOrNull(parent.jobName()) SCOPE_OPERATOR_ID,
valueOrNull(operatorID),
valueOrNull(operatorName)
SCOPE_OPERATOR_NAME
});
}
return bindVariables(template, values);public String[] formatScope(
}
} |
2) Modify ScopeFormats to provide the OperatorCoordinatorScopeFormat.
Code Block |
---|
|
public final class ScopeFormats {
......
JobManagerJobMetricGroup parent,
private final OperatorCoordinatorScopeFormat operatorCoordinatorScopeFormat;
privateAbstractID ScopeFormats(vertexId,
String jobManagerFormattaskName,
StringOperatorID jobManagerJobFormatoperatorID,
String taskManagerFormat, operatorName) {
final String[] template String taskManagerJobFormat,= copyTemplate();
final String[] values = String taskFormat,{
String operatorFormatparent.parent().hostname(),
String operatorCoordinatorFormat) {
valueOrNull(parent.jobId()),
...... valueOrNull(parent.jobName()),
this.operatorCoordinatorScopeFormat =
valueOrNull(vertexId),
new OperatorCoordinatorScopeFormat(valueOrNull(taskName),
valueOrNull(operatorID),
operatorCoordinatorFormat, this.jobManagerJobFormat);
valueOrNull(operatorName)
};
public OperatorCoordinatorScopeFormat getOperatorCoordinatorScopeFormat() {
return return operatorCoordinatorScopeFormatbindVariables(template, values);
}
} |
2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.
Code Block |
---|
|
@Internal
public final class InternalOperatorCoordinatorMetricGroupScopeFormats {
......
extendsprivate ComponentMetricGroup<JobManagerJobMetricGroup>
final JobManagerOperatorScopeFormat jmOperatorFormat;
private ScopeFormats(
implements OperatorCoordinatorMetricGroup {
private final String operatorName;jobManagerFormat,
private final OperatorID operatorID;
publicString InternalOperatorCoordinatorMetricGroup(jobManagerJobFormat,
MetricRegistryString registrytaskManagerFormat,
JobManagerJobMetricGroupString parenttaskManagerJobFormat,
OperatorIDString operatorIDtaskFormat,
String operatorName) {operatorFormat,
super(
String registry,jmOperatorFormat) {
registry.getScopeFormats()......
this.jmOperatorFormat = new .getOperatorCoordinatorScopeFormat()JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
}
} |
3) Add JobManagerOperatorMetricGroup and use the scope format JobManagerOperatorScopeFormat.
Code Block |
---|
|
public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
private final AbstractID vertexId;
private final String taskName;
private final String operatorName;
private final .formatScope(checkNotNull(parent), operatorID, operatorName),
parent);
OperatorID operatorID;
public JobManagerOperatorMetricGroup(
MetricRegistry registry,
JobManagerJobMetricGroup parent,
AbstractID vertexId,
String taskName,
OperatorID operatorID,
String operatorName) {
super(
registry,
registry.getScopeFormats()
.getJmOperatorFormat()
.formatScope(checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
parent);
this.vertexId = vertexId;
this.taskName = taskName;
this.operatorID = operatorID;
this.operatorName = operatorName;
}
@Override
protected String getGroupName(CharacterFilter filter) {
return "operator";
}
@Override
protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
return new QueryScopeInfo.JobQueryScopeInfo(this.parent....
} |
4) Modify the JobManagerJobMetricGroup. InternalOperatorCoordinatorMetricGroup could be registered and got from JobManagerJobMetricGroup.
jobId.toString());
}
@Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
return Collections.emptyList();
}
public AbstractID getVertexId() {
return vertexId;
}
public String getTaskName() {
return taskName;
}
public String getOperatorName() {
return operatorName;
}
public OperatorID getOperatorID() {
return operatorID;
}
} |
4) Modify the JobManagerJobMetricGroup. InternalOperatorCoordinatorMetricGroup could be registered and got from JobManagerJobMetricGroup.
Code Block |
---|
|
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();
public JobManagerOperatorMetricGroup getOrAddOperator (
|
Code Block |
---|
|
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
private final Map<String, InternalOperatorCoordinatorMetricGroup> operatorCoordinators =
new HashMap<>();
public InternalOperatorCoordinatorMetricGroup getOrAddOperatorCoordinator(
OperatorID operatorID, String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
//AbstractID operatorsvertexId,
final String key = operatorID + truncatedOperatorName;
String taskName,
synchronized (this) {
OperatorID operatorID,
return operatorCoordinators.computeIfAbsent(
String operatorName) {
final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
key,
// unique OperatorIDs only exist in streaming, so we have to rely on the name operatorfor ->batch
// operators
final String key = operatorID + truncatedOperatorName;
new InternalOperatorCoordinatorMetricGroup(
synchronized (this) {
return operators.computeIfAbsent(
this.registry, thiskey, operatorID, truncatedOperatorName));
}
}
private String getTruncatedOperatorName(String operatorName)operator {->
if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
new LOG.warnJobManagerOperatorMetricGroup(
"The operator name {} exceeded the {} characters length limit and was truncated.",
this.registry, this, vertexId, taskName, operatorID, truncatedOperatorName));
}
}
operatorName, @Override
protected Iterable<? extends ComponentMetricGroup> subComponents() {
METRICS_OPERATOR_NAME_MAX_LENGTHreturn operators.values();
}
return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
......
} |
5) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup and use the JobManagerOperatorMetricGroup as its parent metric group.
Code Block |
---|
|
@Internal
public class InternalOperatorCoordinatorMetricGroup
} else {
extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {
public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup return operatorName;parent) {
}super(parent.addGroup("coordinator"));
}
......
} |
6) Introduce a metricGroup() method for OperatorCoordinator.Context. An OperatorCoordinator can get its metric group from this method.
...