Discussion thread | https://lists.apache.org/thread/h2lgqgrvcptzj3c8q4cjzv5jopgtwx9o |
---|---|
Vote thread | https://lists.apache.org/thread/09lfq4j7w23h6o1lmywl6k9691sqykhm |
ISSUE | https://github.com/apache/incubator-paimon/issues/742 |
Release | 0.6 |
Motivation
Currently, Paimon ships without built-in monitoring or metrics. In production, users need to know how their Paimon tables behave: the commit duration, how many files each commit added or deleted, the status of compaction operations, the duration of scan queries, the lag of streaming reads, and so on. We therefore need to introduce a metrics system and a richer set of metrics for Paimon.
...
Code Block |
---|
/** A Counter is a metric measured by incrementing and decrementing. */
@Public
public interface Counter extends Metric {
    /** Increment the current count by 1. */
    void inc();

    /**
     * Increment the current count by the given value.
     *
     * @param n value to increment the current count by
     */
    void inc(long n);

    /** Decrement the current count by 1. */
    void dec();

    /**
     * Decrement the current count by the given value.
     *
     * @param n value to decrement the current count by
     */
    void dec(long n);

    /**
     * Returns the current count.
     *
     * @return current count
     */
    long getCount();
} |
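A minimal sketch of how the Counter contract above could be implemented. The class name `SimpleCounter` and the choice of `LongAdder` are assumptions made for illustration, not part of the proposal; the `Metric` and `Counter` interfaces are redeclared locally only so the example compiles on its own.

```java
import java.util.concurrent.atomic.LongAdder;

/** Marker interface, redeclared here only to keep the sketch self-contained. */
interface Metric {}

/** Local copy of the Counter interface described above. */
interface Counter extends Metric {
    void inc();

    void inc(long n);

    void dec();

    void dec(long n);

    long getCount();
}

/** Hypothetical thread-safe counter; the name SimpleCounter is an assumption. */
class SimpleCounter implements Counter {
    // LongAdder keeps contention low when many threads update the same counter.
    private final LongAdder count = new LongAdder();

    @Override
    public void inc() {
        count.increment();
    }

    @Override
    public void inc(long n) {
        count.add(n);
    }

    @Override
    public void dec() {
        count.decrement();
    }

    @Override
    public void dec(long n) {
        count.add(-n);
    }

    @Override
    public long getCount() {
        return count.sum();
    }
}
```

`LongAdder` trades a slightly more expensive read for cheaper concurrent writes, which tends to suit metrics that are updated often and read only when reported.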
Histogram
Histogram is a metric interface that measures the statistical distribution of a set of values, including the min, max, mean, standard deviation, and percentiles.
Code Block |
---|
/** The histogram allows to record values, get the current count of recorded values and create
* histogram statistics for the currently seen elements.*/
@Public
public interface Histogram extends Metric {
/**
* Update the histogram with the given value.
*
* @param value Value to update the histogram with
*/
void update(long value);
/**
* Get the count of seen elements.
*
* @return Count of seen elements
*/
long getCount();
/**
* Create statistics for the currently recorded elements.
*
* @return Statistics about the currently recorded elements
*/
HistogramStatistics getStatistics();
interface HistogramStatistics {
/**
* Return the number of values that the statistics computation is based on.
*/
int size();
/** Returns the mean value of the histogram observations. */
double mean();
/** Returns the standard deviation of the histogram distribution. */
double stdDev();
/** Returns the maximum value of the histogram observations. */
long max();
/** Returns the minimum value of the histogram observations. */
long min();
/**
* Returns the percentile value based on the histogram statistics.
*
* @param percentile percentile point in double. E.g., 0.75 means 75 percentile. It is up to the
* implementation to decide what valid percentile points are supported.
* @return Value for the given percentile
*/
long percentile(double percentile);
}
} |
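To make the statistics concrete, here is a minimal sketch of a histogram that keeps only the most recent values in a fixed-size window. The class names `WindowHistogram` and `Snapshot`, the window strategy, the sample standard deviation, and the nearest-rank percentile rule are all assumptions for illustration; the proposal leaves these choices to the implementation. The sketch mirrors the method names of the interface above but does not implement it, to stay self-contained.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical sliding-window histogram; names and strategy are assumptions. */
class WindowHistogram {
    private final int windowSize;
    private final Deque<Long> window = new ArrayDeque<>();
    private long count;

    WindowHistogram(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Record a value, evicting the oldest one once the window is full. */
    synchronized void update(long value) {
        if (window.size() == windowSize) {
            window.removeFirst();
        }
        window.addLast(value);
        count++;
    }

    /** Total number of values ever seen, including evicted ones. */
    synchronized long getCount() {
        return count;
    }

    /** Take a sorted copy of the current window and compute statistics on it. */
    synchronized Snapshot getStatistics() {
        long[] sorted = window.stream().mapToLong(Long::longValue).sorted().toArray();
        return new Snapshot(sorted);
    }

    /** Immutable statistics over the values that were in the window. */
    static final class Snapshot {
        private final long[] sorted;

        Snapshot(long[] sorted) {
            this.sorted = sorted;
        }

        int size() {
            return sorted.length;
        }

        long min() {
            return sorted.length == 0 ? 0 : sorted[0];
        }

        long max() {
            return sorted.length == 0 ? 0 : sorted[sorted.length - 1];
        }

        double mean() {
            return Arrays.stream(sorted).average().orElse(0.0);
        }

        /** Sample standard deviation (divides by n - 1). */
        double stdDev() {
            if (sorted.length < 2) {
                return 0.0;
            }
            double mean = mean();
            double sumSq = 0.0;
            for (long v : sorted) {
                sumSq += (v - mean) * (v - mean);
            }
            return Math.sqrt(sumSq / (sorted.length - 1));
        }

        /** Nearest-rank percentile; p must be within [0, 1], e.g. 0.75. */
        long percentile(double p) {
            if (p < 0.0 || p > 1.0) {
                throw new IllegalArgumentException("percentile must be in [0, 1]");
            }
            if (sorted.length == 0) {
                return 0;
            }
            int rank = (int) Math.ceil(p * sorted.length);
            return sorted[Math.max(rank, 1) - 1];
        }
    }
}
```

A bounded window keeps memory constant and matches the "last few" wording used for the duration histograms later in this document, at the cost of forgetting older observations.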
Metrics
Class Metrics is the core of the metrics system. It holds the MetricGroup and MetricsReporter containers. When the Metrics instance is initialized, the MetricGroup is instantiated and the metrics reporters are started.
...
Code Block | ||
---|---|---|
| ||
import java.util.ArrayList;
import java.util.List;

public class Metrics {
    /** The metrics reporters container. */
    private final List<MetricsReporter> reporters = new ArrayList<>();

    /** The metric groups. */
    private final List<MetricGroup> metricGroups = new ArrayList<>();

    /** Add a metrics reporter. */
    public void addReporter(MetricsReporter reporter) {
        reporters.add(reporter);
    }

    /** Add a metric group. */
    public void addGroup(MetricGroup group) {
        metricGroups.add(group);
    }

    /** Get metrics reporters. */
    public List<MetricsReporter> getReporters() {
        return reporters;
    }

    /** Get metric groups. */
    public List<MetricGroup> getMetricGroups() {
        return metricGroups;
    }
} |
...
MetricGroup
MetricGroup is the interface responsible for registering and tagging metrics; it holds a metrics container. It provides a register method for each type of measurable metric, and registering a metric puts it into the metrics container.
Code Block | ||
---|---|---|
| ||
public interface MetricGroup {
    /** Register gauge metric. */
    void gauge(String name, Gauge<?> gauge);

    /** Register counter metric. */
    void counter(String name, Counter counter);

    /** Register histogram metric. */
    void histogram(String name, Histogram histogram);
} |
Paimon supports adding tags to metric groups; tags are similar to scopes in Flink's metric groups. A tagged metric group reports metric values with its tags as part of the metric name prefix.
Table partition and bucket can be added as tags for some metric groups. For example, when monitoring the number of files in commit / scan / compaction, users can compare the files of different buckets to check for data skew, and compare the compaction duration of different buckets to find which bucket caused an unexpectedly long compaction.
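The tag-as-prefix idea can be sketched as follows. The helper name `TaggedNames.qualifiedName`, the `.` separator, and the `key_value` encoding of each tag are hypothetical choices made for this example; the proposal only states that tags become part of the metric name prefix, not how they are formatted.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds a reported name from group, tags, and metric name. */
class TaggedNames {
    /**
     * Joins the group name, each tag as "key_value", and the metric name with dots.
     * The tag order follows the iteration order of the given map.
     */
    static String qualifiedName(String groupName, Map<String, String> tags, String metricName) {
        StringJoiner joiner = new StringJoiner(".");
        joiner.add(groupName);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            joiner.add(tag.getKey() + "_" + tag.getValue());
        }
        joiner.add(metricName);
        return joiner.toString();
    }
}
```

With an insertion-ordered map holding `partition=p1` and `bucket=0`, the group `commit` and the metric `lastTableFilesAdded` would be reported under this scheme as `commit.partition_p1.bucket_0.lastTableFilesAdded`, so per-bucket series can be compared to spot skew.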
MetricsReporter
MetricsReporter is used to report metrics to an external backend. Paimon will implement an out-of-the-box JMX MetricsReporter.
Code Block | ||
---|---|---|
| ||
public interface MetricsReporter {
    /** Configure reporter after instantiating it. */
    void open();

    /** Closes this reporter. */
    void close();

    /** Report the current measurements. This method is called periodically by the Metrics. */
    void report();
} |
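The reporter lifecycle (open, periodic report, close) can be illustrated with a small in-memory reporter. This is not the JMX reporter the proposal plans to ship: the class `InMemoryReporter`, its `watch` method, and the `name=value` line format are assumptions for this sketch, and the interface is redeclared locally so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Local copy of the reporter interface described above. */
interface MetricsReporter {
    void open();

    void close();

    void report();
}

/** Hypothetical reporter that appends "name=value" lines to an in-memory list. */
class InMemoryReporter implements MetricsReporter {
    private final Map<String, Supplier<?>> sources = new LinkedHashMap<>();
    private final List<String> lines = new ArrayList<>();
    private boolean open;

    /** Assumed helper: tell the reporter which values to read on each report. */
    void watch(String name, Supplier<?> valueSupplier) {
        sources.put(name, valueSupplier);
    }

    @Override
    public void open() {
        open = true;
    }

    @Override
    public void close() {
        open = false;
    }

    /** Reads every watched value at call time; does nothing unless opened. */
    @Override
    public void report() {
        if (!open) {
            return;
        }
        sources.forEach((name, supplier) -> lines.add(name + "=" + supplier.get()));
    }

    List<String> lines() {
        return lines;
    }
}
```

In a real deployment the periodic call to `report()` would be driven by a scheduler owned by the Metrics instance; the sketch leaves that out so its behavior stays deterministic.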
...
Metrics Registering
Take CommitMetrics as an example: CommitMetrics is instantiated by FileStoreCommitImpl, and the commit-related metrics are then registered by the corresponding MetricGroup in the Metrics instance.
The Metrics instance holds the sets of MetricGroups and MetricsReporters. Each MetricGroup maintains metrics map containers. Registering a metric is the process of putting the metric instance into the corresponding metric (gauge, counter) map container.
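The registration flow described above can be sketched end to end. All class names here (`SketchMetricGroup`, `SketchMetrics`, `SketchCommitMetrics`) are hypothetical stand-ins, and the `Gauge` shape with a single `getValue()` method is an assumption, since this document elides the gauge definition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/** Assumed gauge shape (a single read method); the source elides its definition. */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical group that keeps its gauges in a map container. */
class SketchMetricGroup {
    private final String name;
    private final Map<String, Gauge<?>> gauges = new LinkedHashMap<>();

    SketchMetricGroup(String name) {
        this.name = name;
    }

    /** Registering is putting the metric instance into the map, once per name. */
    void gauge(String metricName, Gauge<?> gauge) {
        if (gauges.putIfAbsent(metricName, gauge) != null) {
            throw new IllegalStateException("Metric already registered: " + metricName);
        }
    }

    String getName() {
        return name;
    }

    Map<String, Gauge<?>> getGauges() {
        return gauges;
    }
}

/** Hypothetical stand-in for the Metrics class: a container of groups. */
class SketchMetrics {
    private final List<SketchMetricGroup> groups = new ArrayList<>();

    void addGroup(SketchMetricGroup group) {
        groups.add(group);
    }

    List<SketchMetricGroup> getMetricGroups() {
        return groups;
    }
}

/** Hypothetical commit-side holder that registers one gauge on construction. */
class SketchCommitMetrics {
    static final String LAST_COMMIT_DURATION = "lastCommitDuration";

    private final AtomicLong lastCommitDurationMs = new AtomicLong();

    SketchCommitMetrics(SketchMetrics metrics) {
        SketchMetricGroup group = new SketchMetricGroup("commit");
        // The gauge reads the latest value lazily whenever a reporter asks for it.
        group.gauge(LAST_COMMIT_DURATION, lastCommitDurationMs::get);
        metrics.addGroup(group);
    }

    /** Called by the committer after a commit finishes. */
    void onCommitFinished(long durationMs) {
        lastCommitDurationMs.set(durationMs);
    }
}
```

Because the gauge holds a reference to the value rather than a copy, the committer only updates a field, and reporters observe the new value on their next pass without any re-registration.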
...
Code Block | ||
---|---|---|
| ||
public class CommitMetrics {
    private Metrics metrics;
    private final String LAST_COMMIT_DURATION_METRIC = "lastCommitDuration";
    ...
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
    private final MetricGroup genericMetricGroup;

    // Register the untagged (generic) commit metrics.
    private void registerGenericCommitMetrics(Metrics metrics) {
        genericMetricGroup.gauge(LAST_COMMIT_DURATION_METRIC, new CommitDurationGauge());
        ...
    }
    ...
    // Register tagged metrics when a new bucket or partition shows up.
    private void registerTaggedCommitMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
        }
    }
} |
The CommitMetrics list includes the commit duration and counters of files, records, and so on.
Metric Name | Description | Type | Tagged | Unit | Update at |
lastCommitDuration | The time it took to complete the last commit. | Gauge | none | ms | Timer starts before the commit starts; the duration is updated after the last commit finishes. |
commitDuration | Distribution of the time taken by the last few commits. | Histogram | none | ms | Timer starts before the commit starts; the duration is updated after each commit finishes. |
lastCommitAttempts | The number of attempts the last commit made. | Gauge | none | Number | Incremented by 1 on each commit attempt; cleared after the last commit finishes. |
lastTableFilesAdded | Number of table files added in the last commit, including newly created data files and compacted-after files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesDeleted | Number of table files deleted in the last commit, which come from compacted-before files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesAppended | Number of table files appended in the last commit, i.e. the newly created data files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesCommitCompacted | Number of table files compacted in the last commit, including compacted-before and compacted-after files. | Gauge | none | Number | Collecting changes from committables |
lastChangelogFilesAppended | Number of changelog files appended in the last commit. | Gauge | none | Number | Collecting changes from committables |
lastChangelogFileCommitCompacted | Number of changelog files compacted in the last commit. | Gauge | none | Number | Collecting changes from committables |
lastGeneratedSnapshots | Number of snapshot files generated in the last commit (1 or 2 snapshots). | Gauge | none | Number | After collecting changes from committables |
lastDeltaRecordsAppended | Delta records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
lastChangelogRecordsAppended | Changelog records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
lastDeltaRecordsCommitCompacted | Delta records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
lastChangelogRecordsCommitCompacted | Changelog records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
lastPartitionsWritten | Number of partitions written in the last commit. | Gauge | none | Number | After collecting changes from committables |
lastBucketsWritten | Number of buckets written in the last commit. | Gauge | none | Number | After collecting changes from committables |
ScanMetrics
Code Block |
---|
public class ScanMetrics {
    private Metrics metrics;
    private final String LAST_SCAN_SKIPPED_MANIFESTS_METRIC = "lastScanSkippedManifests";
    ...
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    // Register tagged metrics when a new bucket or partition shows up.
    private void registerTaggedScanMetrics(Metrics metrics) {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
        }
        ...
    }
} |
The ScanMetrics list includes the scan duration and counters of data files and manifest files.
Metric Name | Description | Type | Tagged | Unit | Update at |
lastScanDuration | The time it took to complete the last scan planning. | Gauge | none | ms | Timer starts before the scan planning starts; updated after the last scan planning operation finishes. |
scanDuration | Distribution of the time taken by the last few scan planning operations. | Histogram | none | ms | Timer starts before the scan planning starts; updated after each planning operation finishes. |
lastScannedManifests | Number of manifest files scanned in the last scan planning. | Gauge | none | Number | Scan planning |
lastScanSkippedManifests | Number of manifest files skipped in the last scan planning. | Gauge | none | Number | Scan planning |
lastScanResultTableFiles | Number of result table files in the last scan planning. | Gauge | none | Number | Scan planning |
CompactionMetrics
Code Block |
---|
public class CompactionMetrics {
    private Metrics metrics;
    private final String LAST_TABLE_FILES_COMPACTED_BEFORE_METRIC = "lastTableFilesCompactedBefore";
    ...
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    // Register tagged metrics when a new bucket or partition shows up.
    private void registerTaggedCompactionMetrics(Metrics metrics) {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
        }
        ...
    }
} |
The CompactionMetrics list includes the compaction duration and counters of files, sorted runs, and so on.
Metric Name | Description | Type | Tagged | Unit | Update at |
lastCompactionDuration | The time it took to complete the last compaction. | Gauge | none | ms | Timer starts before compaction; updated after the last compaction finishes. |
compactionDuration | Distribution of the time taken by the last few compactions. | Histogram | none | ms | Timer starts before compaction; updated after each compaction finishes. |
lastTableFilesCompactedBefore | Number of deleted files in last compaction | Gauge | none | Number | After getting compaction result. |
lastTableFilesCompactedAfter | Number of added files in last compaction | Gauge | none | Number | After getting compaction result. |
lastChangelogFilesCompacted | Number of changelog files compacted in last compaction | Gauge | none | Number | After getting compaction result. |
lastCompactOutputLevel | Number of total sorted runs of all levels at last compaction | Gauge | none | Number | Updating levels after getting compaction result. |
lastCompactFileSize | Sum of file size compacted in last compaction | Gauge | none | Number | Updating levels after getting compaction result. |
Flink connector metrics
Implement the important source metrics defined in FLIP-33.
...