
Motivation

Currently, Paimon runs without monitoring and metrics out of the box. In production environments, users need to know how their Paimon tables behave: what the commit duration is, how many files each commit adds or deletes, the status of compaction, the duration of scan queries, the lag of streaming reads, and so on. We therefore need to introduce a metrics system and more metrics for Paimon.

...

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;

public class Metrics {

	/** The metrics reporters container. */
	private final List<MetricsReporter> reporters = new ArrayList<>();

	/** The metric groups. */
	private final List<MetricGroup> metricGroups = new ArrayList<>();

	/** Add a metrics reporter. */
	public void addReporter(MetricsReporter reporter) {
		reporters.add(reporter);
	}

	/** Add a metric group. */
	public void addGroup(String groupName, MetricGroup group) {
		metricGroups.add(group);
	}

	/** Get metrics reporters. */
	public List<MetricsReporter> getReporters() {
		return reporters;
	}

	/** Get metric groups. */
	public List<MetricGroup> getMetricGroups() {
		return metricGroups;
	}
}

MetricGroup

...

Code Block
languagejava
public interface MetricGroup {

	/** Register gauge metric. */
	void gauge(String name, Gauge<?> gauge);

	/** Register counter metric. */
	void counter(String name, Counter counter);

	/** Register histogram metric. */
	void histogram(String name, Histogram histogram);
}
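To make the interface more concrete, here is a minimal in-memory sketch of a map-backed metric group. The metric types are hypothetical stand-ins chosen for illustration: `Supplier<?>` plays the role of `Gauge`, `SketchCounter` the role of `Counter`, and `SketchHistogram` keeps a fixed window of recent samples in place of `Histogram`. They are not the final Paimon types.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical counter: a long that can be incremented and decremented. */
class SketchCounter {
    private long count;

    void inc(long n) { count += n; }

    void dec(long n) { count -= n; }

    long getCount() { return count; }
}

/** Hypothetical histogram: keeps only the last windowSize samples. */
class SketchHistogram {
    private final int windowSize;
    private final Deque<Long> samples = new ArrayDeque<>();

    SketchHistogram(int windowSize) { this.windowSize = windowSize; }

    void update(long value) {
        if (samples.size() == windowSize) {
            samples.removeFirst(); // drop the oldest sample
        }
        samples.addLast(value);
    }

    int size() { return samples.size(); }

    long max() { return samples.stream().mapToLong(Long::longValue).max().orElse(0L); }
}

/** Map-backed group: registering a metric stores the metric object under its name. */
class SketchMetricGroup {
    final Map<String, Supplier<?>> gauges = new LinkedHashMap<>();
    final Map<String, SketchCounter> counters = new LinkedHashMap<>();
    final Map<String, SketchHistogram> histograms = new LinkedHashMap<>();

    void gauge(String name, Supplier<?> gauge) { gauges.put(name, gauge); }

    void counter(String name, SketchCounter counter) { counters.put(name, counter); }

    void histogram(String name, SketchHistogram histogram) { histograms.put(name, histogram); }
}
```

Storing the metric objects rather than their values means a reporter always reads the current value at report time, so the code that updates a metric never has to talk to the reporter.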

Paimon supports adding tags to metric groups; tags are similar to scopes in Flink's metric groups. A tagged metric group reports its metric values with the tags as part of the metric name prefix.

Table partition and bucket can be added as tags for some metric groups. For example, for the file-count metrics in committing / scan / compaction, users can monitor the files of different buckets to check for data skew, and by checking the compaction duration of different buckets, they can see which bucket caused an unexpectedly long compaction.
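The text says tags become part of the metric name prefix but does not fix the format. The sketch below shows one possible scheme, assuming a `group.key=value.metric` layout with `.` as separator; the `TaggedMetricNames` helper and the layout are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds a metric identifier from a group name, tags and a metric name. */
final class TaggedMetricNames {
    private TaggedMetricNames() {}

    static String identifier(String groupName, Map<String, String> tags, String metricName) {
        StringJoiner joiner = new StringJoiner(".");
        joiner.add(groupName);
        // Tags are appended in the map's iteration order, so use an ordered map (e.g. LinkedHashMap).
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            joiner.add(tag.getKey() + "=" + tag.getValue());
        }
        joiner.add(metricName);
        return joiner.toString();
    }
}
```

With partition `20230901` and bucket `3` as tags, `lastTableFilesAdded` in group `commit` becomes `commit.partition=20230901.bucket=3.lastTableFilesAdded`, while an untagged metric keeps the plain `commit.lastCommitDuration` form, so per-bucket series can be compared side by side in the backend.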

MetricsReporter

MetricsReporter is used to report metrics to an external backend. Paimon will provide an out-of-the-box JMX MetricsReporter.

Code Block
languagejava
public interface MetricsReporter {
	/** Configure reporter after instantiating it. */
	void open();

	/** Closes this reporter. */
	void close();

	/** Report the current measurements. This method is called periodically by the Metrics. */
	void report();
}
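The comment on `report()` says it is called periodically. A sketch of such a loop, assuming a single-thread `ScheduledExecutorService` drives all reporters at a fixed interval; `ReporterSketch` mirrors the interface above, and `CountingReporter` and `ReportingLoop` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Same shape as the MetricsReporter interface above. */
interface ReporterSketch {
    void open();

    void close();

    void report();
}

/** Hypothetical reporter that only counts how often report() was called. */
class CountingReporter implements ReporterSketch {
    final AtomicInteger reports = new AtomicInteger();
    volatile boolean opened;

    @Override public void open() { opened = true; }

    @Override public void close() { opened = false; }

    @Override public void report() { reports.incrementAndGet(); }
}

/** Opens the reporters, calls report() at a fixed rate, and closes them on shutdown. */
class ReportingLoop implements AutoCloseable {
    private final List<ReporterSketch> reporters;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ReportingLoop(List<? extends ReporterSketch> reporters, long intervalMillis) {
        this.reporters = new ArrayList<>(reporters);
        this.reporters.forEach(ReporterSketch::open);
        scheduler.scheduleAtFixedRate(
                () -> this.reporters.forEach(ReporterSketch::report),
                intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdown(); // no new report() runs are started after this
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        reporters.forEach(ReporterSketch::close);
    }
}
```

Using one scheduler thread for all reporters keeps `report()` calls serialized, so a reporter implementation does not need to be thread-safe with respect to itself.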

...

(draw.io diagram: MetricsSystem)

Metrics Registration

Take CommitMetrics as an example: CommitMetrics is instantiated by FileStoreCommitImpl, and the commit-related metrics are then registered through the corresponding MetricGroup in the Metrics instance.
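A minimal sketch of that registration flow, assuming the Metrics instance can be reduced to a map from group name to named gauges. `SketchMetricsRegistry` and `CommitMetricsSketch` are hypothetical, and `reportCommit` stands in for whatever FileStoreCommitImpl calls after a commit.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for the Metrics instance: group name -> (metric name -> gauge). */
class SketchMetricsRegistry {
    final Map<String, Map<String, Supplier<?>>> groups = new LinkedHashMap<>();

    Map<String, Supplier<?>> group(String name) {
        return groups.computeIfAbsent(name, k -> new LinkedHashMap<>());
    }
}

/** Hypothetical CommitMetrics: registers its gauges once, then only updates plain fields. */
class CommitMetricsSketch {
    private volatile long lastCommitDurationMs;
    private volatile long lastCommitAttempts;

    CommitMetricsSketch(SketchMetricsRegistry registry) {
        Map<String, Supplier<?>> group = registry.group("commit");
        // The gauges read the fields lazily, whenever a reporter asks for a value.
        group.put("lastCommitDuration", () -> lastCommitDurationMs);
        group.put("lastCommitAttempts", () -> lastCommitAttempts);
    }

    /** Called by the committer after a commit finishes. */
    void reportCommit(long durationMs, long attempts) {
        this.lastCommitDurationMs = durationMs;
        this.lastCommitAttempts = attempts;
    }
}
```

Registration happens once, in the constructor; afterwards the commit path only writes two fields, which keeps the per-commit overhead of metrics small.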

...

We introduce CommitMetrics, CompactionMetrics and ScanMetrics as metric sets to measure the stats of Paimon table committing, compaction and scanning.

Common metrics

CommitMetrics

Code Block
languagejava
public class CommitMetrics {
	private static final String LAST_COMMIT_DURATION_METRIC = "lastCommitDuration";
	// ...

	private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
	private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
	private final MetricGroup genericMetricGroup;

	/** Registers the commit metrics that are not tagged by partition or bucket. */
	private void registerGenericCommitMetrics() {
		genericMetricGroup.gauge(LAST_COMMIT_DURATION_METRIC, new CommitDurationGauge());
		// ...
	}

	/** Registers partition- and bucket-tagged commit metrics when a new bucket or partition is seen. */
	private void registerTaggedCommitMetrics() {
		if (isNewBucket()) {
			registerTaggedMetrics();
		}

		if (isNewPartition()) {
			registerTaggedMetrics();
		}
		// ...
	}
}
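The `isNewBucket()` / `isNewPartition()` checks above register tagged metrics only the first time a bucket or partition shows up. One way to express that, sketched under the assumption that partitions are identified by a `String` (standing in for `BinaryRow`) and that each tagged group only holds `lastTableFilesAdded`, is a nested `computeIfAbsent`; all names here are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-bucket group, tagged with its partition and bucket. */
class BucketGroupSketch {
    final String partition;
    final int bucket;
    long lastTableFilesAdded;

    BucketGroupSketch(String partition, int bucket) {
        this.partition = partition;
        this.bucket = bucket;
    }
}

/** Creates (registers) a tagged group the first time a (partition, bucket) pair is seen. */
class TaggedCommitMetricsSketch {
    private final Map<String, Map<Integer, BucketGroupSketch>> groups = new HashMap<>();
    int registrations; // number of tagged groups created so far

    BucketGroupSketch groupFor(String partition, int bucket) {
        return groups
                .computeIfAbsent(partition, p -> new HashMap<>())
                .computeIfAbsent(bucket, b -> {
                    registrations++; // only runs for a (partition, bucket) pair not seen before
                    return new BucketGroupSketch(partition, b);
                });
    }

    /** Sets lastTableFilesAdded for every bucket touched by one commit (bucket -> added files). */
    void recordCommit(String partition, Map<Integer, Integer> addedFilesPerBucket) {
        addedFilesPerBucket.forEach(
                (bucket, added) -> groupFor(partition, bucket).lastTableFilesAdded = added);
    }
}
```

Folding the "is it new?" check into `computeIfAbsent` avoids a separate lookup followed by an insert, so a group can never be registered twice for the same bucket.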

The CommitMetrics list includes the commit duration and counters of files / records, etc.

| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastCommitDuration | The time it took to complete the last commit. | Gauge | none | Ms | The timer starts before the commit starts; the duration is updated after the last commit finishes. |
| commitDuration | Distribution of the time taken by the last few commits. | Histogram | none | Ms | The timer starts before the commit starts; the duration is updated after each commit finishes. |
| lastCommitAttempts | The number of attempts the last commit made. | Gauge | none | Number | Incremented by 1 on each commit attempt; cleared after the last commit finishes. |
| lastTableFilesAdded | Number of table files added in the last commit, including newly created data files and files produced by compaction (compacted after). | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastTableFilesDeleted | Number of table files deleted in the last commit, i.e. the files consumed by compaction (compacted before). | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastTableFilesAppended | Number of table files appended in the last commit, i.e. the newly created data files. | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastTableFilesCommitCompacted | Number of compacted table files in the last commit, including files both before and after compaction. | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastChangelogFilesAppended | Number of changelog files appended in the last commit. | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastChangelogFileCommitCompacted | Number of compacted changelog files in the last commit. | Gauge | Partition, Bucket | Number | When collecting changes from committables |
| lastGeneratedSnapshots | Number of snapshot files generated in the last commit (either 1 or 2). | Gauge | none | Number | After collecting changes from committables |
| lastDeltaRecordsAppended | Number of delta records in the last commit with the APPEND commit kind. | Gauge | none | Number | When preparing the snapshot file with the APPEND commit kind |
| lastChangelogRecordsAppended | Number of changelog records in the last commit with the APPEND commit kind. | Gauge | none | Number | When preparing the snapshot file with the APPEND commit kind |
| lastDeltaRecordsCommitCompacted | Number of delta records in the last commit with the COMPACT commit kind. | Gauge | none | Number | When preparing the snapshot file with the COMPACT commit kind |
| lastChangelogRecordsCommitCompacted | Number of changelog records in the last commit with the COMPACT commit kind. | Gauge | none | Number | When preparing the snapshot file with the COMPACT commit kind |
| lastPartitionsWritten | Number of partitions written in the last commit. | Gauge | none | Number | After collecting changes from committables |
| lastBucketsWritten | Number of buckets written in the last commit. | Gauge | none | Number | After collecting changes from committables |
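The Update-at entries for lastCommitDuration, commitDuration and lastCommitAttempts describe a timer around the commit plus a per-attempt counter. A sketch of that bookkeeping, assuming a hypothetical `tryCommitOnce` callback that returns `true` once a commit attempt succeeds, an injectable millisecond clock, and a bounded window of recent durations as the input to the commitDuration histogram:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical commit timer: keeps lastCommitDuration, lastCommitAttempts and recent durations. */
class CommitTimerSketch {
    private final int windowSize;
    private final LongSupplier clockMillis;
    private final Deque<Long> recentDurations = new ArrayDeque<>();
    long lastCommitDuration;
    long lastCommitAttempts;

    CommitTimerSketch(int windowSize, LongSupplier clockMillis) {
        this.windowSize = windowSize;
        this.clockMillis = clockMillis;
    }

    /** Retries tryCommitOnce until it succeeds, then publishes the metrics. */
    void commit(BooleanSupplier tryCommitOnce) {
        long start = clockMillis.getAsLong(); // the timer starts before the commit starts
        long attempts = 0; // local counter, so every commit starts again from 0
        boolean committed = false;
        while (!committed) { // a real committer would bound the number of retries
            attempts++; // incremented by 1 on each attempt
            committed = tryCommitOnce.getAsBoolean();
        }
        long duration = clockMillis.getAsLong() - start;
        lastCommitDuration = duration;
        lastCommitAttempts = attempts;
        if (recentDurations.size() == windowSize) {
            recentDurations.removeFirst(); // keep only the last few commits
        }
        recentDurations.addLast(duration);
    }

    /** Snapshot of the durations that would feed the commitDuration histogram. */
    List<Long> commitDurationWindow() {
        return new ArrayList<>(recentDurations);
    }
}
```

Injecting the clock keeps the timing logic testable without sleeping; in production it would simply be `System::currentTimeMillis`.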

...

Code Block
public class ScanMetrics {
	private static final String LAST_SCAN_SKIPPED_MANIFESTS_METRIC = "lastScanSkippedManifests";
	// ...

	private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
	private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
	private final MetricGroup genericMetricGroup;

	/** Registers the scan metrics that are not tagged by partition or bucket. */
	private void registerGenericScanMetrics() {
		// lastScanSkippedManifests is a gauge, so it is registered with gauge(), not counter().
		genericMetricGroup.gauge(LAST_SCAN_SKIPPED_MANIFESTS_METRIC, new LastScanSkippedManifestsGauge());
		// ...
	}

	/** Registers partition- and bucket-tagged scan metrics when a new bucket or partition is seen. */
	private void registerTaggedScanMetrics() {
		if (isNewBucket()) {
			registerTaggedMetrics();
		}

		if (isNewPartition()) {
			registerTaggedMetrics();
		}
		// ...
	}
}

The ScanMetrics list includes the scan duration and counters of data files and manifest files.

| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastScanDuration | The time it took to complete the last scan planning. | Gauge | none | Ms | The timer starts before scan planning starts; updated after the last scan planning operation finishes. |
| scanDuration | Distribution of the time taken by the last few scan planning operations. | Histogram | none | Ms | The timer starts before scan planning starts; updated after each planning finishes. |
| lastScannedManifests | Number of manifest files scanned in the last scan planning. | Gauge | none | Number | Scan planning |
| lastScanSkippedManifests | Number of manifest files skipped in the last scan planning. | Gauge | none | Number | Scan planning |
| lastScanResultTableFiles | Number of result table files in the last scan planning. | Gauge | Partition, Bucket | Number | Scan planning |
| lastScanGenerateSplits | Number of splits generated by the last scan planning. | Gauge | none | Number | Scan planning |
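A sketch of how scan planning could produce lastScanDuration, lastScannedManifests, lastScanSkippedManifests and lastScanResultTableFiles in one pass. It assumes, for illustration only, that a manifest counts as scanned when it passes a partition filter and as skipped otherwise; `ManifestSketch` and `ScanMetricsSketch` are hypothetical types.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

/** Hypothetical manifest: the partition it covers and how many table files it lists. */
final class ManifestSketch {
    final String partition;
    final int tableFiles;

    ManifestSketch(String partition, int tableFiles) {
        this.partition = partition;
        this.tableFiles = tableFiles;
    }
}

/** Records scan-planning metrics once planning has finished. */
class ScanMetricsSketch {
    long lastScanDuration;
    long lastScannedManifests;
    long lastScanSkippedManifests;
    long lastScanResultTableFiles;

    long plan(List<ManifestSketch> manifests, Predicate<String> partitionFilter, LongSupplier clockMillis) {
        long start = clockMillis.getAsLong(); // the timer starts before scan planning
        long scanned = 0;
        long skipped = 0;
        long resultFiles = 0;
        for (ManifestSketch manifest : manifests) {
            if (partitionFilter.test(manifest.partition)) {
                scanned++;
                resultFiles += manifest.tableFiles;
            } else {
                skipped++;
            }
        }
        // Publish all values together so a reporter never sees a half-updated scan.
        lastScanDuration = clockMillis.getAsLong() - start;
        lastScannedManifests = scanned;
        lastScanSkippedManifests = skipped;
        lastScanResultTableFiles = resultFiles;
        return resultFiles;
    }
}
```

Counting in local variables and assigning the fields only at the end matches the "updated after the last scan planning finishes" semantics in the table.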

CompactionMetrics

Code Block
public class CompactionMetrics {
	private static final String LAST_TABLE_FILES_COMPACTED_BEFORE_METRIC = "lastTableFilesCompactedBefore";
	// ...

	private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
	private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
	private final MetricGroup genericMetricGroup;

	/** Registers the compaction metrics that are not tagged by partition or bucket. */
	private void registerGenericCompactionMetrics() {
		// lastTableFilesCompactedBefore is a gauge, so it is registered with gauge(), not counter().
		genericMetricGroup.gauge(LAST_TABLE_FILES_COMPACTED_BEFORE_METRIC, new LastTableFilesCompactedBeforeGauge());
		// ...
	}

	/** Registers partition- and bucket-tagged compaction metrics when a new bucket or partition is seen. */
	private void registerTaggedCompactionMetrics() {
		if (isNewBucket()) {
			registerTaggedMetrics();
		}

		if (isNewPartition()) {
			registerTaggedMetrics();
		}
		// ...
	}
}

The CompactionMetrics list includes the compaction duration, counters of files, the output level and the compacted file size, etc.

| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastCompactionDuration | The time it took to complete the last compaction. | Gauge | none | Ms | The timer starts before compaction; updated after the last compaction finishes. |
| compactionDuration | Distribution of the time taken by the last few compactions. | Histogram | none | Ms | The timer starts before compaction; updated after each compaction finishes. |
| lastTableFilesCompactedBefore | Number of files deleted in the last compaction. | Gauge | none | Number | After getting the compaction result |
| lastTableFilesCompactedAfter | Number of files added in the last compaction. | Gauge | none | Number | After getting the compaction result |
| lastChangelogFilesCompacted | Number of changelog files compacted in the last compaction. | Gauge | none | Number | After getting the compaction result |
| lastCompactOutputLevel | The output level of the last compaction. | Gauge | none | Number | When updating levels after getting the compaction result |
| lastCompactFileSize | Sum of the sizes of the files compacted in the last compaction. | Gauge | none | Number | When updating levels after getting the compaction result |
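A sketch of deriving these gauges from a compaction result. It assumes, hypothetically, that a result exposes its input files (compacted before) and output files (compacted after), that all output files land on one level, and that "file size compacted" means the total size of the input files; `DataFileSketch`, `CompactResultSketch` and `CompactionMetricsSketch` are illustrative names.

```java
import java.util.List;

/** Hypothetical data file: size in bytes and LSM level. */
final class DataFileSketch {
    final long fileSize;
    final int level;

    DataFileSketch(long fileSize, int level) {
        this.fileSize = fileSize;
        this.level = level;
    }
}

/** Hypothetical compaction result: input files (before) and output files (after). */
final class CompactResultSketch {
    final List<DataFileSketch> before;
    final List<DataFileSketch> after;

    CompactResultSketch(List<DataFileSketch> before, List<DataFileSketch> after) {
        this.before = before;
        this.after = after;
    }
}

/** Updates the compaction gauges after getting a compaction result. */
class CompactionMetricsSketch {
    long lastTableFilesCompactedBefore;
    long lastTableFilesCompactedAfter;
    int lastCompactOutputLevel = -1; // -1 until a compaction has produced output
    long lastCompactFileSize;

    void update(CompactResultSketch result) {
        lastTableFilesCompactedBefore = result.before.size();
        lastTableFilesCompactedAfter = result.after.size();
        // Assumes every output file of one compaction is written to the same level.
        lastCompactOutputLevel = result.after.isEmpty() ? -1 : result.after.get(0).level;
        // Assumed meaning of "sum of file size compacted": total size of the input files.
        lastCompactFileSize = result.before.stream().mapToLong(f -> f.fileSize).sum();
    }
}
```

Whether the file-size gauge should sum the inputs or the outputs is a design choice the table leaves open; summing inputs measures how much data the compaction had to rewrite.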

...