...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
1) Add a new metric group scope (SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR) in MetricOptions.

```java
@PublicEvolving
public class MetricOptions {
    ......
    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-job.operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to the components of an operator on a JobManager, like the OperatorCoordinator.");
    ......
}
```
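The default value above, `<host>.jobmanager.<job_name>.<operator_name>.coordinator`, is a dot-separated list of components in which the `<...>` tokens are variables. The following minimal sketch shows how such a format could expand into concrete scope components, assuming that a variable always occupies a whole component and that literals are kept unchanged. `ScopeTemplateSketch` is a hypothetical helper for illustration, not Flink's `ScopeFormat`.

```java
import java.util.Map;

/** Hypothetical sketch: expands a dot-separated scope format by substituting <variable> components. */
final class ScopeTemplateSketch {
    private ScopeTemplateSketch() {}

    static String[] expand(String format, Map<String, String> variables) {
        // Split the format into components, e.g. "<host>", "jobmanager", "<job_name>", ...
        String[] components = format.split("\\.");
        for (int i = 0; i < components.length; i++) {
            String component = components[i];
            // Assumption: a component that is exactly a known variable is replaced; literals stay as-is.
            if (component.startsWith("<") && component.endsWith(">") && variables.containsKey(component)) {
                components[i] = variables.get(component);
            }
        }
        return components;
    }
}
```

Because substitution happens after splitting, a value that itself contains a `.` does not create extra components in this sketch.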
2) Provide common metrics for the OperatorCoordinatorMetricGroup.

```java
/**
 * Special {@link MetricGroup} representing an operator coordinator.
 *
 * <p>You should only update the metrics in the coordinator thread.
 */
@PublicEvolving
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
    /** The total number of events received since the operator coordinator started. */
    Counter getNumEventsInCounter();

    /** The total number of events sent by the operator coordinator. */
    Counter getNumEventsOutCounter();
}
```
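The interface exposes shared counters and asks callers to update them only from the coordinator thread. The sketch below models that contract in plain Java: a hypothetical `CoordinatorMetricsSketch` holds an incoming and an outgoing event counter, and every update is funneled through a single-threaded executor that stands in for the coordinator thread. `SimpleCounterSketch` is a stand-in for Flink's `Counter`, not the real class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a metrics Counter; safe only with a single writer thread. */
final class SimpleCounterSketch {
    private volatile long count;

    void inc() {
        count++; // read-modify-write is fine here because only one thread ever writes
    }

    long getCount() {
        return count;
    }
}

/** Hypothetical model of the numEventsIn/numEventsOut counters of an operator coordinator. */
final class CoordinatorMetricsSketch {
    final SimpleCounterSketch numEventsIn = new SimpleCounterSketch();
    final SimpleCounterSketch numEventsOut = new SimpleCounterSketch();

    // One thread plays the role of the coordinator thread, so counters are never updated concurrently.
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();

    void onEventFromSubtask() {
        coordinatorThread.execute(numEventsIn::inc);
    }

    void onEventToSubtask() {
        coordinatorThread.execute(numEventsOut::inc);
    }

    /** Stops accepting work and waits for queued metric updates to finish. */
    void close() {
        coordinatorThread.shutdown();
        try {
            coordinatorThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Confining writes to one thread lets the counter stay lock-free; readers such as metric reporters only need visibility, which the `volatile` field provides in this sketch.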
Proposed Changes

1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.

```java
/** The scope format for the {@link org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}. */
public class JobManagerOperatorScopeFormat extends ScopeFormat {

    public JobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                    SCOPE_HOST,
                    SCOPE_JOB_ID,
                    SCOPE_JOB_NAME,
                    SCOPE_TASK_VERTEX_ID,
                    SCOPE_TASK_NAME,
                    SCOPE_OPERATOR_ID,
                    SCOPE_OPERATOR_NAME
                });
    }

    public String[] formatScope(
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String[] template = copyTemplate();
        final String[] values = {
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
            valueOrNull(parent.jobName()),
            valueOrNull(vertexId),
            valueOrNull(taskName),
            valueOrNull(operatorID),
            valueOrNull(operatorName)
        };
        return bindVariables(template, values);
    }
}
```
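The `copyTemplate()`/`bindVariables()` pair can be read as: the template components are computed once, and each `formatScope` call copies them and substitutes the variable values at the matching positions. The sketch below mirrors that idea in plain Java with a hypothetical `OperatorScopeSketch`. The placeholder strings (for example `<task_id>` for the vertex ID) and the literal `"null"` fallback in `valueOrNull` are assumptions for illustration, not guaranteed to match Flink's constants.

```java
import java.util.Arrays;

/** Hypothetical mirror of the copy-template-then-bind pattern used by JobManagerOperatorScopeFormat. */
final class OperatorScopeSketch {
    // Assumed placeholder strings, in the same order as the values passed to formatScope.
    static final String[] VARIABLES = {
        "<host>", "<job_id>", "<job_name>", "<task_id>", "<task_name>", "<operator_id>", "<operator_name>"
    };

    private final String[] template;

    OperatorScopeSketch(String format) {
        this.template = format.split("\\.");
    }

    String[] formatScope(
            String host,
            Object jobId,
            String jobName,
            Object vertexId,
            String taskName,
            Object operatorId,
            String operatorName) {
        final String[] values = {
            host,
            valueOrNull(jobId),
            valueOrNull(jobName),
            valueOrNull(vertexId),
            valueOrNull(taskName),
            valueOrNull(operatorId),
            valueOrNull(operatorName)
        };
        // Like copyTemplate(): work on a copy so the shared template is never mutated.
        final String[] scope = Arrays.copyOf(template, template.length);
        // Like bindVariables(): replace each placeholder component with its value.
        for (int i = 0; i < scope.length; i++) {
            for (int v = 0; v < VARIABLES.length; v++) {
                if (scope[i].equals(VARIABLES[v])) {
                    scope[i] = values[v];
                    break;
                }
            }
        }
        return scope;
    }

    private static String valueOrNull(Object value) {
        // Assumption for this sketch: missing or empty values are rendered as the literal "null".
        return value == null || value.toString().isEmpty() ? "null" : value.toString();
    }
}
```

Copying the template per call keeps one format instance reusable across all operators of a job.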
2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.

```java
public final class ScopeFormats {
    ......
    private final JobManagerOperatorScopeFormat jmOperatorFormat;

    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String jmOperatorFormat) {
        ......
        this.jmOperatorFormat =
                new JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
    }
}
```
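`ScopeFormats` resolves each format string from configuration and passes the JobManager job format as the parent of the new operator format. The sketch below shows one way a parent format could be used, under an explicit assumption: a leading `*` component means "start with the parent's template". The class `ChildScopeFormatSketch`, the `*` rule, and the plain `Map` used in place of Flink's `Configuration` are all hypothetical; only the key and default value come from `MetricOptions` above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a child scope format that may inherit its parent's components. */
final class ChildScopeFormatSketch {
    // Assumption for this sketch: a leading "*" component means "start with the parent's template".
    static final String INHERIT_PARENT = "*";

    final String[] template;

    ChildScopeFormatSketch(String format, String[] parentTemplate) {
        String[] raw = format.split("\\.");
        if (raw.length > 0 && raw[0].equals(INHERIT_PARENT)) {
            List<String> merged = new ArrayList<>(Arrays.asList(parentTemplate));
            merged.addAll(Arrays.asList(raw).subList(1, raw.length));
            this.template = merged.toArray(new String[0]);
        } else {
            this.template = raw;
        }
    }

    /** Picks the configured format string, or the default when the key is absent. */
    static ChildScopeFormatSketch fromConfig(
            Map<String, String> config, String key, String defaultFormat, String[] parentTemplate) {
        return new ChildScopeFormatSketch(config.getOrDefault(key, defaultFormat), parentTemplate);
    }
}
```

In this sketch, a user could write `*.<operator_name>.coordinator` to reuse whatever the JobManager job scope is configured to, instead of repeating `<host>.jobmanager.<job_name>`.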
3) Add JobManagerOperatorMetricGroup, which uses the scope format JobManagerOperatorScopeFormat.

```java
public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
    private final AbstractID vertexId;
    private final String taskName;
    private final String operatorName;
    private final OperatorID operatorID;

    public JobManagerOperatorMetricGroup(
            MetricRegistry registry,
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        super(
                registry,
                registry.getScopeFormats()
                        .getJmOperatorFormat()
                        .formatScope(checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
                parent);
        this.vertexId = vertexId;
        this.taskName = taskName;
        this.operatorID = operatorID;
        this.operatorName = operatorName;
    }

    @Override
    protected String getGroupName(CharacterFilter filter) {
        return "operator";
    }

    @Override
    protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
        return new QueryScopeInfo.JobQueryScopeInfo(this.parent.jobId.toString());
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return Collections.emptyList();
    }

    public AbstractID getVertexId() {
        return vertexId;
    }

    public String getTaskName() {
        return taskName;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public OperatorID getOperatorID() {
        return operatorID;
    }
}
```
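A component group's scope components are what metric reporters ultimately see. The hypothetical sketch below shows how a metric identifier could be formed from an operator group's scope: each component is passed through a character filter, the metric name is appended, and everything is joined with `.`. `OperatorGroupSketch` and its filter handling are illustrative assumptions, not the actual `ComponentMetricGroup` code.

```java
import java.util.Arrays;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of a component group that turns its scope into metric identifiers. */
final class OperatorGroupSketch {
    private final String[] scopeComponents;

    OperatorGroupSketch(String[] scopeComponents) {
        this.scopeComponents = scopeComponents.clone(); // defensive copy of the formatted scope
    }

    /** Joins the filtered scope components and the metric name with '.'. */
    String getMetricIdentifier(String metricName, UnaryOperator<String> filter) {
        return Stream.concat(Arrays.stream(scopeComponents), Stream.of(metricName))
                .map(filter)
                .collect(Collectors.joining("."));
    }
}
```

A reporter that cannot handle spaces could, for example, pass a filter that replaces them with underscores.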
4) Modify the JobManagerJobMetricGroup so that a JobManagerOperatorMetricGroup can be registered in and retrieved from it.

```java
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator(
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        // operators
        final String key = operatorID + truncatedOperatorName;

        synchronized (this) {
            return operators.computeIfAbsent(
                    key,
                    operator ->
                            new JobManagerOperatorMetricGroup(
                                    this.registry,
                                    this,
                                    vertexId,
                                    taskName,
                                    operatorID,
                                    truncatedOperatorName));
        }
    }

    private String getTruncatedOperatorName(String operatorName) {
        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
            LOG.warn(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                    operatorName,
                    METRICS_OPERATOR_NAME_MAX_LENGTH);
            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
        } else {
            return operatorName;
        }
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return operators.values();
    }
    ......
}
```
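The get-or-add logic above truncates long operator names, builds a key from the operator ID and the truncated name, and creates each group at most once. A self-contained sketch of the same pattern follows. `OperatorRegistrySketch` is hypothetical, the limit of 80 characters is an assumed value standing in for `METRICS_OPERATOR_NAME_MAX_LENGTH`, and a generic factory replaces the real group constructor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/** Hypothetical sketch of the get-or-add pattern used by JobManagerJobMetricGroup. */
final class OperatorRegistrySketch<G> {
    static final int MAX_NAME_LENGTH = 80; // assumed limit for this sketch

    private final Map<String, G> operators = new HashMap<>();
    private final BiFunction<String, String, G> factory; // (operatorId, truncatedName) -> group

    OperatorRegistrySketch(BiFunction<String, String, G> factory) {
        this.factory = factory;
    }

    G getOrAddOperator(String operatorId, String operatorName) {
        final String truncated = truncate(operatorName);
        // The ID alone is not unique for batch operators, so the name is part of the key.
        final String key = operatorId + truncated;
        synchronized (this) {
            // computeIfAbsent calls the factory only when no group exists for the key yet.
            return operators.computeIfAbsent(key, k -> factory.apply(operatorId, truncated));
        }
    }

    static String truncate(String name) {
        if (name != null && name.length() > MAX_NAME_LENGTH) {
            return name.substring(0, MAX_NAME_LENGTH);
        }
        return name;
    }

    synchronized int size() {
        return operators.size();
    }
}
```

Synchronizing on the registry around a plain `HashMap` mirrors the original code; a `ConcurrentHashMap` would be an alternative if the group were not otherwise locked.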
5) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup, using the JobManagerOperatorMetricGroup as its parent metric group.

```java
@Internal
public class InternalOperatorCoordinatorMetricGroup
        extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {

    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup parent) {
        super(parent.addGroup("coordinator"));
    }
    ......
}
```
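`InternalOperatorCoordinatorMetricGroup` forwards everything to a `coordinator` subgroup of the operator group. A plain-Java sketch of that proxy idea follows; `GroupSketch` and `CoordinatorGroupSketch` are hypothetical stand-ins for `MetricGroup` and `ProxyMetricGroup`, reduced to scope handling only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical minimal metric group: knows its scope and can create named subgroups. */
class GroupSketch {
    private final List<String> scope;

    GroupSketch(List<String> scope) {
        this.scope = Collections.unmodifiableList(new ArrayList<>(scope));
    }

    GroupSketch addGroup(String name) {
        List<String> child = new ArrayList<>(scope);
        child.add(name); // a subgroup extends the parent's scope by one component
        return new GroupSketch(child);
    }

    List<String> getScopeComponents() {
        return scope;
    }
}

/** Hypothetical proxy: every call is forwarded to the "coordinator" subgroup of the operator group. */
final class CoordinatorGroupSketch {
    private final GroupSketch delegate;

    CoordinatorGroupSketch(GroupSketch operatorGroup) {
        this.delegate = operatorGroup.addGroup("coordinator");
    }

    GroupSketch addGroup(String name) {
        return delegate.addGroup(name);
    }

    List<String> getScopeComponents() {
        return delegate.getScopeComponents();
    }
}
```

In this design the coordinator group needs no scope format of its own: its scope is the operator scope plus `coordinator`.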
6) Introduce a metricGroup() method for OperatorCoordinator.Context. The OperatorCoordinator can get its metric group from this method.

```java
@Internal
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    interface Context {
        /** Gets the metric group of the operator coordinator. */
        OperatorCoordinatorMetricGroup metricGroup();
        ......
    }
}
```
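Exposing the group through `OperatorCoordinator.Context` means a coordinator does not construct its own metric group; the runtime injects one. The hypothetical sketch below shows how a test could supply a stub context and check that a coordinator counts incoming events. `ContextSketch`, `CountingCoordinatorSketch`, and the use of `AtomicLong` in place of a `Counter` are assumptions, not Flink types.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for OperatorCoordinator.Context, exposing only the metric hook. */
interface ContextSketch {
    AtomicLong numEventsInCounter();
}

/** Hypothetical coordinator that caches its counter once, as the SourceCoordinator example in this FLIP does. */
final class CountingCoordinatorSketch {
    private final AtomicLong numEventsIn;

    CountingCoordinatorSketch(ContextSketch context) {
        this.numEventsIn = context.numEventsInCounter();
    }

    void handleEventFromOperator(int subtask, int attemptNumber, Object event) {
        // ... process the event, then record that it was received
        numEventsIn.incrementAndGet();
    }
}
```

Caching the counter in the constructor avoids a lookup on every event and keeps the hot path to a single increment.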
7) The implementation of the OperatorCoordinator registers and reports metrics through the OperatorCoordinatorMetricGroup obtained from OperatorCoordinator.Context.

For example, the SourceCoordinator is an implementation of the interface OperatorCoordinator. It could use the metric group like this:

```java
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
        implements OperatorCoordinator {
    ......
    private final Counter numEventsInCounter;

    public SourceCoordinator(
            String operatorName,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> context,
            CoordinatorStore coordinatorStore,
            WatermarkAlignmentParams watermarkAlignmentParams,
            @Nullable String coordinatorListeningID) {
        ......
        this.numEventsInCounter = context.metricGroup().getNumEventsInCounter();
        ......
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        ......
        numEventsInCounter.inc();
        ......
    }
}
```
The SourceCoordinator has a SplitEnumerator field named 'enumerator'. To register a metric for the enumerator, we can get the metric group from its SplitEnumeratorContext#metricGroup. The details of SplitEnumeratorMetricGroup will be introduced in the ASF JIRA issue FLINK-21000.
For example, KafkaSourceEnumerator could register a metric for how many partitions have been assigned to source readers.

```java
@Internal
public class KafkaSourceEnumerator
        implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions) {
        ......
        context.metricGroup().gauge("assignedPartitionsNum", assignedPartitions::size);
    }
    ......
}
```
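Because the gauge is registered with a method reference (`assignedPartitions::size`), its value is computed each time it is polled, so it follows later assignments without any extra update calls. The plain-Java sketch below demonstrates that behavior; `GaugeSketch` and `GaugeRegistrySketch` are hypothetical stand-ins for Flink's `Gauge` and metric group, not the real types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a Gauge: a value computed on demand. */
@FunctionalInterface
interface GaugeSketch<T> {
    T getValue();
}

/** Hypothetical registry that stores gauges and evaluates them only when polled. */
final class GaugeRegistrySketch {
    private final Map<String, GaugeSketch<?>> gauges = new HashMap<>();

    <T> void gauge(String name, GaugeSketch<T> gauge) {
        gauges.put(name, gauge);
    }

    /** Evaluates the gauge now, the way a reporter would on each reporting interval. */
    Object poll(String name) {
        return gauges.get(name).getValue();
    }
}
```

Note that the polling thread reads the set while the enumerator may be mutating it; whether that needs extra synchronization depends on the collection used and is outside this sketch.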
...