
Motivation

Currently, Paimon provides no monitoring or metrics out of the box. In production, users need to know how their Paimon tables behave: the commit duration, how many files each commit added or deleted, the status of compaction, the duration of scan queries, the lag of streaming reads, and so on. We therefore need to introduce a metrics system and a set of metrics for Paimon.

...

Code Block
/** A Counter is a metric measured by incrementing and decrementing. */
@Public
public interface Counter extends Metric {

    /** Increment the current count by 1. */
    void inc();

    /**
     * Increment the current count by the given value.
     *
     * @param n value to increment the current count by
     */
    void inc(long n);

    /** Decrement the current count by 1. */
    void dec();

    /**
     * Decrement the current count by the given value.
     *
     * @param n value to decrement the current count by
     */
    void dec(long n);

    /**
     * Returns the current count.
     *
     * @return current count
     */
    long getCount();
}
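To illustrate the `Counter` contract, here is a minimal sketch of a thread-safe implementation. It makes a few assumptions so the snippet compiles on its own:

- `Metric` is a plain marker interface. The proposal does not show its definition.
- The `@Public` annotation is omitted.
- `SimpleCounter` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Assumed marker interface; the proposal does not show its definition. */
interface Metric {}

/** Counter as proposed, without the @Public annotation. */
interface Counter extends Metric {
    void inc();

    void inc(long n);

    void dec();

    void dec(long n);

    long getCount();
}

/** Hypothetical counter backed by an AtomicLong so concurrent updates are not lost. */
class SimpleCounter implements Counter {
    private final AtomicLong count = new AtomicLong();

    @Override
    public void inc() {
        count.incrementAndGet();
    }

    @Override
    public void inc(long n) {
        count.addAndGet(n);
    }

    @Override
    public void dec() {
        count.decrementAndGet();
    }

    @Override
    public void dec(long n) {
        count.addAndGet(-n);
    }

    @Override
    public long getCount() {
        return count.get();
    }
}
```

Using an `AtomicLong` means `inc`/`dec` calls from different threads never lose updates, and no lock is needed.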

...

Histogram


Histogram is a type of metric that measures the statistical distribution of a set of values, including the min, max, mean, standard deviation and percentiles.

Code Block
/** The histogram allows to record values, get the current count of recorded values and create
 * histogram statistics for the currently seen elements. */
@Public
public interface Histogram extends Metric {

    /**
     * Update the histogram with the given value.
     *
     * @param value Value to update the histogram with
     */
    void update(long value);

    /**
     * Get the count of seen elements.
     *
     * @return Count of seen elements
     */
    long getCount();

    /**
     * Create statistics for the currently recorded elements.
     *
     * @return Statistics about the currently recorded elements
     */
    HistogramStatistics getStatistics();

    interface HistogramStatistics {
        /** Return the number of values that the statistics computation is based on. */
        int size();

        /** Returns the mean value of the histogram observations. */
        double mean();

        /** Returns the standard deviation of the histogram distribution. */
        double stdDev();

        /** Returns the maximum value of the histogram observations. */
        long max();

        /** Returns the minimum value of the histogram observations. */
        long min();

        /**
         * Returns the percentile value based on the histogram statistics.
         *
         * @param percentile percentile point in double. E.g., 0.75 means 75 percentile. It is up to the
         *     implementation to decide what valid percentile points are supported.
         * @return Value for the given percentile
         */
        long percentile(double percentile);
    }
}
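A histogram implementation has to decide which values it keeps. Below is a minimal sketch that keeps only the most recent `windowSize` values in a ring buffer. It computes statistics over a sorted snapshot, and `percentile` uses the nearest-rank rule.

The window, the nearest-rank rule and the name `SlidingWindowHistogram` are choices made for this sketch, not part of the proposal. `Metric` is again assumed to be a marker interface.

```java
import java.util.Arrays;

/** Assumed marker interface; not defined in the proposal. */
interface Metric {}

/** Histogram and HistogramStatistics as proposed, without @Public and Javadoc. */
interface Histogram extends Metric {
    void update(long value);
    long getCount();
    HistogramStatistics getStatistics();

    interface HistogramStatistics {
        int size();
        double mean();
        double stdDev();
        long max();
        long min();
        long percentile(double percentile);
    }
}

/** Hypothetical histogram that keeps only the most recent windowSize values. */
class SlidingWindowHistogram implements Histogram {
    private final long[] window;
    private long count; // total number of values seen, including evicted ones

    SlidingWindowHistogram(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.window = new long[windowSize];
    }

    @Override
    public synchronized void update(long value) {
        // Ring buffer: once the window is full, overwrite the oldest slot.
        window[(int) (count % window.length)] = value;
        count++;
    }

    @Override
    public synchronized long getCount() {
        return count;
    }

    @Override
    public synchronized HistogramStatistics getStatistics() {
        // Copy only the filled slots, then sort so min/max/percentile are index lookups.
        long[] snapshot = Arrays.copyOf(window, (int) Math.min(count, window.length));
        Arrays.sort(snapshot);
        return new SortedStatistics(snapshot);
    }

    /** Immutable statistics over a sorted snapshot of the window. */
    private static final class SortedStatistics implements HistogramStatistics {
        private final long[] sorted;

        SortedStatistics(long[] sorted) { this.sorted = sorted; }

        @Override public int size() { return sorted.length; }

        @Override public double mean() { return Arrays.stream(sorted).average().orElse(0.0); }

        @Override
        public double stdDev() {
            if (sorted.length < 2) {
                return 0.0;
            }
            double mean = mean();
            double sumOfSquares = 0.0;
            for (long v : sorted) {
                sumOfSquares += (v - mean) * (v - mean);
            }
            return Math.sqrt(sumOfSquares / (sorted.length - 1)); // sample standard deviation
        }

        @Override public long max() { return sorted.length == 0 ? 0L : sorted[sorted.length - 1]; }

        @Override public long min() { return sorted.length == 0 ? 0L : sorted[0]; }

        @Override
        public long percentile(double percentile) {
            if (percentile < 0.0 || percentile > 1.0) {
                throw new IllegalArgumentException("percentile must be in [0, 1]");
            }
            if (sorted.length == 0) {
                return 0L;
            }
            // Nearest-rank: the value at rank ceil(percentile * n), with rank at least 1.
            int rank = (int) Math.ceil(percentile * sorted.length);
            return sorted[Math.max(rank, 1) - 1];
        }
    }
}
```

A bounded window keeps memory constant and makes the statistics reflect recent behavior, which fits metrics such as `commitDuration` described as "the last few commits".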


Metrics

Class Metrics is the core of the metrics system. It holds the MetricGroup and MetricsReporter containers. When the Metrics instance is initialized, the metric groups are instantiated and the metrics reporters are started.

Metrics reporters are configurable, so users can plug in custom reporters. Paimon will provide a JMX metrics reporter by default.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;

public class Metrics {

    /** The metrics reporters container. */
    private final List<MetricsReporter> reporters = new ArrayList<>();

    /** The metric groups container. */
    private final List<MetricGroup> metricGroups = new ArrayList<>();

    /** Add a metrics reporter. */
    public void addReporter(MetricsReporter reporter) {
        reporters.add(reporter);
    }

    /** Add a metric group. */
    public void addGroup(MetricGroup group) {
        metricGroups.add(group);
    }

    /** Get metrics reporters. */
    public List<MetricsReporter> getReporters() {
        return reporters;
    }

    /** Get metric groups. */
    public List<MetricGroup> getMetricGroups() {
        return metricGroups;
    }
}

MetricGroup

MetricGroup is responsible for registering and tagging metrics, and it holds a metrics container. It provides a register method for each type of metric; registering a metric puts it into the container.

Code Block
languagejava
public interface MetricGroup {
    /** Register gauge metric. */
    void gauge(String name, Gauge<?> gauge);

    /** Register counter metric. */
    void counter(String name, Counter counter);

    /** Register histogram metric. */
    void histogram(String name, Histogram histogram);
}
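One way to back MetricGroup with a metrics container is a single map keyed by metric name. This is a minimal sketch under several assumptions:

- `Gauge<T>` exposes a `getValue()` method.
- `Counter` and `Histogram` are trimmed to `getCount()`.
- The `getMetrics()` accessor, which a reporter could use, and the class `MapMetricGroup` are hypothetical additions.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Assumed marker interface; not defined in the proposal. */
interface Metric {}

/** Assumed Gauge shape: reports its current value when asked. */
interface Gauge<T> extends Metric {
    T getValue();
}

/** Counter and Histogram trimmed to a single method for this sketch. */
interface Counter extends Metric {
    long getCount();
}

interface Histogram extends Metric {
    long getCount();
}

/** MetricGroup as proposed, plus a hypothetical getMetrics() accessor for reporters. */
interface MetricGroup {
    void gauge(String name, Gauge<?> gauge);

    void counter(String name, Counter counter);

    void histogram(String name, Histogram histogram);

    Map<String, Metric> getMetrics();
}

/** Hypothetical MetricGroup that keeps every registered metric in one name-keyed map. */
class MapMetricGroup implements MetricGroup {
    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();

    @Override
    public void gauge(String name, Gauge<?> gauge) {
        register(name, gauge);
    }

    @Override
    public void counter(String name, Counter counter) {
        register(name, counter);
    }

    @Override
    public void histogram(String name, Histogram histogram) {
        register(name, histogram);
    }

    @Override
    public Map<String, Metric> getMetrics() {
        return Collections.unmodifiableMap(metrics);
    }

    private void register(String name, Metric metric) {
        // putIfAbsent is atomic, so the same name cannot be registered twice.
        if (metrics.putIfAbsent(name, metric) != null) {
            throw new IllegalArgumentException("Metric already registered: " + name);
        }
    }
}
```

Rejecting duplicate names, rather than silently replacing them, surfaces double registration early. Double registration can happen when the same commit or scan component is initialized more than once.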

Paimon supports adding tags to metric groups; tags are similar to scopes in Flink's metric groups. A tagged metric group reports metric values with its tags as part of the metric name prefix.

Table partitions and buckets can be added as tags for some metric groups. For example, by monitoring the number of files committed, scanned or compacted per bucket, users can detect data skew. By comparing compaction durations across buckets, they can find which bucket caused an unexpectedly long compaction.
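The proposal only states that tags become part of the metric name prefix. In the sketch below, the exact format is an assumption for illustration: `key=value` segments in insertion order, joined with `.`, followed by the metric name. `TaggedNames` is a hypothetical helper.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that prefixes a metric name with its group's tags. */
final class TaggedNames {
    private TaggedNames() {}

    /** Builds e.g. "table=t.partition=p.bucket=0.metricName"; tags keep insertion order. */
    static String identifier(Map<String, String> tags, String metricName) {
        StringJoiner joiner = new StringJoiner(".");
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            joiner.add(tag.getKey() + "=" + tag.getValue());
        }
        joiner.add(metricName);
        return joiner.toString();
    }

    /** Tags for a bucket-level group; LinkedHashMap keeps the prefix order stable. */
    static Map<String, String> bucketTags(String table, String partition, int bucket) {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("table", table);
        tags.put("partition", partition);
        tags.put("bucket", String.valueOf(bucket));
        return tags;
    }
}
```

A stable tag order matters because reporters such as JMX identify a metric by its full name. Reordering tags would make the same metric appear under a different identifier.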

MetricsReporter

MetricsReporter is used to report metrics to external backends. Paimon will provide an out-of-the-box JMX MetricsReporter.

Code Block
languagejava
public interface MetricsReporter {
    /** Configure reporter after instantiating it. */
    void open();

    /** Closes this reporter. */
    void close();

    /** Report the current measurements. This method is called periodically by the Metrics. */
    void report();
}

Proposed Changes

Architecture

(Diagram: MetricsSystem)

Metrics Registering

Take CommitMetrics as an example. CommitMetrics is instantiated by FileStoreCommitImpl, and the commit-related metrics are then registered through the corresponding MetricGroup in the Metrics instance.

The Metrics instance holds the sets of MetricGroups and MetricsReporters, and each MetricGroup maintains the metric map containers. Registering a metric means putting the metric instance into the corresponding (gauge, counter or histogram) map container.

Update metrics value

CommitMetrics values are updated around the commit() operation. For example, the commit start time is recorded before the commit, and the commit duration is recorded after the commit completes.

CompactionMetrics values are updated around the compaction operation, and ScanMetrics values are recorded during scan planning.
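The timing around commit() can be sketched as a small wrapper around the commit call. This is a minimal sketch with these assumptions:

- `CommitDurationRecorder` is hypothetical.
- A plain list stands in for the `commitDuration` histogram.
- The clock is injected (for example `System::nanoTime`) so the behavior can be tested deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical helper that times commit() and keeps the values behind the duration metrics. */
class CommitDurationRecorder {
    private final LongSupplier nanoClock; // e.g. System::nanoTime; injectable for tests
    private volatile long lastCommitDurationMs; // value a lastCommitDuration gauge would return
    private final List<Long> commitDurationsMs = new ArrayList<>(); // stand-in for commitDuration

    CommitDurationRecorder(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    void timeCommit(Runnable commit) {
        long startNanos = nanoClock.getAsLong(); // recorded before the commit starts
        commit.run();
        // Only reached if the commit completed; a failed commit leaves the metrics unchanged.
        long durationMs = TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - startNanos);
        lastCommitDurationMs = durationMs;
        synchronized (commitDurationsMs) {
            commitDurationsMs.add(durationMs);
        }
    }

    long lastCommitDuration() {
        return lastCommitDurationMs;
    }

    List<Long> commitDurations() {
        synchronized (commitDurationsMs) {
            return new ArrayList<>(commitDurationsMs);
        }
    }
}
```

Recording only after the commit returns matches "recorded after the commit completes". Whether failed attempts should also be timed is not specified in the proposal.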

Report metrics to external backend

Each reporter instance runs a timer task that periodically fetches metrics from the metric containers and reports them to the external backend. Paimon will provide JMX as the default reporter backend, and users can define their own reporter by implementing the MetricsReporter interface. We can introduce a core option, metrics.reporter, to specify the metrics backend.
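Here is a sketch of the periodic reporting loop, using a `ScheduledExecutorService` as the timer. The `InMemoryReporter` class, its interval parameter and the modeling of gauges as `Supplier`s are assumptions for illustration. A JMX reporter and the lookup of the reporter named by `metrics.reporter` are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** MetricsReporter as proposed. */
interface MetricsReporter {
    void open();

    void close();

    void report();
}

/** Hypothetical reporter: a timer task periodically snapshots gauge values into memory. */
class InMemoryReporter implements MetricsReporter {
    private final Map<String, Supplier<?>> gauges; // gauges modeled as Suppliers for brevity
    private final long intervalMs;
    private final List<String> reported = new ArrayList<>();
    private ScheduledExecutorService timer;

    InMemoryReporter(Map<String, Supplier<?>> gauges, long intervalMs) {
        this.gauges = gauges;
        this.intervalMs = intervalMs;
    }

    @Override
    public void open() {
        // Start the timer task that calls report() every intervalMs.
        timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::report, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        if (timer != null) {
            timer.shutdownNow();
        }
    }

    @Override
    public synchronized void report() {
        // Sort by metric name so every report lists metrics in the same order.
        Map<String, Supplier<?>> sorted = new TreeMap<>(gauges);
        for (Map.Entry<String, Supplier<?>> entry : sorted.entrySet()) {
            reported.add(entry.getKey() + "=" + entry.getValue().get());
        }
    }

    synchronized List<String> reportedLines() {
        return new ArrayList<>(reported);
    }
}
```

Because `report()` reads the gauges at report time, the metric holders only update plain values and never call the backend themselves.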

Metrics list

We introduce CommitMetrics, CompactionMetrics and ScanMetrics as metric sets that measure the stats of Paimon table commits, compactions and scans.

Common metrics

CommitMetrics

Code Block
languagejava
public class CommitMetrics {
    private static final String LAST_COMMIT_DURATION_METRIC = "lastCommitDuration";
    ...

    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
    private final MetricGroup genericMetricGroup;

    private void registerGenericCommitMetrics() {
        genericMetricGroup.gauge(LAST_COMMIT_DURATION_METRIC, new CommitDurationGauge());
        ...
    }

    private void registerTaggedCommitMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }

        if (isNewPartition()) {
            registerTaggedMetrics();
        }
    }
}
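The `isNewBucket()` / `registerTaggedMetrics()` pattern amounts to creating and registering a tagged group the first time a bucket or partition appears. This is a minimal sketch using `ConcurrentHashMap.computeIfAbsent`. It assumes a `String` partition key in place of `BinaryRow`, and `TaggedGroup` and `TaggedCommitMetrics` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for BucketMetricGroup / PartitionMetricGroup: just its tag. */
final class TaggedGroup {
    final Map<String, String> tags;

    TaggedGroup(String key, String value) {
        this.tags = Collections.singletonMap(key, value);
    }
}

/** Sketch of lazily creating one tagged group per bucket and per partition. */
class TaggedCommitMetrics {
    private final Map<Integer, TaggedGroup> bucketGroups = new ConcurrentHashMap<>();
    // Paimon keys partitions by BinaryRow; a String keeps this sketch self-contained.
    private final Map<String, TaggedGroup> partitionGroups = new ConcurrentHashMap<>();

    /** Called for each (partition, bucket) touched by a commit. */
    void onCommitted(String partition, int bucket) {
        // computeIfAbsent creates and registers a group only the first time a key is seen.
        bucketGroups.computeIfAbsent(bucket, b -> register(new TaggedGroup("bucket", String.valueOf(b))));
        partitionGroups.computeIfAbsent(partition, p -> register(new TaggedGroup("partition", p)));
    }

    private TaggedGroup register(TaggedGroup group) {
        // A real implementation would register the tagged gauges on this group here.
        return group;
    }

    int bucketGroupCount() {
        return bucketGroups.size();
    }

    int partitionGroupCount() {
        return partitionGroups.size();
    }
}
```

Folding the "is it new?" check and the registration into one atomic call avoids registering the same tagged group twice when commits touch a bucket concurrently.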


The CommitMetrics list includes the commit duration and counters of files, records, etc.

| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastCommitDuration | The time it took to complete the last commit. | Gauge | none | Ms | Timer starts before the commit starts; the duration is updated after the commit finishes. |
| commitDuration | Distribution of the time taken by the last few commits. | Histogram | none | Ms | Timer starts before the commit starts; the duration is updated after each commit finishes. |
| lastCommitAttempts | The number of attempts the last commit made. | Gauge | none | Number | Incremented by 1 on each commit attempt; cleared after the last commit finishes. |
| lastTableFilesAdded | Number of table files added in the last commit, including newly created data files and files produced by compaction (compacted after). | Gauge | none | Number | Collecting changes from committables |
| lastTableFilesDeleted | Number of table files deleted in the last commit, i.e. files from before compaction (compacted before). | Gauge | none | Number | Collecting changes from committables |
| lastTableFilesAppended | Number of table files appended in the last commit, i.e. newly created data files. | Gauge | none | Number | Collecting changes from committables |
| lastTableFilesCommitCompacted | Number of compacted table files in the last commit, including files from before and after compaction. | Gauge | none | Number | Collecting changes from committables |
| lastChangelogFilesAppended | Number of changelog files appended in the last commit. | Gauge | none | Number | Collecting changes from committables |
| lastChangelogFileCommitCompacted | Number of compacted changelog files in the last commit. | Gauge | none | Number | Collecting changes from committables |
| lastGeneratedSnapshots | Number of snapshot files generated in the last commit, either 1 or 2. | Gauge | none | Number | After collecting changes from committables |
| lastDeltaRecordsAppended | Delta records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
| lastChangelogRecordsAppended | Changelog records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
| lastDeltaRecordsCommitCompacted | Delta records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
| lastChangelogRecordsCommitCompacted | Changelog records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
| lastPartitionsWritten | Number of partitions written in the last commit. | Gauge | none | Number | After collecting changes from committables |
| lastBucketsWritten | Number of buckets written in the last commit. | Gauge | none | Number | After collecting changes from committables |

ScanMetrics

Code Block
public class ScanMetrics {
    private static final String LAST_SCAN_SKIPPED_MANIFESTS_METRIC = "lastScanSkippedManifests";
    ...

    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    private void registerTaggedScanMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }

        if (isNewPartition()) {
            registerTaggedMetrics();
        }
    }
}

The ScanMetrics list includes the scan duration and counters of data files and manifest files.

| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastScanDuration | The time it took to complete the last scan planning. | Gauge | none | Ms | Timer starts before scan planning starts; updated after the last scan planning operation finishes. |
| scanDuration | Distribution of the time taken by the last few scan planning operations. | Histogram | none | Ms | Timer starts before scan planning starts; updated after each planning operation finishes. |
| lastScannedManifests | Number of manifest files scanned in the last scan planning. | Gauge | none | Number | Scan planning |
| lastScanSkippedManifests | Number of manifest files skipped in the last scan planning. | Gauge | none | Number | Scan planning |
| lastScanResultTableFiles | Number of result table files in the last scan planning. | Gauge | none | Number | Scan planning |

CompactionMetrics

Code Block
public class CompactionMetrics {

    private static final String LAST_TABLE_FILES_COMPACTED_BEFORE_METRIC = "lastTableFilesCompactedBefore";
    ...

    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    private void registerTaggedCompactionMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }

        if (isNewPartition()) {
            registerTaggedMetrics();
        }
    }
}

The CompactionMetrics list includes the compaction duration and counters of files, sorted runs, etc.


| Metric Name | Description | Type | Tagged | Unit | Update at |
|---|---|---|---|---|---|
| lastCompactionDuration | The time it took to complete the last compaction. | Gauge | none | Ms | Timer starts before compaction; updated after the last compaction finishes. |
| compactionDuration | Distribution of the time taken by the last few compactions. | Histogram | none | Ms | Timer starts before compaction; updated after each compaction finishes. |
| lastTableFilesCompactedBefore | Number of files deleted in the last compaction. | Gauge | none | Number | After getting the compaction result. |
| lastTableFilesCompactedAfter | Number of files added in the last compaction. | Gauge | none | Number | After getting the compaction result. |
| lastChangelogFilesCompacted | Number of changelog files compacted in the last compaction. | Gauge | none | Number | After getting the compaction result. |
| lastCompactOutputLevel | Number of total sorted runs of all levels at the last compaction. | Gauge | none | Number | Triggering compaction |
| numLevel0Files | Number of files at level 0. | Counter | | Number | Updating levels after getting the compaction result. |
| lastCompactFileSize | Sum of the sizes of files compacted in the last compaction. | Gauge | none | Number | Updating levels after getting the compaction result. |

Flink connector metrics

Implement the important source metrics defined in FLIP-33.

...