...
Page properties |
---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
1) Add a new metric group scope option (SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR) to MetricOptions.
Code Block |
---|
|
@PublicEvolving
public class MetricOptions {
    ......
    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-job.operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to the operator-related components on a JobManager, like OperatorCoordinator.");
    ......
} |
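To illustrate how a scope format string of this shape turns into a metric identifier prefix, here is a minimal, self-contained sketch. It does not use Flink's own ScopeFormat classes; `ScopeFormatSketch`, `resolve`, and the sample host, job, and operator values are hypothetical names chosen only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink: resolves <variable> placeholders in a scope format. */
class ScopeFormatSketch {

    /** The default value proposed above for metrics.scope.jm-job.operator-coordinator. */
    static final String DEFAULT_FORMAT =
            "<host>.jobmanager.<job_name>.<operator_name>.coordinator";

    /** Replaces each "<name>" component with its value; unknown variables are left untouched. */
    static String resolve(String format, Map<String, String> variables) {
        String[] components = format.split("\\.");
        for (int i = 0; i < components.length; i++) {
            String component = components[i];
            if (component.startsWith("<") && component.endsWith(">")) {
                String value = variables.get(component);
                if (value != null) {
                    components[i] = value;
                }
            }
        }
        return String.join(".", components);
    }

    public static void main(String[] args) {
        // Sample values are assumptions for illustration only.
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("<host>", "localhost");
        variables.put("<job_name>", "WordCount");
        variables.put("<operator_name>", "KafkaSource");
        // prints "localhost.jobmanager.WordCount.KafkaSource.coordinator"
        System.out.println(resolve(DEFAULT_FORMAT, variables));
    }
}
```

The sketch ignores details such as character filtering of the variable values; it only shows the placeholder substitution idea.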
2) Provide common metrics for the OperatorCoordinatorMetricGroup.
Code Block |
---|
|
/**
 * Special {@link MetricGroup} representing an Operator coordinator.
 *
 * <p>You should only update the metrics in the coordinator thread.
 */
@PublicEvolving
public interface OperatorCoordinatorMetricGroup extends MetricGroup {
    /**
     * The total number of events received since the operator coordinator started.
     */
    Counter getNumEventsInCounter();

    /**
     * The total number of events sent by the operator coordinator.
     */
    Counter getNumEventsOutCounter();
} |
Proposed Changes
1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.
Code Block |
---|
|
/**
 * The scope format for the {@link
 * org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}.
 */
public class JobManagerOperatorScopeFormat extends ScopeFormat {
    public JobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                    SCOPE_HOST,
                    SCOPE_JOB_ID,
                    SCOPE_JOB_NAME,
                    SCOPE_TASK_VERTEX_ID,
                    SCOPE_TASK_NAME,
                    SCOPE_OPERATOR_ID,
                    SCOPE_OPERATOR_NAME
                });
    }

    public String[] formatScope(
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String[] template = copyTemplate();
        // The values must be listed in the same order as the scope variables above.
        final String[] values = {
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
            valueOrNull(parent.jobName()),
            valueOrNull(vertexId),
            valueOrNull(taskName),
            valueOrNull(operatorID),
            valueOrNull(operatorName)
        };
        return bindVariables(template, values);
    }
} |
2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.
Code Block |
---|
|
public final class ScopeFormats {
    ......
    private final JobManagerOperatorScopeFormat jmOperatorFormat;

    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String jmOperatorFormat) {
        ......
        this.jmOperatorFormat =
                new JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
    }
} |
3) Add JobManagerOperatorMetricGroup and use the scope format JobManagerOperatorScopeFormat.
Code Block |
---|
|
public class JobManagerOperatorMetricGroup
        extends ComponentMetricGroup<JobManagerJobMetricGroup> {
    private final AbstractID vertexId;
    private final String taskName;
    private final String operatorName;
    private final OperatorID operatorID;

    public JobManagerOperatorMetricGroup(
            MetricRegistry registry,
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        super(
                registry,
                registry.getScopeFormats()
                        .getJmOperatorFormat()
                        .formatScope(
                                checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
                parent);
        this.vertexId = vertexId;
        this.taskName = taskName;
        this.operatorID = operatorID;
        this.operatorName = operatorName;
    }

    @Override
    protected String getGroupName(CharacterFilter filter) {
        return "operator";
    }

    @Override
    protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
        return new QueryScopeInfo.JobQueryScopeInfo(this.parent.jobId.toString());
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return Collections.emptyList();
    }

    public AbstractID getVertexId() {
        return vertexId;
    }

    public String getTaskName() {
        return taskName;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public OperatorID getOperatorID() {
        return operatorID;
    }
} |
4) Modify the JobManagerJobMetricGroup so that a JobManagerOperatorMetricGroup can be registered with and retrieved from it.
Code Block |
---|
|
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator(
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);
        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        // operators
        final String key = operatorID + truncatedOperatorName;
        synchronized (this) {
            return operators.computeIfAbsent(
                    key,
                    operator ->
                            new JobManagerOperatorMetricGroup(
                                    this.registry,
                                    this,
                                    vertexId,
                                    taskName,
                                    operatorID,
                                    truncatedOperatorName));
        }
    }

    private String getTruncatedOperatorName(String operatorName) {
        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
            LOG.warn(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                    operatorName,
                    METRICS_OPERATOR_NAME_MAX_LENGTH);
            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
        } else {
            return operatorName;
        }
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return operators.values();
    }
    ......
} |
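The get-or-add behaviour described in this step can be sketched without any Flink dependency. The following is only an illustration under stated assumptions: `OperatorGroupRegistrySketch` and `OperatorGroup` are hypothetical stand-ins for JobManagerJobMetricGroup and JobManagerOperatorMetricGroup, plain strings replace OperatorID, and the limit of 80 characters is an assumed value for METRICS_OPERATOR_NAME_MAX_LENGTH.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for JobManagerJobMetricGroup; only the get-or-add logic is modeled. */
class OperatorGroupRegistrySketch {

    /** Assumed limit for illustration; the text above calls it METRICS_OPERATOR_NAME_MAX_LENGTH. */
    static final int MAX_NAME_LENGTH = 80;

    /** Hypothetical stand-in for JobManagerOperatorMetricGroup. */
    static final class OperatorGroup {
        final String operatorId;
        final String operatorName;

        OperatorGroup(String operatorId, String operatorName) {
            this.operatorId = operatorId;
            this.operatorName = operatorName;
        }
    }

    private final Map<String, OperatorGroup> operators = new HashMap<>();

    /** Returns the existing group for the key, or creates and registers a new one. */
    OperatorGroup getOrAddOperator(String operatorId, String operatorName) {
        final String truncatedName = truncate(operatorName);
        // The key combines the ID and the (truncated) name, as in the step above.
        final String key = operatorId + truncatedName;
        synchronized (this) {
            return operators.computeIfAbsent(
                    key, k -> new OperatorGroup(operatorId, truncatedName));
        }
    }

    /** Cuts names that exceed the limit; shorter names and null pass through unchanged. */
    static String truncate(String name) {
        if (name != null && name.length() > MAX_NAME_LENGTH) {
            return name.substring(0, MAX_NAME_LENGTH);
        }
        return name;
    }

    synchronized int size() {
        return operators.size();
    }

    public static void main(String[] args) {
        OperatorGroupRegistrySketch registry = new OperatorGroupRegistrySketch();
        OperatorGroup first = registry.getOrAddOperator("op-1", "Source");
        OperatorGroup second = registry.getOrAddOperator("op-1", "Source");
        System.out.println(first == second); // the same instance is returned for the same key
        System.out.println(registry.size());
    }
}
```

Because the lookup and the creation happen inside one synchronized block, two callers that ask for the same operator concurrently end up sharing a single group instance.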
5) Add an internal implementation InternalOperatorCoordinatorMetricGroup of the interface OperatorCoordinatorMetricGroup that uses the JobManagerOperatorMetricGroup as its parent metric group.
Code Block |
---|
|
@Internal
public class InternalOperatorCoordinatorMetricGroup
        extends ProxyMetricGroup<MetricGroup> implements OperatorCoordinatorMetricGroup {
    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup parent) {
        super(parent.addGroup("coordinator"));
    }
    ......
} |
6) Introduce a metricGroup() method for OperatorCoordinator.Context. An OperatorCoordinator can get its metric group from this method.
Code Block |
---|
|
@Internal
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    interface Context {
        /** Gets the metric group of the operator coordinator. */
        OperatorCoordinatorMetricGroup metricGroup();
        ....
    }
} |
7) The implementation of the OperatorCoordinator registers and reports metrics through the OperatorCoordinatorMetricGroup.
For example, the SourceCoordinator is an implementation of the interface OperatorCoordinator. It could use the metric group like this:
Code Block |
---|
|
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
        implements OperatorCoordinator {
    ......
    private final Counter numEventsInCounter;

    public SourceCoordinator(
            String operatorName,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> context,
            CoordinatorStore coordinatorStore,
            WatermarkAlignmentParams watermarkAlignmentParams,
            @Nullable String coordinatorListeningID) {
        ......
        this.numEventsInCounter = context.metricGroup().getNumEventsInCounter();
        ......
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        ......
        numEventsInCounter.inc();
        ......
    }
} |
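The counting pattern in the SourceCoordinator example can be tried out in isolation. The sketch below is only an approximation under stated assumptions: `SimpleCounterSketch`, `CoordinatorMetricsSketch`, and `CountingCoordinatorSketch` are hypothetical stand-ins for Counter, OperatorCoordinatorMetricGroup, and an OperatorCoordinator implementation, and the event type is reduced to a plain Object.

```java
/** Hypothetical minimal counter; it is not thread-safe and is meant for one coordinator thread. */
class SimpleCounterSketch {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

/** Hypothetical stand-in for OperatorCoordinatorMetricGroup with the two proposed counters. */
class CoordinatorMetricsSketch {
    private final SimpleCounterSketch numEventsIn = new SimpleCounterSketch();
    private final SimpleCounterSketch numEventsOut = new SimpleCounterSketch();

    SimpleCounterSketch getNumEventsInCounter() {
        return numEventsIn;
    }

    SimpleCounterSketch getNumEventsOutCounter() {
        return numEventsOut;
    }
}

/** Hypothetical coordinator that counts every event it receives from an operator. */
class CountingCoordinatorSketch {
    private final SimpleCounterSketch numEventsInCounter;

    CountingCoordinatorSketch(CoordinatorMetricsSketch metricGroup) {
        // Look the counter up once and keep the reference, as the constructor above does.
        this.numEventsInCounter = metricGroup.getNumEventsInCounter();
    }

    void handleEventFromOperator(int subtask, int attemptNumber, Object event) {
        numEventsInCounter.inc();
    }

    public static void main(String[] args) {
        CoordinatorMetricsSketch metrics = new CoordinatorMetricsSketch();
        CountingCoordinatorSketch coordinator = new CountingCoordinatorSketch(metrics);
        coordinator.handleEventFromOperator(0, 0, "event-a");
        coordinator.handleEventFromOperator(1, 0, "event-b");
        System.out.println(metrics.getNumEventsInCounter().getCount());
    }
}
```

The counter uses a plain long without synchronization, which matches the note in the interface's Javadoc that metrics should only be updated from the coordinator thread.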
SourceCoordinator has a SplitEnumerator field named 'enumerator'. To register a metric for the enumerator, we can get the metric group from SplitEnumeratorContext#metricGroup. The details of SplitEnumeratorMetricGroup will be introduced in the issue FLINK-21000 (ASF JIRA).
For example, KafkaSourceEnumerator could register a metric for how many partitions have been assigned to source readers.
Code Block |
---|
|
@Internal
public class KafkaSourceEnumerator
        implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions) {
        ......
        context.metricGroup().gauge("assignedPartitionsNum", assignedPartitions::size);
    }
    ......
} |
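A gauge registered with a method reference such as `assignedPartitions::size` is evaluated each time the metric is read, not at registration time. The following self-contained sketch illustrates that behaviour; `GaugeRegistrySketch` and its `gauge` and `read` methods are hypothetical stand-ins for a metric group and a reporter, and the partition names are made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical stand-in for a metric group; it only stores gauges by name. */
class GaugeRegistrySketch {
    private final Map<String, Supplier<Integer>> gauges = new HashMap<>();

    /** Registers a gauge; the supplier is evaluated every time the value is read. */
    void gauge(String name, Supplier<Integer> valueSupplier) {
        gauges.put(name, valueSupplier);
    }

    /** Reads the current value of a gauge that was registered before. */
    int read(String name) {
        return gauges.get(name).get();
    }

    public static void main(String[] args) {
        Set<String> assignedPartitions = new HashSet<>();
        GaugeRegistrySketch metricGroup = new GaugeRegistrySketch();
        metricGroup.gauge("assignedPartitionsNum", assignedPartitions::size);

        // Partitions assigned after registration are still reflected in the gauge.
        assignedPartitions.add("topic-0");
        assignedPartitions.add("topic-1");
        System.out.println(metricGroup.read("assignedPartitionsNum"));
    }
}
```

This is why the enumerator can register the gauge once in its constructor and leave the set to be updated later as partitions are assigned.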
...