...

During the migration of the Iceberg Sink to the new Flink Sink V2 API, we found that there is no way to emit metrics from the committer. This would cause a loss of functionality when migrating to the SinkV2 implementation: previously it was possible to emit commit-related metrics (commit duration, number of committed data files, committed data file size, etc.), but after the migration there is no way to achieve the same. The committer should also receive context information similar to what the Writer gets through its InitContext; the exact contents should be discussed further on the mailing list.

...

We should change the TwoPhaseCommittingSink interface by adding a new createCommitter(CommitterInitContext context) method and deprecating the old createCommitter() method, like this:

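import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

import java.io.IOException;
import java.util.Collection;
import java.util.OptionalLong;

@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {
    // [..]
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;

    /** @deprecated Use {@link #createCommitter(CommitterInitContext)} instead. */
    @Deprecated
    Committer<CommT> createCommitter() throws IOException;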

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        // Delegate to the deprecated method so existing sinks keep working unchanged.
        return createCommitter();
    }

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommT> getCommittableSerializer();
    // [..]

    /** The interface exposes some runtime info for creating a {@link Committer}. */
    @PublicEvolving
    interface CommitterInitContext {
        /**
         * The first checkpoint id when an application is started and not recovered from a
         * previously taken checkpoint or savepoint.
         */
        long INITIAL_CHECKPOINT_ID = 1;

        /** @return The id of the task where the committer is running. */
        int getSubtaskId();

        /** @return The number of parallel committer tasks. */
        int getNumberOfParallelSubtasks();

        /**
         * Gets the attempt number of this parallel subtask. First attempt is numbered 0.
         *
         * @return Attempt number of the subtask.
         */
        int getAttemptNumber();

        /** @return The metric group this committer belongs to. */
        SinkCommitterMetricGroup metricGroup();

        /**
         * Returns the id of the restored checkpoint, if state was restored from the snapshot of a
         * previous execution.
         */
        OptionalLong getRestoredCheckpointId();

        /**
         * The ID of the current job. Note that the job ID can change, in particular upon manual
         * restart. The returned ID should NOT be used for any job management tasks.
         */
        JobID getJobId();
    }
}
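To illustrate how the proposed pair of factory methods could behave, here is a minimal, self-contained sketch. It is not Flink code: `SimpleMetricGroup`, `CommittingSink`, `LegacySink`, `MetricsSink`, and the simplified `Committer` and `CommitterInitContext` are hypothetical stand-ins that mirror the proposal so the example runs without Flink on the classpath. It shows two things under those assumptions: a sink that implements only the deprecated no-argument `createCommitter()` keeps working through the default bridge method, and a migrated sink can read `metricGroup()` from the context to record commit metrics such as the number of commits, committed data files, and commit duration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins for the types in the proposal, so this
// sketch runs without Flink on the classpath. They are NOT the Flink interfaces.
public class CommitterContextSketch {

    /** Stand-in for SinkCommitterMetricGroup: a map of named AtomicLong values. */
    static final class SimpleMetricGroup {
        private final Map<String, AtomicLong> values = new HashMap<>();

        AtomicLong counter(String name) {
            return values.computeIfAbsent(name, k -> new AtomicLong());
        }
    }

    /** Stand-in for CommitterInitContext, reduced to the proposed metricGroup() method. */
    interface CommitterInitContext {
        SimpleMetricGroup metricGroup();
    }

    /** Stand-in for Committer; commit() receives the committables directly. */
    interface Committer<CommT> {
        void commit(Collection<CommT> committables) throws IOException;
    }

    /** Mirrors the proposed pair of factory methods on TwoPhaseCommittingSink. */
    interface CommittingSink<CommT> {
        @Deprecated
        Committer<CommT> createCommitter() throws IOException;

        default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
            // Bridge: sinks that only implement the old method keep working.
            return createCommitter();
        }
    }

    /** Existing sink: implements only the deprecated no-argument factory. */
    static final class LegacySink implements CommittingSink<Integer> {
        final List<Integer> committed = new ArrayList<>();

        @Override
        public Committer<Integer> createCommitter() {
            return committed::addAll;
        }
    }

    /** Migrated sink: uses the context's metric group to record commit metrics. */
    static final class MetricsSink implements CommittingSink<Integer> {
        @Override
        @Deprecated
        public Committer<Integer> createCommitter() {
            throw new UnsupportedOperationException("Use createCommitter(CommitterInitContext)");
        }

        @Override
        public Committer<Integer> createCommitter(CommitterInitContext context) {
            SimpleMetricGroup metrics = context.metricGroup();
            return committables -> {
                long start = System.nanoTime();
                long files = 0;
                // Hypothetical model: each committable is the number of data files it adds.
                for (int dataFiles : committables) {
                    files += dataFiles;
                }
                metrics.counter("commits").incrementAndGet();
                metrics.counter("committedDataFiles").addAndGet(files);
                metrics.counter("lastCommitDurationNanos").set(System.nanoTime() - start);
            };
        }
    }

    public static void main(String[] args) throws IOException {
        SimpleMetricGroup group = new SimpleMetricGroup();
        CommitterInitContext ctx = () -> group;

        LegacySink legacy = new LegacySink();
        legacy.createCommitter(ctx).commit(List.of(1, 2));
        System.out.println("legacy committed: " + legacy.committed);

        Committer<Integer> committer = new MetricsSink().createCommitter(ctx);
        committer.commit(List.of(3, 4));
        committer.commit(List.of(5));
        System.out.println("commits=" + group.counter("commits").get()
                + ", committedDataFiles=" + group.counter("committedDataFiles").get());
    }
}
```

Running `main` prints `legacy committed: [1, 2]` followed by `commits=2, committedDataFiles=12`. In a real sink the metrics would presumably be registered on the `SinkCommitterMetricGroup` returned by `CommitterInitContext#metricGroup()` rather than on this map-backed stand-in. Note that, as proposed, a migrated sink still has to implement the deprecated abstract method; throwing from it is one possible choice, shown here only for illustration.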

...