Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Status

...

Page properties


Discussion thread

...

...

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-11576

...

Release1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Flink has defined a few standard metrics for jobs, tasks and operators. It also supports custom metrics in various scenarios. However, so far there is no standard or conventional metric definition for the connectors. Each connector defines their own metrics at the moment. This complicates operation and monitoring. Admittedly, different connectors may have different metrics, but some commonly used metrics can probably be standardized. This FLIP proposes a set of standard connector metrics that each connector should emit if applicable. The metrics proposed in this FLIP will serve as a convention for the connector implementations

In the future, the other projects in Flink ecosystem may rely on this metric convention. Therefore, the connector implementations are expected to follow the conventions when reporting the metrics.

Public Interfaces

We propose to introduce a set of conventional / standard metrics for the connectors.

It is important to mention that

  • A connector implementation does not have to report all the defined metrics. But if a connector reports a metric of the same semantic defined below, the implementation should follow the convention.
  • The following metric convention is not a complete list. More conventional metric will be added over time.
  • The histogram metrics are usually very expensive. Due to its performance impact, we intentionally excluded them in this FLIP. Please see future work section for more details.

Source Metrics

Name

Type

Unit

Description

numBytesIn

Counter

Bytes

The total number of input bytes since the source started

numBytesInPerSec

. Count towards the numBytesIn in TaskIOMetricsGroup.

Predefined. For FLIP-27 Sources, updated by SourceReader implementations via SourceReaderContext.

numBytesInPerSecond

Meter

Bytes/Sec

The input bytes per second.

Predefined. associated with numBytesIn.

numRecordsIn

Counter

Records

(Existing operator metric) The total number of input records since the source started

numRecordsInPerSec

.

The Source implementation should reused this existing operator metric instead of creating a new one.

Predefined. For FLIP-27 sources, updated by SourceReader implementations via SourceReaderContext.

numRecordsInPerSecond

Meter

Records/Sec

(Existing operator metric) The input records per second

The Source implementation should reused this existing operator metric instead of creating a new one.

Predefined. Associated with numRecordsIn.

numRecordsInErrorsCounterRecords

The total number of record that failed to consume

recordSize

Histogram

Bytes

The size of a record

fetchLatency

Histogram

ms

The latency occurred before Flink fetched the record.

fetchLatency = FetchTime - EventTime

, process or emit.

Predefined. For FLIP-27 sources, updated by SourceReader implementations via SourceReaderContext.

currentFetchEventTimeLagGaugems

The time in milliseconds from the record event timestamp to the timestamp Flink fetched the record.

This metric is an instantaneous value recorded for the last processed record.

This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency.

currentFetchEventTimeLag = FetchTime - EventTime

Optional. For FLIP-27 sources, registered and updated by SourceReader implementations via MetricsGroup provided by SourceReaderContext,

currentEmitEventTimeLagGaugems

The time in milliseconds from the record event timestamp to the timestamp

latency

Histogram

ms

The latency occurred before

the record is emitted by the source connector.

This metric is an instantaneous value recorded for the last processed record.

This metric is provided because latency histogram could be expensive. The instantaneous latency value is usually a good enough indication of the latency.

currentEmitEventTimeLag = EmitTime - EventTime

idleTime

, where the EmitTime is the time the record leaves the source operator.

Predefined. For FLIP-27 sources, updated by ReaderOutput implementation.

watermarkLagGaugems

The time in milliseconds that the watermark lags behind the wall clock time.

watermarkLag = CurrentTime - Watermark

Predefined. For FLIP-27 sources, updated by ReaderOutput implementation.

sourceIdleTime

Gauge

ms

The time in milliseconds that the source has not processed any record.

idleTime

sourceIdleTime = CurrentTime - LastRecordProcessTime

Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext.

pendingBytesGaugeBytes

The number of bytes that have not been fetched by the source. e.g. the remaining bytes in a file after the file descriptor reading position.

Note that not every source reports this metric, but the metric of the same semantic should be reported with this name and specification if the Source does report.

Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext.

pendingRecordsGaugeRecords

The number of records that have not been fetched by the source. e.g. the available records after the consumer offset in a Kafka partition.

Note that not every source reports this metric, but the metric of the same semantic should be reported with this name and specification if the Source does report.

Optional. For FLIP-27 sources, registered and updated by SourceReader implementation via MetricsGroup provided by SourceReaderContext.

Sink Metrics

Name

Type

Unit

Description

numBytesOut

numBytesSend

Counter

Bytes

The total number of output bytes since the source started

numBytesOutPerSec

. Count towards the numBytesOut in TaskIOMetricsGroup

Predefined, updated by sink implementations.

numBytesSendPerSecond

Meter

Bytes/Sec

The output bytes per second

numRecordsOut

Predefined, associated with numBytesSend.

numRecordsSend

Counter

Records

The total number of output records since the

source startednumRecordsOutPerSec

sink started

Predefined, updated by sink implementations.

numRecordsSendPerSecond

Meter

Records/Sec

The output records per second

numRecordsOutErrors

Predefined, associated with numBytesSend.

numRecordsSendErrorsCounterRecord

The total number of records failed to send

recordSize

Histogram

Bytes

The size of a record

sendTime

Predefined, updated by sink implementations.

currentSendTimeGauge
Histogram
ms

The time it takes to send

a

the last record

Note:

...

.

This metric is an instantaneous value recorded for the last processed record.

Optional, updated by sink implementations

...

.

Scope

The metric group for each source and sink would be the same as ordinary operator scope, i.e. default to <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

...

If the connector has its original metrics, the original metric names should still be kept and reported, even some of the original metrics are exposed with standard metric names. That means the connector will double report such metrics.

API Changes

OperatorIOMetricGroup

Add the following metrics.

Code Block
languagejava
titleOperatorIOMetricsGroup
public class OperatorIOMetricGroup extends ProxyMetricGroup<OperatorMetricGroup> {
	private final Counter numBytesIn;
	private final Counter numBytesOut;

	private final Meter numBytesInRate;
	private final Meter numBytesOutRate;

	public Counter getNumBytesInCounter() {
		return numRecordsIn;
	}

	public Counter getNumBytesOutCounter() {
		return numRecordsOut;
	}

	public Meter getNumBytesInRateMeter() {
		return numRecordsInRate;
	}

	public Meter getNumBytesOutRate() {
		return numRecordsOutRate;
	}
}

SourceReaderContext

To provide the above SourceMetrics in the base implementation of FLIP-27, we add the following methods to SourceReaderContext.

Code Block
languagejava
titleSourceReaderContext
public interface SourceReaderContext {

	Counter getNumBytesInCounter();

	Counter getRecordsInCounter();

	Counter getNumRecordsInErrorsCounter();
}

Anchor
FutureWork
FutureWork
Future Work

Opt in/out metrics

In this FLIP, we intentionally left some of the useful but potentially expensive metrics out of the scope. For example:

Name

Type

Unit

Description

recordSize

Histogram

Bytes

The size of a record.

eventTimeFetchLag

Histogram

ms

The latency occurred before Flink fetched the record.

fetchLatency = FetchTime - EventTime

eventTimeLag

Histogram

ms

The latency occurred before the record is emitted by the source connector.

latency = EmitTime - EventTime

sendTime

Histogram

ms

The time it takes to send a record

We plan to add these metrics to the convention by introducing optional metrics to allow user opt in/out these expensive metrics on demand. This will be discussed in a separate FLIP.

Proposed Changes

  1. Add the proposed metrics to the existing connectors.
  2. Mark the old metrics as deprecated if necessary.
  3. Correct the scope and metric names of the connectors if needed.

...