...

Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

However, there is neither a suitable metric group scope for the OperatorCoordinator nor an implementation of the interface OperatorCoordinatorMetricGroup. Metrics in the OperatorCoordinator could include how many splits/partitions have been assigned to source readers or how many files have been written out by sink writers. These metrics help users understand the job's progress and also make large jobs easier to maintain. Thus we propose this FLIP to introduce a new metric group scope for the OperatorCoordinator and to provide an internal implementation of OperatorCoordinatorMetricGroup.

Public Interfaces

1) Add a new metric group scope (SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR) to MetricOptions.

```java
@PublicEvolving
public class MetricOptions {
    ......

    /**
     * The scope format string that is applied to all metrics scoped to an operator coordinator on a
     * JobManager.
     */
    public static final ConfigOption<String> SCOPE_NAMING_JM_JOB_OPERATOR_COORDINATOR =
            key("metrics.scope.jm-job.operator-coordinator")
                    .stringType()
                    .defaultValue("<host>.jobmanager.<job_name>.<operator_name>.coordinator")
                    .withDescription(
                            "Defines the scope format string that is applied to all metrics scoped to the operator-related components on a JobManager, like the OperatorCoordinator.");

    ......
}
```
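The following standalone sketch shows how a scope format string such as the default above could be expanded into the components of a metric identifier. It does not use Flink's ScopeFormat. The class ScopeTemplateSketch and its resolve method are hypothetical. The sketch assumes that every dot-separated component is either a literal (like jobmanager) or exactly one variable (like <host>).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical resolver for a dot-separated scope format; not Flink's ScopeFormat. */
class ScopeTemplateSketch {

    /**
     * Splits the format on '.', replaces each component that is a known variable (for example
     * "<host>") with its value, and keeps literal components such as "jobmanager" unchanged.
     */
    static List<String> resolve(String format, Map<String, String> variables) {
        List<String> components = new ArrayList<>();
        for (String component : format.split("\\.")) {
            components.add(variables.getOrDefault(component, component));
        }
        return components;
    }

    public static void main(String[] args) {
        String format = "<host>.jobmanager.<job_name>.<operator_name>.coordinator";
        Map<String, String> variables =
                Map.of(
                        "<host>", "localhost",
                        "<job_name>", "WordCount",
                        "<operator_name>", "Source");
        // prints localhost.jobmanager.WordCount.Source.coordinator
        System.out.println(String.join(".", resolve(format, variables)));
    }
}
```

This omits everything else the real ScopeFormat does and only illustrates the variable substitution.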


Proposed Changes

1) Define the scope format JobManagerOperatorScopeFormat in the package org.apache.flink.runtime.metrics.scope as the scope format of JobManagerOperatorMetricGroup.

```java
/**
 * The scope format for the {@link
 * org.apache.flink.runtime.metrics.groups.JobManagerOperatorMetricGroup}.
 */
public class JobManagerOperatorScopeFormat extends ScopeFormat {

    public JobManagerOperatorScopeFormat(String format, JobManagerJobScopeFormat parentFormat) {
        super(
                format,
                parentFormat,
                new String[] {
                    SCOPE_HOST,
                    SCOPE_JOB_ID,
                    SCOPE_JOB_NAME,
                    SCOPE_TASK_VERTEX_ID,
                    SCOPE_TASK_NAME,
                    SCOPE_OPERATOR_ID,
                    SCOPE_OPERATOR_NAME
                });
    }

    public String[] formatScope(
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        final String[] template = copyTemplate();
        // The values must be in the same order as the variables passed to the constructor.
        final String[] values = {
            parent.parent().hostname(),
            valueOrNull(parent.jobId()),
            valueOrNull(parent.jobName()),
            valueOrNull(vertexId),
            valueOrNull(taskName),
            valueOrNull(operatorID),
            valueOrNull(operatorName)
        };
        return bindVariables(template, values);
    }
}
```
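formatScope relies on the order of its values matching the order of the variables passed to the constructor. The sketch below models that positional binding on its own. The class OperatorScopeBindingSketch, the textual variable names (such as <task_id>), and the "null" placeholder for missing values are all assumptions made for illustration. They are not Flink's definitions.

```java
import java.util.Arrays;

/** Hypothetical model of positional variable binding, mirroring formatScope above. */
class OperatorScopeBindingSketch {

    // Assumed variable names, in the same order as the values built in formatScope().
    static final String[] VARIABLES = {
        "<host>", "<job_id>", "<job_name>", "<task_id>", "<task_name>",
        "<operator_id>", "<operator_name>"
    };

    /** Uses a placeholder for missing or empty values so that no scope component is empty. */
    static String valueOrNull(Object value) {
        return (value == null || value.toString().isEmpty()) ? "null" : value.toString();
    }

    /** Returns a copy of the template in which each variable is replaced by the value at its index. */
    static String[] bindVariables(String[] template, String[] values) {
        String[] bound = Arrays.copyOf(template, template.length);
        for (int i = 0; i < bound.length; i++) {
            int index = Arrays.asList(VARIABLES).indexOf(bound[i]);
            if (index >= 0) {
                bound[i] = values[index];
            }
        }
        return bound;
    }

    public static void main(String[] args) {
        String[] template = "<host>.jobmanager.<job_name>.<operator_name>".split("\\.");
        String[] values = {
            "localhost", "a1b2", "WordCount", valueOrNull(null), "Map", "c3d4", "Tokenizer"
        };
        System.out.println(String.join(".", bindVariables(template, values)));
    }
}
```

Variables that the template does not mention (here <task_id> and <task_name>) are simply unused, so one value array can serve any user-defined format.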

2) Modify ScopeFormats to provide the JobManagerOperatorScopeFormat.

```java
public final class ScopeFormats {
    ......
    private final JobManagerOperatorScopeFormat jmOperatorFormat;

    private ScopeFormats(
            String jobManagerFormat,
            String jobManagerJobFormat,
            String taskManagerFormat,
            String taskManagerJobFormat,
            String taskFormat,
            String operatorFormat,
            String jmOperatorFormat) {
        ......
        // The JobManager job format is the parent of the JobManager operator format.
        this.jmOperatorFormat =
                new JobManagerOperatorScopeFormat(jmOperatorFormat, this.jobManagerJobFormat);
    }

    public JobManagerOperatorScopeFormat getJmOperatorFormat() {
        return jmOperatorFormat;
    }
}
```
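The jmOperatorFormat string passed to the ScopeFormats constructor would come from the option added to MetricOptions. The sketch below shows that lookup, with a plain Map standing in for Flink's Configuration. The class ScopeFormatConfigSketch is hypothetical. The key and the default value are the ones proposed in MetricOptions.

```java
import java.util.Map;

/** Hypothetical lookup of the JobManager operator scope format; a Map stands in for Configuration. */
class ScopeFormatConfigSketch {

    // Key and default value as proposed in MetricOptions.
    static final String JM_OPERATOR_KEY = "metrics.scope.jm-job.operator-coordinator";
    static final String JM_OPERATOR_DEFAULT =
            "<host>.jobmanager.<job_name>.<operator_name>.coordinator";

    /** Returns the user-configured format, or the default when the key is not set. */
    static String jmOperatorFormat(Map<String, String> config) {
        return config.getOrDefault(JM_OPERATOR_KEY, JM_OPERATOR_DEFAULT);
    }

    public static void main(String[] args) {
        System.out.println(jmOperatorFormat(Map.of()));
        System.out.println(
                jmOperatorFormat(Map.of(JM_OPERATOR_KEY, "<host>.jm.<job_name>.<operator_name>")));
    }
}
```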

3) Add JobManagerOperatorMetricGroup, which uses the scope format JobManagerOperatorScopeFormat.

```java
public class JobManagerOperatorMetricGroup extends ComponentMetricGroup<JobManagerJobMetricGroup> {
    private final AbstractID vertexId;
    private final String taskName;
    private final String operatorName;
    private final OperatorID operatorID;

    public JobManagerOperatorMetricGroup(
            MetricRegistry registry,
            JobManagerJobMetricGroup parent,
            AbstractID vertexId,
            String taskName,
            OperatorID operatorID,
            String operatorName) {
        super(
                registry,
                registry.getScopeFormats()
                        .getJmOperatorFormat()
                        .formatScope(
                                checkNotNull(parent), vertexId, taskName, operatorID, operatorName),
                parent);
        this.vertexId = vertexId;
        this.taskName = taskName;
        this.operatorID = operatorID;
        this.operatorName = operatorName;
    }

    @Override
    protected String getGroupName(CharacterFilter filter) {
        return "operator";
    }

    @Override
    protected QueryScopeInfo createQueryServiceMetricInfo(CharacterFilter filter) {
        return new QueryScopeInfo.JobQueryScopeInfo(this.parent.jobId.toString());
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        // This group has no component sub-groups.
        return Collections.emptyList();
    }

    public AbstractID getVertexId() {
        return vertexId;
    }

    public String getTaskName() {
        return taskName;
    }

    public String getOperatorName() {
        return operatorName;
    }

    public OperatorID getOperatorID() {
        return operatorID;
    }
}
```
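As a ComponentMetricGroup, JobManagerOperatorMetricGroup takes its whole scope from formatScope, and its parent job group reports it as a sub-component. The following sketch models only that relationship. The class ComponentGroupSketch and its methods are hypothetical simplifications and are not Flink's ComponentMetricGroup API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical simplification of a component metric group with a fixed scope. */
class ComponentGroupSketch {
    private final String[] scopeComponents;
    private final List<ComponentGroupSketch> children = new ArrayList<>();

    ComponentGroupSketch(ComponentGroupSketch parent, String... scopeComponents) {
        this.scopeComponents = scopeComponents.clone();
        // Register with the parent so that it can report this group as a sub-component.
        if (parent != null) {
            parent.children.add(this);
        }
    }

    /** The full metric name: scope components joined with '.', then the metric name. */
    String metricIdentifier(String metricName) {
        return String.join(".", scopeComponents) + "." + metricName;
    }

    /** The groups registered as children of this group. */
    List<ComponentGroupSketch> subComponents() {
        return Collections.unmodifiableList(children);
    }

    public static void main(String[] args) {
        ComponentGroupSketch job =
                new ComponentGroupSketch(null, "localhost", "jobmanager", "WordCount");
        ComponentGroupSketch operator =
                new ComponentGroupSketch(job, "localhost", "jobmanager", "WordCount", "Source");
        System.out.println(operator.metricIdentifier("numEventsIn"));
        System.out.println(job.subComponents().size());
    }
}
```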

4) Modify JobManagerJobMetricGroup so that a JobManagerOperatorMetricGroup can be registered with it and retrieved from it.

```java
@Internal
public class JobManagerJobMetricGroup extends JobMetricGroup<JobManagerMetricGroup> {
    private final Map<String, JobManagerOperatorMetricGroup> operators = new HashMap<>();

    public JobManagerOperatorMetricGroup getOrAddOperator(
            AbstractID vertexId, String taskName, OperatorID operatorID, String operatorName) {
        final String truncatedOperatorName = getTruncatedOperatorName(operatorName);

        // unique OperatorIDs only exist in streaming, so we have to rely on the name for batch
        // operators
        final String key = operatorID + truncatedOperatorName;

        synchronized (this) {
            return operators.computeIfAbsent(
                    key,
                    operator ->
                            new JobManagerOperatorMetricGroup(
                                    this.registry,
                                    this,
                                    vertexId,
                                    taskName,
                                    operatorID,
                                    truncatedOperatorName));
        }
    }

    private String getTruncatedOperatorName(String operatorName) {
        if (operatorName != null && operatorName.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
            LOG.warn(
                    "The operator name {} exceeded the {} characters length limit and was truncated.",
                    operatorName,
                    METRICS_OPERATOR_NAME_MAX_LENGTH);
            return operatorName.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
        } else {
            return operatorName;
        }
    }

    @Override
    protected Iterable<? extends ComponentMetricGroup> subComponents() {
        return operators.values();
    }

    ......
}
```
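The get-or-add logic of getOrAddOperator can be exercised outside Flink with the following sketch. OperatorGroupRegistrySketch and its nested OperatorGroup are hypothetical stand-ins for JobManagerJobMetricGroup and JobManagerOperatorMetricGroup. MAX_NAME_LENGTH stands in for METRICS_OPERATOR_NAME_MAX_LENGTH, and its value of 80 is an assumption chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for JobManagerJobMetricGroup#getOrAddOperator; not Flink code. */
class OperatorGroupRegistrySketch {
    // Plays the role of METRICS_OPERATOR_NAME_MAX_LENGTH (value assumed).
    static final int MAX_NAME_LENGTH = 80;

    /** Stand-in for JobManagerOperatorMetricGroup that only remembers its identity. */
    static final class OperatorGroup {
        final String operatorId;
        final String operatorName;

        OperatorGroup(String operatorId, String operatorName) {
            this.operatorId = operatorId;
            this.operatorName = operatorName;
        }
    }

    private final Map<String, OperatorGroup> operators = new HashMap<>();

    /** Cuts names longer than MAX_NAME_LENGTH; null and short names are returned unchanged. */
    static String truncate(String name) {
        return (name != null && name.length() > MAX_NAME_LENGTH)
                ? name.substring(0, MAX_NAME_LENGTH)
                : name;
    }

    /** Returns the group registered for (operatorId, truncated name), creating it if absent. */
    synchronized OperatorGroup getOrAddOperator(String operatorId, String operatorName) {
        final String truncatedName = truncate(operatorName);
        final String key = operatorId + truncatedName;
        return operators.computeIfAbsent(key, k -> new OperatorGroup(operatorId, truncatedName));
    }

    synchronized int size() {
        return operators.size();
    }

    public static void main(String[] args) {
        OperatorGroupRegistrySketch registry = new OperatorGroupRegistrySketch();
        OperatorGroup first = registry.getOrAddOperator("op-1", "Source");
        OperatorGroup second = registry.getOrAddOperator("op-1", "Source");
        // The same group is reused for the same key.
        System.out.println(first == second);
        System.out.println(registry.getOrAddOperator("op-2", "x".repeat(100)).operatorName.length());
    }
}
```

Holding the lock around computeIfAbsent matters because HashMap is not thread-safe. It also ensures that concurrent callers asking for the same key receive the same group instance.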

5) Add an internal implementation, InternalOperatorCoordinatorMetricGroup, of the interface OperatorCoordinatorMetricGroup that uses JobManagerOperatorMetricGroup as its parent metric group.

```java
@Internal
public class InternalOperatorCoordinatorMetricGroup extends ProxyMetricGroup<MetricGroup>
        implements OperatorCoordinatorMetricGroup {

    public InternalOperatorCoordinatorMetricGroup(JobManagerOperatorMetricGroup parent) {
        // Coordinator metrics are registered under a "coordinator" subgroup of the operator group.
        super(parent.addGroup("coordinator"));
    }

    ......
}
```
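InternalOperatorCoordinatorMetricGroup forwards every call to a "coordinator" subgroup of the operator group, in the spirit of ProxyMetricGroup. The sketch below models that delegation with two hypothetical classes, CoordinatorGroupProxySketch and SimpleGroupSketch, neither of which is part of Flink. SimpleGroupSketch tracks only scopes and subgroups.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical proxy: every call is forwarded to the "coordinator" subgroup of the operator group. */
class CoordinatorGroupProxySketch {
    private final SimpleGroupSketch delegate;

    CoordinatorGroupProxySketch(SimpleGroupSketch operatorGroup) {
        this.delegate = operatorGroup.addGroup("coordinator");
    }

    SimpleGroupSketch addGroup(String name) {
        return delegate.addGroup(name);
    }

    String metricIdentifier(String metricName) {
        return delegate.metricIdentifier(metricName);
    }

    public static void main(String[] args) {
        SimpleGroupSketch operator = new SimpleGroupSketch("localhost.jobmanager.WordCount.Source");
        CoordinatorGroupProxySketch coordinator = new CoordinatorGroupProxySketch(operator);
        System.out.println(coordinator.metricIdentifier("numEventsIn"));
    }
}

/** Hypothetical minimal metric group that only tracks its scope and named subgroups. */
class SimpleGroupSketch {
    private final String scope;
    private final Map<String, SimpleGroupSketch> subGroups = new HashMap<>();

    SimpleGroupSketch(String scope) {
        this.scope = scope;
    }

    /** Returns the subgroup with the given name, creating it on first use. */
    SimpleGroupSketch addGroup(String name) {
        return subGroups.computeIfAbsent(name, n -> new SimpleGroupSketch(scope + "." + n));
    }

    /** The fully qualified name that a metric registered in this group would get. */
    String metricIdentifier(String metricName) {
        return scope + "." + metricName;
    }
}
```

Because the proxy sits on top of the operator group, coordinator metrics share the operator's scope and gain one extra coordinator component.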

6) Introduce a metricGroup() method in OperatorCoordinator.Context, through which an OperatorCoordinator can get its metric group.

```java
@Internal
public interface OperatorCoordinator extends CheckpointListener, AutoCloseable {
    interface Context {
        /** Gets the metric group of the operator coordinator. */
        OperatorCoordinatorMetricGroup metricGroup();

        ....
    }
}
```

7) The implementation of the OperatorCoordinator registers and reports metrics through the OperatorCoordinatorMetricGroup.


For example, the SourceCoordinator is an implementation of the interface OperatorCoordinator. It could use the metric group like this:

```java
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
        implements OperatorCoordinator {
    ......
    private final Counter numEventsInCounter;

    public SourceCoordinator(
            String operatorName,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> context,
            CoordinatorStore coordinatorStore,
            WatermarkAlignmentParams watermarkAlignmentParams,
            @Nullable String coordinatorListeningID) {
        ......
        // Register the counter once, when the coordinator is created.
        this.numEventsInCounter = context.metricGroup().counter("numEventsIn");
        ......
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        ......
        // Count every event received from the operator's subtasks.
        numEventsInCounter.inc();
        ......
    }
}
```
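The counting pattern in the SourceCoordinator example can be tried without Flink using the sketch below. CountingCoordinatorSketch, SketchCounter, SketchMetricGroup, SketchContext and MapBackedMetricGroup are hypothetical stand-ins for SourceCoordinator, Counter, OperatorCoordinatorMetricGroup and OperatorCoordinator.Context. The event type is simplified to Object.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical coordinator that counts incoming events, mirroring the example above. */
class CountingCoordinatorSketch {
    private final SketchCounter numEventsInCounter;

    CountingCoordinatorSketch(SketchContext context) {
        // Register the counter once, when the coordinator is created.
        this.numEventsInCounter = context.metricGroup().counter("numEventsIn");
    }

    /** Only the metric update is shown; the real method also processes the event. */
    void handleEventFromOperator(int subtask, int attemptNumber, Object event) {
        numEventsInCounter.inc();
    }

    public static void main(String[] args) {
        MapBackedMetricGroup group = new MapBackedMetricGroup();
        CountingCoordinatorSketch coordinator = new CountingCoordinatorSketch(() -> group);
        coordinator.handleEventFromOperator(0, 0, "event");
        coordinator.handleEventFromOperator(1, 0, "event");
        System.out.println(group.counters.get("numEventsIn").getCount());
    }
}

/** Stand-in for Counter; not thread-safe, so it assumes updates happen on one (coordinator) thread. */
class SketchCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

/** Stand-in for the coordinator metric group: creates or returns named counters. */
interface SketchMetricGroup {
    SketchCounter counter(String name);
}

/** Stand-in for OperatorCoordinator.Context exposing metricGroup(). */
interface SketchContext {
    SketchMetricGroup metricGroup();
}

/** Keeps counters in a map so that they can be inspected. */
class MapBackedMetricGroup implements SketchMetricGroup {
    final Map<String, SketchCounter> counters = new HashMap<>();

    @Override
    public SketchCounter counter(String name) {
        return counters.computeIfAbsent(name, n -> new SketchCounter());
    }
}
```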

The SourceCoordinator has a field of type SplitEnumerator named 'enumerator'. To register a metric for the enumerator, we could get the metric group from its SplitEnumeratorContext#metricGroup. The details about SplitEnumeratorMetricGroup will be introduced in the issue FLINK-21000.

For example, KafkaSourceEnumerator could register a metric for how many partitions have been assigned to source readers.

```java
@Internal
public class KafkaSourceEnumerator
        implements SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState> {
    public KafkaSourceEnumerator(
            KafkaSubscriber subscriber,
            OffsetsInitializer startingOffsetInitializer,
            OffsetsInitializer stoppingOffsetInitializer,
            Properties properties,
            SplitEnumeratorContext<KafkaPartitionSplit> context,
            Boundedness boundedness,
            Set<TopicPartition> assignedPartitions) {
        ......
        context.metricGroup().gauge("assignedPartitionsNum", assignedPartitions::size);
    }

    ......
}
```
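Passing a method reference such as assignedPartitions::size makes the gauge read the collection each time it is reported, so the value follows later assignments. The sketch below shows that behavior. EnumeratorGaugeSketch and SketchGauge are hypothetical stand-ins for the Kafka enumerator and Flink's Gauge. The sketch assumes the gauge refers to the same set that the enumerator keeps updating.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical enumerator that exposes the number of assigned partitions as a gauge. */
class EnumeratorGaugeSketch {
    private final Set<String> assignedPartitions = new HashSet<>();

    // Evaluated on each getValue() call, so it always reflects the current set size.
    final SketchGauge<Integer> assignedPartitionsNum = assignedPartitions::size;

    void assign(String partition) {
        assignedPartitions.add(partition);
    }

    public static void main(String[] args) {
        EnumeratorGaugeSketch enumerator = new EnumeratorGaugeSketch();
        enumerator.assign("topic-0");
        enumerator.assign("topic-1");
        System.out.println(enumerator.assignedPartitionsNum.getValue());
    }
}

/** Stand-in for Flink's Gauge: a value computed on each read. */
interface SketchGauge<T> {
    T getValue();
}
```

A gauge built from a value computed once, such as an int captured at construction time, would keep reporting that initial value.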

...

This POC provides the new metric group scope and common metric names for the OperatorCoordinator. It also provides internal implementations of OperatorCoordinatorMetricGroup and SplitEnumeratorMetricGroup. In addition, the POC modifies SourceCoordinator and KafkaSourceEnumerator to show how to use the metric group.

...