FLIP-386 builds on top of FLIP-384. It aims to let state backends attach custom attributes to recovery spans during recovery. For example, RocksDBIncrementalRestoreOperation could report both the remote download time and the time taken to clip/ingest the RocksDB instances after rescaling.

...

Proposed Changes

This FLIP proposes adding the OpenTelemetryMetricReporterFactory and OpenTelemetryTraceReporterFactory factories, both available in the newly added plugin flink-metrics/flink-metrics-otel (structured like pre-existing MetricReporters such as flink-metrics-jmx).

The only custom configuration will be:

  • exporter.endpoint - url of the OpenTelemetry endpoint

  • exporter.timeout - timeout when reporting to the endpoint (Default value

Both metrics and traces reporters will also support scope.variables.additional

Example configuration:

Code Block
metrics.reporters: otel
metrics.reporter.otel.factory.class: org.apache.flink.common.metrics.OpenTelemetryMetricReporterFactory
metrics.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
metrics.reporter.otel.scope.variables.additional: region:eu-west-1,environment:local-pnowojski-test,flink_runtime:1.17.1

traces.reporters: otel
traces.reporter.otel.factory.class: org.apache.flink.common.metrics.OpenTelemetryTraceReporterFactory
traces.reporter.otel.exporter.endpoint: http://127.0.0.1:1337
traces.reporter.otel.scope.variables.additional: region:eu-west-1,environment:local-pnowojski-test,flink_runtime:1.17.1

Compatibility, Deprecation, and Migration Plan

There will be no impact on existing users, and no migration is needed.

Test Plan

Recovery metrics are first collected for each subtask and then sent back to the JobManager for aggregation, as described in FLIP-384. From all of those aggregated metrics, a single recovery span trace is created on the JobManager. This FLIP must therefore define not only an API for reporting a custom attribute, but also how that attribute is aggregated later.

Public Interfaces

It will be possible to set the custom attributes via the following interfaces:

Code Block
@Experimental
public interface CustomMetric {
    enum AggregationMethod {
        MIN,
        MAX,
        SUM,
        AVG
    }

    CustomMetric addAggregationMethod(AggregationMethod aggregationMethod);
}

@Experimental
public interface CustomInitializationMetrics {
    CustomMetric addMetric(String name, long value);
    CustomMetric addMetric(String name, double value);
}

// intended usage:
customInitializationMetrics.addMetric("DownloadTime", value)
    .addAggregationMethod(AggregationMethod.MAX)
    .addAggregationMethod(AggregationMethod.SUM);
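
To make the aggregation step more concrete, the following is a minimal sketch of how values reported by several subtasks could be collapsed into span attributes on the JobManager. It is not Flink code: the ReportedMetric class, the aggregate method, and the "<name>.<METHOD>" attribute key format are all assumptions made for illustration, and only the AggregationMethod enum mirrors the interface above.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; none of these classes are part of Flink.
class CustomMetricAggregationSketch {

    enum AggregationMethod { MIN, MAX, SUM, AVG }

    // One custom metric: the values reported by each subtask plus the requested aggregations.
    static final class ReportedMetric {
        final String name;
        final Set<AggregationMethod> methods = EnumSet.noneOf(AggregationMethod.class);
        final List<Double> subtaskValues = new ArrayList<>();

        ReportedMetric(String name) {
            this.name = name;
        }

        ReportedMetric addAggregationMethod(AggregationMethod method) {
            methods.add(method);
            return this;
        }

        ReportedMetric addSubtaskValue(double value) {
            subtaskValues.add(value);
            return this;
        }
    }

    // Collapses the per-subtask values into one attribute per requested aggregation.
    // The "<name>.<METHOD>" key format is an assumption made for this sketch.
    static Map<String, Double> aggregate(ReportedMetric metric) {
        Map<String, Double> attributes = new LinkedHashMap<>();
        if (metric.subtaskValues.isEmpty()) {
            return attributes;
        }
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        double sum = 0;
        for (double v : metric.subtaskValues) {
            min = Math.min(min, v);
            max = Math.max(max, v);
            sum += v;
        }
        for (AggregationMethod method : metric.methods) {
            double result;
            switch (method) {
                case MIN: result = min; break;
                case MAX: result = max; break;
                case SUM: result = sum; break;
                default: result = sum / metric.subtaskValues.size(); // AVG
            }
            attributes.put(metric.name + "." + method, result);
        }
        return attributes;
    }

    public static void main(String[] args) {
        // Three subtasks report their download time in milliseconds.
        ReportedMetric downloadTime = new ReportedMetric("DownloadTime")
                .addAggregationMethod(AggregationMethod.MAX)
                .addAggregationMethod(AggregationMethod.SUM)
                .addSubtaskValue(120)
                .addSubtaskValue(300)
                .addSubtaskValue(180);
        System.out.println(aggregate(downloadTime));
    }
}
```

Requesting both MAX and SUM, as in the intended usage above, would yield two attributes for the same metric: the slowest subtask and the total across all subtasks.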

The remaining issue is how to pass the CustomInitializationMetrics instance to, for example, RocksDBIncrementalRestoreOperation. This FLIP proposes doing so by adding the following interfaces to the org.apache.flink.runtime.state.StateBackend interface:

Code Block
@PublicEvolving
public interface StateBackend {
  (...)
  
  @PublicEvolving
  public interface KeyedStateBackendParameters<K> {
    // already existing parameters of createKeyedStateBackend() method
    Environment getEnv();
    JobID getJobID();
    String getOperatorIdentifier();
    TypeSerializer<K> getKeySerializer();
    int getNumberOfKeyGroups();
    KeyGroupRange getKeyGroupRange();
    TaskKvStateRegistry getKvStateRegistry();
    TtlTimeProvider getTtlTimeProvider();
    @Nonnull Collection<KeyedStateHandle> getStateHandles();
    CloseableRegistry getCancelStreamRegistry();
    double getManagedMemoryFraction();
    
    // newly added parameter
    CustomInitializationMetrics getCustomInitializationMetrics();
  }
  
  @PublicEvolving
  public interface OperatorStateBackendParameters {
    // already existing parameters of createOperatorStateBackend() method
    Environment getEnv();
    String getOperatorIdentifier();
    @Nonnull Collection<OperatorStateHandle> getStateHandles();
    CloseableRegistry getCancelStreamRegistry();
    
    // newly added parameter
    CustomInitializationMetrics getCustomInitializationMetrics();
  }
}

With the following new methods:

Code Block
@PublicEvolving
public interface StateBackend {
  (...)

  <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(
         KeyedStateBackendParameters<K> parameters);
      
  OperatorStateBackend createOperatorStateBackend(
          OperatorStateBackendParameters parameters);
}

With all of these changes, state backends such as incremental RocksDB will be able to measure custom metrics during the restore operation and report them for inclusion in the recovery span.
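
As a rough illustration of how a restore operation might use this, the sketch below times two restore phases and reports them through CustomInitializationMetrics. The two interfaces are copied locally from the proposal so the example is self-contained. Everything else (RecordingMetrics, the restore method, the injected clock, and the metric names DownloadTimeMs and IngestTimeMs) is hypothetical and not taken from the actual RocksDB restore code.

```java
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Illustrative sketch only; not the actual RocksDB restore implementation.
class RestoreTimingSketch {

    enum AggregationMethod { MIN, MAX, SUM, AVG }

    // Local copies of the interfaces proposed in this FLIP.
    interface CustomMetric {
        CustomMetric addAggregationMethod(AggregationMethod aggregationMethod);
    }

    interface CustomInitializationMetrics {
        CustomMetric addMetric(String name, long value);
        CustomMetric addMetric(String name, double value);
    }

    // Hypothetical in-memory implementation that simply records what was reported.
    static final class RecordingMetrics implements CustomInitializationMetrics {
        final Map<String, Double> values = new LinkedHashMap<>();
        final Map<String, EnumSet<AggregationMethod>> methods = new LinkedHashMap<>();

        @Override
        public CustomMetric addMetric(String name, long value) {
            return addMetric(name, (double) value);
        }

        @Override
        public CustomMetric addMetric(String name, double value) {
            values.put(name, value);
            EnumSet<AggregationMethod> requested = EnumSet.noneOf(AggregationMethod.class);
            methods.put(name, requested);
            return new CustomMetric() {
                @Override
                public CustomMetric addAggregationMethod(AggregationMethod method) {
                    requested.add(method);
                    return this;
                }
            };
        }
    }

    // Times the download and ingest phases separately and reports both durations.
    // The clock is injected so the sketch can be exercised deterministically.
    static void restore(
            CustomInitializationMetrics metrics,
            LongSupplier clockMs,
            Runnable download,
            Runnable ingest) {
        long start = clockMs.getAsLong();
        download.run();
        long downloaded = clockMs.getAsLong();
        ingest.run();
        long done = clockMs.getAsLong();

        metrics.addMetric("DownloadTimeMs", downloaded - start)
                .addAggregationMethod(AggregationMethod.MAX)
                .addAggregationMethod(AggregationMethod.SUM);
        metrics.addMetric("IngestTimeMs", done - downloaded)
                .addAggregationMethod(AggregationMethod.MAX);
    }

    public static void main(String[] args) {
        RecordingMetrics metrics = new RecordingMetrics();
        restore(metrics, System::currentTimeMillis, () -> { }, () -> { });
        System.out.println(metrics.values + " " + metrics.methods);
    }
}
```

In a real backend the metrics instance would presumably come from getCustomInitializationMetrics() on the parameters object rather than being constructed locally.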

Compatibility, Deprecation, and Migration Plan

This FLIP proposes cleaning up StateBackend by deprecating all previously used versions of the StateBackend#createKeyedStateBackend and StateBackend#createOperatorStateBackend methods. The newly proposed versions are more future-proof: new parameters can be added much more easily without breaking compatibility for users.
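
The compatibility argument can be sketched with simplified stand-ins. Because a backend only reads from the parameters object, adding a getter to it does not change the signature that backends implement. All types below (BackendParameters, Backend, LegacyBackend, MetricsAwareBackend) are hypothetical and much smaller than the real Flink interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; simplified stand-ins, not Flink interfaces.
class ParameterObjectSketch {

    // Parameter object created by the runtime and handed to the backend.
    interface BackendParameters {
        String getOperatorIdentifier();
        int getNumberOfKeyGroups();

        // Imagine this getter was added in a later release. Backends only call
        // getters, so existing backend implementations need no changes.
        Map<String, Long> getCustomInitializationMetrics();
    }

    // The method backends implement keeps a single, stable signature.
    interface Backend {
        String create(BackendParameters parameters);
    }

    // Written before the new getter existed; still valid without modification.
    static final class LegacyBackend implements Backend {
        @Override
        public String create(BackendParameters p) {
            return p.getOperatorIdentifier() + "/" + p.getNumberOfKeyGroups();
        }
    }

    // A newer backend that opts in to the additional parameter.
    static final class MetricsAwareBackend implements Backend {
        @Override
        public String create(BackendParameters p) {
            p.getCustomInitializationMetrics().put("RestoreTimeMs", 42L);
            return p.getOperatorIdentifier() + "/" + p.getNumberOfKeyGroups();
        }
    }

    // Only the runtime-side implementation has to supply the new value.
    static BackendParameters parameters(String id, int keyGroups, Map<String, Long> metrics) {
        return new BackendParameters() {
            @Override
            public String getOperatorIdentifier() {
                return id;
            }

            @Override
            public int getNumberOfKeyGroups() {
                return keyGroups;
            }

            @Override
            public Map<String, Long> getCustomInitializationMetrics() {
                return metrics;
            }
        };
    }

    public static void main(String[] args) {
        Map<String, Long> metrics = new HashMap<>();
        BackendParameters p = parameters("window-op", 128, metrics);
        System.out.println(new LegacyBackend().create(p));
        System.out.println(new MetricsAwareBackend().create(p));
        System.out.println(metrics);
    }
}
```

With positional arguments, by contrast, each new parameter would require either a new overload or a changed signature in every backend implementation.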

Test Plan

On top of automated tests, this feature is already used and tested inside Confluent. Before committing, the final accepted version will be tested inside Confluent again.

Rejected Alternatives

None

...