Status
...
Page properties | |
---|---|
|
...
...
...
|
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- A connector implementation does not have to report all the defined metrics. But if a connector reports a metric of the same semantic defined below, the implementation should follow the convention.
- The following metric convention is not a complete list. More conventional metric will be added over time.
- The histogram metrics are usually very expensive. Due to its performance impact, we intentionally excluded them in this FLIP. Please see future work section for more details.
...
Name | Type | Unit | Description |
numBytesIn | Counter | Bytes | The total number of input bytes since the source started. Count towards the numBytesIn in TaskIOMetricsGroup. Predefined. For FLIP-27 Sources, updated by SourceReader implementations via SourceReaderContext. |
numBytesInPerSecond | Meter | Bytes/Sec | The input bytes per second. Predefined. associated with numBytesIn. |
numRecordsIn | Counter | Records | (Existing operator metric) The total number of input records since the source started. The Source implementation should reused this existing operator metric instead of creating a new one. Predefined. For FLIP-27 sources, updated by SourceReader implementations via SourceReaderContext. |
numRecordsInPerSecond | Meter | Records/Sec | (Existing operator metric) The input records per second The Source implementation should reused this existing operator metric instead of creating a new one. Predefined. Associated with numRecordsIn. |
numRecordsInErrors | Counter | Records | The total number of record that failed to consume , process or emit. Predefined. For FLIP-27 sources, updated by SourceReader implementations via SourceReaderContext. |
currentFetchEventTimeLagcurrentFetchLatency | Gauge | ms | The latency occurred before time in milliseconds from the record event timestamp to the timestamp Flink fetched the record. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. fetchLatency currentFetchEventTimeLag = FetchTime - EventTime Optional. For FLIP-27 sources, registered and updated by SourceReader implementations via MetricsGroup provided by SourceReaderContext, |
currentEmitEventTimeLagcurrentLatency | Gauge | ms | The latency occurred before time in milliseconds from the record event timestamp to the timestamp the record is emitted by the source connector. This metric is an instantaneous value recorded for the last processed record. This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency. latency currentEmitEventTimeLag = EmitTime - EventTime, where the EmitTime is the time the record leaves the source operator.. Predefined. For FLIP-27 sources, updated by ReaderOutput implementation. |
watermarkLag | Gauge | ms | The time in milliseconds that the watermark lags behind the wall clock time. watermarkLag = CurrentTime - Watermark Predefined. For FLIP-27 sources, updated by ReaderOutput implementation. |
sourceIdleTimeidleTime | Gauge | ms | The time in milliseconds that the source has not processed any record. idleTime sourceIdleTime = CurrentTime - LastRecordProcessTime Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext. |
pendingBytes | Gauge | Bytes | The number of bytes that have not been fetched by the source. e.g. the remaining bytes in a file after the file descriptor reading position. Note that not every source reports this metric, but the metric of the same semantic should be reported with this name and specification if the Source does report. Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext. |
pendingRecords | Gauge | Records | The number of records that have not been fetched by the source. e.g. the available records after the consumer offset in a Kafka partition. Note that not every source reports this metric, but the metric of the same semantic should be reported with this name and specification if the Source does report. Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext. |
Sink Metrics
Name | Type | Unit | Description |
numBytesOutnumBytesSend | Counter | Bytes | The total number of output bytes since the source started. Count towards the numBytesOut in TaskIOMetricsGroup Predefined, updated by sink implementations. |
numBytesSendPerSecondnumBytesOutPerSecond | Meter | Bytes/Sec | The output bytes per second Predefined, associated with numBytesSend. |
numRecordsSendnumRecordsOut | Counter | Records | (Existing operator metric) The total number of output records since the source started The Source implementation should reused this existing operator metric instead of creating a new one. sink started Predefined, updated by sink implementations. |
numRecordsSendPerSecondnumRecordsOutPerSecond | Meter | Records/Sec | (Existing operator metric) The output records per second The Source implementation should reused this existing operator metric instead of creating a new one. Predefined, associated with numBytesSend. |
numRecordsSendErrorsnumRecordsOutErrors | Counter | Record | The total number of records failed to send Predefined, updated by sink implementations. |
currentSendTime | Gauge | ms | The time it takes to send the last record. This metric is an instantaneous value recorded for the last processed record. Optional, updated by sink implementations. |
Scope
The metric group for each source and sink would be the same as ordinary operator scope, i.e. default to <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
...
If the connector has its original metrics, the original metric names should still be kept and reported, even some of the original metrics are exposed with standard metric names. That means the connector will double report such metrics.
API Changes
OperatorIOMetricGroup
Add the following metrics.
Code Block | ||||
---|---|---|---|---|
| ||||
public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> {
private final Counter numBytesIn;
private final Counter numBytesOut;
private final Meter numBytesInRate;
private final Meter numBytesOutRate;
public Counter getNumBytesInCounter() {
return numRecordsIn;
}
public Counter getNumBytesOutCounter() {
return numRecordsOut;
}
public Meter getNumBytesInRateMeter() {
return numRecordsInRate;
}
public Meter getNumBytesOutRate() {
return numRecordsOutRate;
}
} |
SourceReaderContext
To provide the above SourceMetrics in the base implementation of FLIP-27, we add the following methods to SourceReaderContext.
Code Block | ||||
---|---|---|---|---|
| ||||
public interface SourceReaderContext {
Counter getNumBytesInCounter();
Counter getRecordsInCounter();
Counter getNumRecordsInErrorsCounter();
} |
AnchorFutureWork FutureWork
Future Work
FutureWork | |
FutureWork |
...
Name | Type | Unit | Description |
recordSize | Histogram | Bytes | The size of a record. |
fetchLatencyeventTimeFetchLag | Histogram | ms | The latency occurred before Flink fetched the record. fetchLatency = FetchTime - EventTime |
latencyeventTimeLag | Histogram | ms | The latency occurred before the record is emitted by the source connector. latency = EmitTime - EventTime |
sendTime | Histogram | ms | The time it takes to send a record |
...