Discussion thread | https://lists.apache.org/thread/h2lgqgrvcptzj3c8q4cjzv5jopgtwx9o |
---|---|
Vote thread | https://lists.apache.org/thread/09lfq4j7w23h6o1lmywl6k9691sqykhm |
ISSUE | https://github.com/apache/incubator-paimon/issues/742 |
Release | 0.6 |
Motivation
Currently, Paimon provides no monitoring or metrics out of the box. In production environments, users need to know how their Paimon tables behave: the commit duration, how many files each commit added or deleted, the status of compaction, the duration of scan queries, the lag of streaming reads, and so on. So we need to introduce a metrics system and more metrics for Paimon.
...
Code Block | ||
---|---|---|
| ||
import java.util.ArrayList;
import java.util.List;

public class Metrics {
    /** The metrics reporters container. */
    private final List<MetricsReporter> reporters = new ArrayList<>();
    /** The metric groups. */
    private final List<MetricGroup> metricGroups = new ArrayList<>();

    /** Add a metrics reporter. */
    public void addReporter(MetricsReporter reporter) {
        reporters.add(reporter);
    }

    /** Add a metric group. */
    public void addGroup(String groupName, MetricGroup group) {
        metricGroups.add(group);
    }

    /** Get metrics reporters. */
    public List<MetricsReporter> getReporters() {
        return reporters;
    }

    /** Get metric groups. */
    public List<MetricGroup> getMetricGroups() {
        return metricGroups;
    }
} |
MetricGroup
...
Code Block | ||
---|---|---|
| ||
public interface MetricGroup {
    /** Register gauge metric. */
    void gauge(String name, Gauge<?> gauge);

    /** Register counter metric. */
    void counter(String name, Counter counter);

    /** Register histogram metric. */
    void histogram(String name, Histogram histogram);
} |
Paimon supports adding tags to metric groups. Tags are similar to scopes in Flink's metric groups: a tagged metric group reports its metric values with the tags as part of the metric name prefix. Table partition and bucket can be added as tags for some metric groups. For example, for the file-count metrics of committing, scanning and compaction, users can monitor the files of each bucket to check for data skew. They can also compare the commit or compaction duration of different buckets to find which bucket unexpectedly caused a long duration.
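A minimal sketch of folding tags into the metric name prefix follows. It assumes a hypothetical TaggedMetricGroup that keeps tags in insertion order and joins the group name, each tag key/value pair and the metric name with dots. The class name, the separator and the resulting name format are illustrative assumptions, not the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical group that renders its tags as a prefix of each metric name. */
class TaggedMetricGroup {
    private final String groupName;
    // LinkedHashMap keeps tags in the order they were added, so names are stable.
    private final Map<String, String> tags = new LinkedHashMap<>();

    TaggedMetricGroup(String groupName) {
        this.groupName = groupName;
    }

    /** Adds a tag such as ("partition", "dt=2023-10-01") or ("bucket", "3"). */
    TaggedMetricGroup addTag(String key, String value) {
        tags.put(key, value);
        return this;
    }

    /** Builds "<group>.<tagKey>.<tagValue>...<metric>". */
    String qualifiedName(String metricName) {
        StringJoiner joiner = new StringJoiner(".");
        joiner.add(groupName);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            joiner.add(tag.getKey()).add(tag.getValue());
        }
        joiner.add(metricName);
        return joiner.toString();
    }
}
```

With this scheme, the same metric reported by two buckets ends up under two distinct names. A backend can therefore compare them side by side when looking for skew.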
MetricsReporter
MetricsReporter is used to report metrics to external backends. Paimon will implement an out-of-the-box JMX MetricsReporter.
Code Block | ||
---|---|---|
| ||
public interface MetricsReporter {
/** Configure reporter after instantiating it.*/
void open();
/** Closes this reporter. */
void close();
/** Report the current measurements. This method is called periodically by the Metrics. */
void report();
} |
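A user-defined reporter only needs to implement the three methods above. The sketch below assumes a hypothetical InMemoryReporter that receives a snapshot of current metric values through a Supplier and renders them as sorted "name=value" lines. How a real reporter obtains the metric values from the Metrics instance is an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

/** The MetricsReporter interface as proposed above. */
interface MetricsReporter {
    void open();
    void close();
    void report();
}

/** Hypothetical reporter that renders "name=value" lines into an in-memory buffer. */
class InMemoryReporter implements MetricsReporter {
    // Supplies a snapshot of current metric values (assumed wiring, not the proposed API).
    private final Supplier<Map<String, Object>> metricSource;
    private final List<String> output = new ArrayList<>();
    private boolean opened;

    InMemoryReporter(Supplier<Map<String, Object>> metricSource) {
        this.metricSource = metricSource;
    }

    @Override
    public void open() {
        opened = true;
    }

    @Override
    public void close() {
        opened = false;
    }

    @Override
    public void report() {
        if (!opened) {
            throw new IllegalStateException("reporter is not open");
        }
        // Sort by metric name so every report is deterministic.
        for (Map.Entry<String, Object> e : new TreeMap<>(metricSource.get()).entrySet()) {
            output.add(e.getKey() + "=" + e.getValue());
        }
    }

    List<String> output() {
        return output;
    }
}
```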
Proposed Changes
Architecture
[Architecture diagram (draw.io)]
Metrics Registering
Take CommitMetrics as an example: CommitMetrics is instantiated by FileStoreCommitImpl, and the commit-related metrics are then registered through the corresponding MetricGroup in the Metrics instance.
The Metrics instance holds the set of MetricGroups and MetricsReporters. Each MetricGroup maintains metric map containers, and registering a metric means putting the metric instance into the corresponding (gauge, counter) map container.
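A minimal sketch of registration as described above follows. It assumes hypothetical SimpleMetricGroup, MetricsRegistry and CommitMetricsSketch classes rather than the proposed Paimon types. Each group keeps one map per metric kind, so registering a gauge or counter is just a put into the matching map. The gauge reads its backing field lazily, so values updated later are what reporters see.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical gauge: returns the current value when a reporter asks for it. */
interface Gauge<T> {
    T getValue();
}

/** Hypothetical counter. */
class Counter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

/** Registering a metric means putting it into the matching map container. */
class SimpleMetricGroup {
    final String name;
    final Map<String, Gauge<?>> gauges = new HashMap<>();
    final Map<String, Counter> counters = new HashMap<>();

    SimpleMetricGroup(String name) { this.name = name; }

    void gauge(String metricName, Gauge<?> gauge) { gauges.put(metricName, gauge); }
    void counter(String metricName, Counter counter) { counters.put(metricName, counter); }
}

/** Holds every group so that reporters can later iterate over them. */
class MetricsRegistry {
    final List<SimpleMetricGroup> groups = new ArrayList<>();

    SimpleMetricGroup addGroup(String name) {
        SimpleMetricGroup group = new SimpleMetricGroup(name);
        groups.add(group);
        return group;
    }
}

/** Hypothetical CommitMetrics-like owner that registers a gauge backed by a field. */
class CommitMetricsSketch {
    private volatile long lastCommitDurationMs;

    CommitMetricsSketch(MetricsRegistry registry) {
        SimpleMetricGroup group = registry.addGroup("commit");
        // The gauge reads the field on demand, so later updates are visible.
        group.gauge("lastCommitDuration", () -> lastCommitDurationMs);
    }

    void recordCommitDuration(long ms) { lastCommitDurationMs = ms; }
}
```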
Update metric values
The CommitMetrics values are updated around the commit() operation. For example, the commit start time is recorded before the commit, and the commit duration is recorded after the commit completes.
CompactionMetrics values are updated around the compaction operation, and ScanMetrics values are recorded during the scan planning process.
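Recording values "around" an operation can be sketched as below. It assumes a hypothetical TimedCommitter whose commit attempts are passed in as a BooleanSupplier and retried up to a limit. The retry loop, class names and fields are illustrative assumptions. The start time is taken before the first attempt, and both the duration and the attempt count are stored once the commit finishes.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical holder for the values that CommitMetrics gauges would expose. */
class CommitStats {
    volatile long lastCommitDurationMs;
    volatile int lastCommitAttempts;
}

class TimedCommitter {
    private final CommitStats stats = new CommitStats();

    /** Retries the attempt until it succeeds or maxAttempts is reached (assumed policy). */
    boolean commit(BooleanSupplier attempt, int maxAttempts) {
        long start = System.nanoTime(); // recorded before the commit starts
        int attempts = 0;
        boolean success = false;
        while (!success && attempts < maxAttempts) {
            attempts++;
            success = attempt.getAsBoolean();
        }
        // Recorded after the commit completes, whether or not it succeeded.
        stats.lastCommitDurationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        stats.lastCommitAttempts = attempts;
        return success;
    }

    CommitStats stats() { return stats; }
}
```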
Report metrics to external backends
Each reporter instance has a timer task that periodically fetches metrics from the metric containers and reports them to the external backend. Paimon will provide JMX as the default reporter backend, and users can define their own reporter by implementing the MetricsReporter interface. A core option, metrics.reporter, can be introduced to specify the metrics backend.
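The periodic timer task can be sketched with a ScheduledExecutorService. The sketch assumes a hypothetical ReporterScheduler that opens every configured reporter, calls report() at a fixed interval and closes the reporters on shutdown. How the metrics.reporter option is resolved into reporter instances is not shown, and the class name and interval handling are assumptions.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** The MetricsReporter interface as proposed above. */
interface MetricsReporter {
    void open();
    void close();
    void report();
}

/** Hypothetical driver that calls report() on every reporter at a fixed interval. */
class ReporterScheduler implements AutoCloseable {
    private final List<MetricsReporter> reporters;
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "metrics-reporter");
        t.setDaemon(true); // do not keep the JVM alive just for reporting
        return t;
    });

    ReporterScheduler(List<MetricsReporter> reporters) {
        this.reporters = reporters;
    }

    void start(long intervalMs) {
        for (MetricsReporter reporter : reporters) {
            reporter.open();
        }
        timer.scheduleAtFixedRate(() -> {
            for (MetricsReporter reporter : reporters) {
                try {
                    reporter.report();
                } catch (RuntimeException e) {
                    // Swallowed so that one failing reporter does not cancel the timer.
                }
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        timer.shutdownNow();
        for (MetricsReporter reporter : reporters) {
            reporter.close();
        }
    }
}
```

One design note: scheduleAtFixedRate suppresses all later executions once a run throws an exception. That is why each report() call is guarded individually.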
Metrics list
We introduce CommitMetrics, CompactionMetrics and ScanMetrics as metric sets to measure the stats of Paimon table committing, compaction and scanning.
Common metrics
CommitMetrics
Code Block | ||
---|---|---|
| ||
public class CommitMetrics {
    private static final String LAST_COMMIT_DURATION_METRIC = "lastCommitDuration";
    ...
    // Tagged groups, keyed by bucket id and by partition.
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();
    // Untagged group for table-level commit metrics.
    private final MetricGroup genericMetricGroup;

    private void registerGenericCommitMetrics() {
        genericMetricGroup.gauge(LAST_COMMIT_DURATION_METRIC, new CommitDurationGauge());
        ...
    }

    private void registerTaggedCommitMetrics() {
        // Register tagged metrics the first time a bucket or partition shows up.
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
        }
    }
} |
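The isNewBucket() / isNewPartition() checks amount to creating a tagged group lazily the first time a key is seen. The sketch below uses Map.computeIfAbsent for that. It assumes a hypothetical TaggedGroup that only tracks one value, and it uses a String in place of BinaryRow for the partition key. The names and the reset-per-commit step are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tagged group: remembers its tags and one gauge value. */
class TaggedGroup {
    final Map<String, String> tags;
    long lastTableFilesAdded;

    TaggedGroup(Map<String, String> tags) { this.tags = tags; }
}

class TaggedCommitMetricsSketch {
    // A String stands in for BinaryRow as the partition key in this sketch.
    private final Map<Integer, TaggedGroup> bucketGroups = new HashMap<>();
    private final Map<String, TaggedGroup> partitionGroups = new HashMap<>();

    /** Called for every added file; groups are created the first time a key is seen. */
    void onFileAdded(String partition, int bucket) {
        bucketGroups.computeIfAbsent(bucket, b -> newGroup("bucket", String.valueOf(b)))
                .lastTableFilesAdded++;
        partitionGroups.computeIfAbsent(partition, p -> newGroup("partition", p))
                .lastTableFilesAdded++;
    }

    /** Clears the per-commit values but keeps the registered groups. */
    void resetForNewCommit() {
        bucketGroups.values().forEach(g -> g.lastTableFilesAdded = 0);
        partitionGroups.values().forEach(g -> g.lastTableFilesAdded = 0);
    }

    private static TaggedGroup newGroup(String key, String value) {
        Map<String, String> tags = new HashMap<>();
        tags.put(key, value);
        return new TaggedGroup(tags);
    }

    int bucketGroupCount() { return bucketGroups.size(); }
    long filesAddedInBucket(int bucket) { return bucketGroups.get(bucket).lastTableFilesAdded; }
    long filesAddedInPartition(String partition) { return partitionGroups.get(partition).lastTableFilesAdded; }
}
```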
The CommitMetrics list includes the commit duration and counters of files, records, etc.
Metric Name | Description | Type | Tagged | Unit | Update at |
---|---|---|---|---|---|
lastCommitDuration | The time it took to complete the last commit. | Gauge | none | ms | Timer starts before the commit starts; updated after the last commit finishes. |
commitDuration | Distribution of the time taken by the last few commits. | Histogram | none | ms | Timer starts before the commit starts; updated after each commit finishes. |
lastCommitAttempts | The number of attempts the last commit made. | Gauge | none | Number | Incremented by 1 on each commit attempt; cleared after the last commit finishes. |
lastTableFilesAdded | Number of table files added in the last commit, including newly created data files and compacted-after files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesDeleted | Number of table files deleted in the last commit, i.e. the compacted-before files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesAppended | Number of table files appended in the last commit, i.e. the newly created data files. | Gauge | none | Number | Collecting changes from committables |
lastTableFilesCommitCompacted | Number of table files compacted in the last commit, including compacted-before and compacted-after files. | Gauge | none | Number | Collecting changes from committables |
lastChangelogFilesAppended | Number of changelog files appended in the last commit. | Gauge | none | Number | Collecting changes from committables |
lastChangelogFileCommitCompacted | Number of changelog files compacted in the last commit. | Gauge | none | Number | Collecting changes from committables |
lastGeneratedSnapshots | Number of snapshot files generated in the last commit, either 1 or 2. | Gauge | none | Number | After collecting changes from committables |
lastDeltaRecordsAppended | Delta |
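commitDuration is a Histogram over "the last few commits". One way to read that is a fixed-size sliding window, sketched below. It assumes a hypothetical SlidingWindowHistogram that keeps the most recent windowSize values and derives max, mean and a nearest-rank percentile from them. The window size and the choice of statistics are assumptions, not part of the proposal.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

/** Hypothetical histogram over the last `windowSize` observations. */
class SlidingWindowHistogram {
    private final int windowSize;
    private final Deque<Long> window = new ArrayDeque<>();

    SlidingWindowHistogram(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    /** Records one duration, evicting the oldest once the window is full. */
    void update(long value) {
        if (window.size() == windowSize) {
            window.removeFirst();
        }
        window.addLast(value);
    }

    int count() { return window.size(); }

    long max() {
        return window.stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    double mean() {
        return window.stream().mapToLong(Long::longValue).average().orElse(0.0);
    }

    /** Nearest-rank percentile for q in (0, 1]. */
    long percentile(double q) {
        if (window.isEmpty()) {
            return 0L;
        }
        long[] sorted = window.stream().mapToLong(Long::longValue).toArray();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(q * sorted.length);
        return sorted[Math.max(rank, 1) - 1];
    }
}
```

For example, with a window of 3 and commit durations 100, 200, 300 and 400 ms, only the last three remain. The mean is then 300.0 and the maximum is 400.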
Common metrics
CommitMetrics
Code Block | ||
---|---|---|
| ||
public class CommitMetrics {
    private static final String GROUP_NAME = "commitMetricGroup";
    private static final String LAST_COMMIT_DURATION_METRIC = "lastCommitDuration";
    ...
    // A single, untagged group holding all commit metrics.
    private MetricGroup group = new MetricsGroup(GROUP_NAME);

    private void registerCommitMetrics() {
        group.gauge(LAST_COMMIT_DURATION_METRIC, new CommitDurationGauge());
        ...
    }
} |
The CommitMetrics list includes the commit duration and counters of files, records, etc.
Metric Name | Description | Type | Tagged | Unit | Update at |
---|---|---|---|---|---|
lastCommitDuration | The time it took to complete the last commit. | Gauge | Partition, Bucket | ms | Timer starts before the commit starts; updated after the last commit finishes. |
commitDuration | Distribution of the time taken by the last few commits. | Histogram | Partition, Bucket | ms | Timer starts before the commit starts; updated after each commit finishes. |
lastCommitAttempts | The number of attempts the last commit made. | Counter | Partition, Bucket | Number | Incremented by 1 on each commit attempt; cleared after the last commit finishes. |
lastTableFilesAdded | Number of table files added in the last commit, including newly created data files and compacted-after files. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
lastTableFilesDeleted | Number of table files deleted in the last commit, i.e. the compacted-before files. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
lastTableFilesAppended | Number of table files appended in the last commit, i.e. the newly created data files. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
lastTableFilesCommitCompacted | Number of table files compacted in the last commit, including compacted-before and compacted-after files. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
lastChangelogFilesAppended | Number of changelog files appended in the last commit. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
lastChangelogFileCommitCompacted | Number of changelog files compacted in the last commit. | Gauge | Partition, Bucket | Number | Collecting changes from committables |
totalTableFiles | Total number of data files currently maintained on storage. | Counter | Partition, Bucket | Number | Collecting changes from committables |
totalChangelogFiles | Total number of changelog files currently maintained on storage. | Counter | Partition, Bucket | Number | Collecting changes from committables |
lastGeneratedSnapshots | Number of snapshot files generated in the last commit, either 1 or 2. | Gauge | Partition, Bucket | Number | After collecting changes from committables |
totalSnapshots | Number of currently retained snapshots. | Counter | none | Number | On commit, the counter is incremented by the number of snapshots generated; on snapshot expiration, it is decremented by the number of expired snapshots. |
lastTotalRecordsAppended | Total records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
lastDeltaRecordsAppended | Delta records count in the last commit with APPEND commit kind. | Gauge | Partition, Bucket | Number | Preparing snapshot file with APPEND commit kind |
lastChangelogRecordsAppended | Changelog records count in the last commit with APPEND commit kind. | Gauge | none | Number | Preparing snapshot file with APPEND commit kind |
lastDeltaRecordsCommitCompacted | Delta records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
lastChangelogRecordsCommitCompacted | Changelog records count in the last commit with COMPACT commit kind. | Gauge | none | Number | Preparing snapshot file with COMPACT commit kind |
lastPartitionsWritten | Number of partitions written in the last commit. | Gauge | none | Number | After collecting changes from committables |
lastBucketsWritten | Number of buckets written in the last commit. | Gauge | none | Number | After collecting changes from committables |
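The totalSnapshots update rule (up on commit, down on expiration) is sketched below with a hypothetical SnapshotCounter. The class and the validation checks are illustrative assumptions. The "1 or 2 snapshots per commit" bound comes from lastGeneratedSnapshots above, and the sketch only rejects negative input.

```java
/** Hypothetical counter following the totalSnapshots "Update at" rule. */
class SnapshotCounter {
    private long totalSnapshots;

    /** A commit adds the snapshots it generated (1 or 2 per the table above). */
    void onCommit(int generatedSnapshots) {
        if (generatedSnapshots < 0) {
            throw new IllegalArgumentException("negative snapshot count");
        }
        totalSnapshots += generatedSnapshots;
    }

    /** Snapshot expiration removes snapshots, so the counter goes down. */
    void onExpire(int expiredSnapshots) {
        if (expiredSnapshots < 0 || expiredSnapshots > totalSnapshots) {
            throw new IllegalArgumentException("cannot expire " + expiredSnapshots + " snapshots");
        }
        totalSnapshots -= expiredSnapshots;
    }

    long totalSnapshots() { return totalSnapshots; }
}
```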
...
Code Block |
---|
public class ScanMetrics {
    private static final String GROUP_NAME = "scanMetricGroup";
    private static final String LAST_SCAN_SKIPPED_MANIFESTS_METRIC = "lastScanSkippedManifests";
    ...
    // Tagged groups, keyed by bucket id and by partition.
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    private void registerTaggedScanMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
            ...
        }
    }
} |
The ScanMetrics list includes the scan planning duration and counters of data files and manifest files.
Metric Name | Description | Type | Tagged | Unit | Update at |
---|---|---|---|---|---|
lastScanDuration | The time it took to complete the last scan planning. | Gauge | none | ms | Timer starts before scan planning starts; updated after the last scan planning finishes. |
scanDuration | Distribution of the time taken by the last few scan planning operations. | Histogram | none | ms | Timer starts before scan planning starts; updated after each planning finishes. |
lastScannedManifests | Number of manifest files scanned in the last scan planning. | Gauge | none | Number | Scan planning |
lastScanSkippedManifests | Number of manifest files skipped in the last scan planning. | Gauge | Partition, Bucket | Number | Scan planning |
lastScanResultTableFiles | Number of table files in the result of the last scan planning. | Gauge | none | Number | Scan planning |
lastScanGenerateSplits | Number of splits generated from the result table files in the last scan planning. | Gauge | none | Number | Scan planning |
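The scan counters can be sketched as a single pass over manifests during planning. The sketch assumes hypothetical ManifestEntry, ScanPlan and ScanPlanner classes, where each manifest covers exactly one partition and a partition predicate decides whether it is read. A manifest counts as skipped when the filter rejects it and as scanned when it is read. That split between "scanned" and "skipped", like the one-partition-per-manifest simplification, is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical manifest summary: its partition and the data files it lists. */
class ManifestEntry {
    final String partition;
    final List<String> dataFiles;

    ManifestEntry(String partition, List<String> dataFiles) {
        this.partition = partition;
        this.dataFiles = dataFiles;
    }
}

/** Result of one planning pass plus the values the scan gauges would expose. */
class ScanPlan {
    final List<String> resultFiles = new ArrayList<>();
    int scannedManifests;
    int skippedManifests;
    long durationMs;
}

class ScanPlanner {
    /** Reads manifests whose partition passes the filter and counts the rest as skipped. */
    static ScanPlan plan(List<ManifestEntry> manifests, Predicate<String> partitionFilter) {
        long start = System.nanoTime(); // timer starts before planning
        ScanPlan plan = new ScanPlan();
        for (ManifestEntry manifest : manifests) {
            if (!partitionFilter.test(manifest.partition)) {
                plan.skippedManifests++; // filtered out without reading its entries
                continue;
            }
            plan.scannedManifests++;
            plan.resultFiles.addAll(manifest.dataFiles);
        }
        plan.durationMs = (System.nanoTime() - start) / 1_000_000;
        return plan;
    }
}
```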
CompactionMetrics
Code Block |
---|
public class CompactionMetrics {
    private static final String GROUP_NAME = "compactionMetricGroup";
    private static final String LAST_TABLE_FILES_COMPACTED_BEFORE_METRIC = "lastTableFilesCompactedBefore";
    ...
    // Tagged groups, keyed by bucket id and by partition.
    private final Map<Integer, BucketMetricGroup> bucketMetricGroups = new HashMap<>();
    private final Map<BinaryRow, PartitionMetricGroup> partitionMetricGroups = new HashMap<>();

    private void registerTaggedCompactionMetrics() {
        if (isNewBucket()) {
            registerTaggedMetrics();
        }
        if (isNewPartition()) {
            registerTaggedMetrics();
            ...
        }
    }
} |
The CompactionMetrics list includes the compaction duration and counters of files, sorted runs, etc.
Metric Name | Description | Type | Tagged | Unit | Update at |
---|---|---|---|---|---|
lastCompactionDuration | The time it took to complete the last compaction. | Gauge | none | ms | Timer starts before compaction; updated after the last compaction finishes. |
compactionDuration | Distribution of the time taken by the last few compactions. | Histogram | none | ms | Timer starts before compaction; updated after each compaction finishes. |
lastTableFilesCompactedBefore | Number of files deleted in the last compaction. | Gauge | none | Number | After getting the compaction result. |
lastTableFilesCompactedAfter | Number of files added in the last compaction. | Gauge | none | Number | After getting the compaction result. |
lastChangelogFilesCompacted | Number of changelog files compacted in the last compaction. | Gauge | none | Number | After getting the compaction result. |
lastSortedRuns | Total number of sorted runs across all levels at the last compaction. | Gauge | none | Number | Updating levels after getting the compaction result. |
lastCompactFileSize | Sum of the sizes of the files compacted in the last compaction. | Gauge | none | Number | Updating levels after getting the compaction result. |
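The "after getting the compaction result" values can be derived from the input and output file lists of one compaction. The sketch assumes hypothetical DataFile and CompactionStats classes. It also assumes that lastCompactFileSize sums the sizes of the input (compacted-before) files in bytes, which the table does not specify.

```java
import java.util.List;

/** Hypothetical data file descriptor: name and size in bytes. */
class DataFile {
    final String name;
    final long sizeBytes;

    DataFile(String name, long sizeBytes) {
        this.name = name;
        this.sizeBytes = sizeBytes;
    }
}

/** Values the compaction gauges would expose after one compaction. */
class CompactionStats {
    final int filesCompactedBefore; // lastTableFilesCompactedBefore: files deleted
    final int filesCompactedAfter;  // lastTableFilesCompactedAfter: files added
    final long compactedFileSize;   // lastCompactFileSize: summed over input files (assumption)

    private CompactionStats(int before, int after, long size) {
        this.filesCompactedBefore = before;
        this.filesCompactedAfter = after;
        this.compactedFileSize = size;
    }

    /** Builds the stats from the files a compaction consumed and produced. */
    static CompactionStats of(List<DataFile> before, List<DataFile> after) {
        long size = 0;
        for (DataFile f : before) {
            size += f.sizeBytes;
        }
        return new CompactionStats(before.size(), after.size(), size);
    }
}
```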
...