Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the migration of the Iceberg Sink to the new Flink Sink V2 API we found that there is no way to emit metrics from the committer. This would cause a loss of functionality when migrating to the SinkV2 implementation: previously it was possible to emit commit-related metrics (commit duration, number of committed data files, size of committed data files, etc.), but after the migration there is no way to achieve the same functionality. We should also provide context information similar to what the Writer receives; the details should be discussed further on the mailing list.

Public Interfaces

We should change the TwoPhaseCommittingSink interface by adding a new createCommitter(CommitterInitContext context) method and deprecating the old createCommitter() method, like this:

...

Code Block
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;

public interface SinkWriterMetricGroup extends OperatorMetricGroup {

    /** The total number of committables that have arrived. */
    Counter getNumCommittablesTotalCounter();

    /** The total number of committables that failed to commit. */
    Counter getNumCommittableErrorsCounter();

    /** The total number of successfully committed committables. */
    Counter getNumCommittableSuccessCounter();

    /** The number of committables that are still pending. */
    Gauge getNumPendingCommittablesGauge();
}

Proposed Changes

Similarly to the SinkWriterOperator, the CommitterOperator should receive the Sink as a constructor parameter and create the Committer in CommitterOperator.initializeState. The CommitterInitContext should be based on the StateInitializationContext.
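To make the intended wiring easier to discuss, here is a minimal sketch of an operator that stores the sink in its constructor and only creates the committer in initializeState. All types below are simplified stand-ins declared for this example, not the real Flink classes; in particular, the getRestoredCheckpointId() accessor on the context is only an assumption about what the context might expose.

```java
import java.util.Collection;
import java.util.OptionalLong;

// Stand-in types for illustration only; they are NOT the real Flink interfaces.
interface StateInitializationContext {
    OptionalLong getRestoredCheckpointId();
}

// Hypothetical shape of the new context; the actual contents are still to be discussed.
interface CommitterInitContext {
    OptionalLong getRestoredCheckpointId();
}

interface Committer<CommT> {
    void commit(Collection<CommT> committables);
}

interface TwoPhaseCommittingSink<CommT> {
    Committer<CommT> createCommitter(CommitterInitContext context);
}

class CommitterOperatorSketch<CommT> {
    private final TwoPhaseCommittingSink<CommT> sink;
    private Committer<CommT> committer; // stays null until initializeState runs

    // The operator receives the sink, not a ready-made committer.
    CommitterOperatorSketch(TwoPhaseCommittingSink<CommT> sink) {
        this.sink = sink;
    }

    // The committer context is derived from the state initialization context.
    void initializeState(StateInitializationContext stateContext) {
        CommitterInitContext initContext = stateContext::getRestoredCheckpointId;
        this.committer = sink.createCommitter(initContext);
    }

    Committer<CommT> getCommitter() {
        return committer;
    }
}
```

Deferring creation to initializeState means the committer can see runtime information (restored checkpoint, and later the metric group) that is not available when the operator object is constructed.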

We should also implement the metrics. This is straightforward for the Counters, but we still need to find a way to aggregate the pending Committables, if possible.
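One possible way to back these metrics within a single subtask is sketched below. The class and method names are hypothetical and not part of Flink, and the sketch assumes that the pending count can be derived as arrived minus succeeded minus failed. It does not address aggregation across subtasks, which remains the open question.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of Flink: per-subtask bookkeeping for the four metrics. */
class CommittableMetricsSketch {
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong success = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    /** Call when committables arrive at the committer. */
    void onArrived(int count) {
        total.addAndGet(count);
    }

    /** Call when committables were committed successfully. */
    void onCommitted(int count) {
        success.addAndGet(count);
    }

    /** Call when committables failed to commit. */
    void onFailed(int count) {
        errors.addAndGet(count);
    }

    /** Pending = arrived - succeeded - failed (an assumption of this sketch). */
    long pending() {
        return total.get() - success.get() - errors.get();
    }

    /** A gauge-like view that re-reads the pending value on every call. */
    LongSupplier pendingGauge() {
        return this::pending;
    }
}
```

In a real implementation the three counters would be the Counter instances from the metric group, and the supplier would be registered as the Gauge.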

Compatibility, Deprecation, and Migration Plan

Existing SinkV2 implementations can rely on the default implementation of the createCommitter(CommitterInitContext context) method, and migration during the deprecation period is straightforward: move the old createCommitter() implementation into the new method. Otherwise this is an additive feature which should not impact other users.
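The compatibility argument can be illustrated with a small sketch. The interfaces below are simplified stand-ins written for this example (checked exceptions are omitted, and getSubtaskId() is a hypothetical accessor), so they only show the delegation pattern, not the final API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Stand-in types for illustration only; they are NOT the real Flink interfaces.
interface CommitterInitContext {
    int getSubtaskId(); // hypothetical accessor, present only to give the context some content
}

interface Committer<CommT> {
    void commit(Collection<CommT> committables);
}

interface TwoPhaseCommittingSink<CommT> {
    /** Old factory method, to be deprecated. */
    @Deprecated
    Committer<CommT> createCommitter();

    /** New factory method; the default delegates so existing sinks keep working unchanged. */
    default Committer<CommT> createCommitter(CommitterInitContext context) {
        return createCommitter();
    }
}

/** A sink written against the old API only; it never mentions CommitterInitContext. */
class LegacySink implements TwoPhaseCommittingSink<String> {
    final List<String> committed = new ArrayList<>();

    @Override
    public Committer<String> createCommitter() {
        return committed::addAll;
    }
}
```

A runtime that always calls createCommitter(context) would still obtain a working committer from LegacySink, which is the behavior the deprecation period relies on.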

Test Plan

We will demonstrate via the new Iceberg SinkV2 that the metrics work as expected and are propagated properly.

Rejected Alternatives

We could avoid changing the createCommitter() method by adding a new init(CommitterInitContext context) method to the Committer interface, but this could cause confusion around the API usage, as the Writer and the Committer would achieve the same functionality in different ways. We opted for the solution that results in a more consistent API in the longer term.

...