Discussion thread
Vote thread
JIRA
Release: 1.19.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the migration of the Iceberg Sink to the new Flink Sink V2 API, we found that the current WithPreCommitTopology interface is not sufficient for our use case. The WithPreCommitTopology interface looks like this:

public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT> {

    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
            DataStream<CommittableMessage<CommT>> committables);
}

The issue is that the addPreCommitTopology method receives committables of type CommT and must also return committables of the same CommT type.

In the case of the Iceberg Sink, we would like to use WithPreCommitTopology to aggregate the writer results and create a single committable from them, so we need to change both the type and the number of the messages. With the current WithPreCommitTopology interface we can work around the issue by using a Tuple or a POJO where some fields are used only before the addPreCommitTopology method and others only after it, but this is closer to abusing the interface than to using it.
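To make the workaround concrete, here is a minimal, dependency-free Java sketch. It does not use the Flink API: WorkaroundCommittable (with its dataFiles and manifestPath fields) and WorkaroundAggregation are hypothetical names chosen for illustration, and plain Lists stand in for the DataStream. It shows how a single type has to carry both the writer-side and the committer-side fields, because the pre-commit step cannot change the type.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical "union" committable illustrating the workaround: some fields are
// meaningful only before the pre-commit step, others only after it.
final class WorkaroundCommittable {
    // Filled in by the writers; only read by the aggregation step.
    final List<String> dataFiles;
    // Filled in by the aggregation step; only read by the committer (null before that).
    final String manifestPath;

    WorkaroundCommittable(List<String> dataFiles, String manifestPath) {
        this.dataFiles = dataFiles;
        this.manifestPath = manifestPath;
    }

    static WorkaroundCommittable fromWriter(List<String> dataFiles) {
        return new WorkaroundCommittable(dataFiles, null);
    }
}

final class WorkaroundAggregation {
    // Mirrors the CommT -> CommT constraint: input and output share one type even though
    // the step changes both the meaning and the number of the elements.
    // manifestPath is a hypothetical parameter standing in for whatever the step produces.
    static List<WorkaroundCommittable> aggregate(List<WorkaroundCommittable> fromWriters, String manifestPath) {
        List<String> allFiles = new ArrayList<>();
        for (WorkaroundCommittable c : fromWriters) {
            allFiles.addAll(c.dataFiles);
        }
        // The writer-side field is still carried along, although the committer no longer needs it.
        List<WorkaroundCommittable> result = new ArrayList<>();
        result.add(new WorkaroundCommittable(allFiles, manifestPath));
        return result;
    }
}
```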

This seems to be a more generic issue: WithPreCommitTopology should provide a way to transform not only the data, but also the type of the data channelled through it.

Public Interfaces

We need to introduce a new TwoPhaseCommittingSinkWithPreCommitTransformation interface to allow this transformation:

TwoPhaseCommittingSinkWithPreCommitTransformation
public interface TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriterResultT, CommittableT> extends Sink<InputT> {

    /**
     * Creates a {@link PrecommittingSinkWriter} that creates write results on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, WriterResultT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();

    /** Returns the serializer of the write result type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, WriterResultT> extends SinkWriter<InputT> {
        /**
         * Prepares for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the two-phase commit protocol.
         * @throws IOException if preparing for a commit fails.
         */
        Collection<WriterResultT> prepareCommit() throws IOException, InterruptedException;
    }
} 
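The following plain-Java sketch only illustrates how the three type parameters flow from the writer to the committer. ModelWriter, ModelCommitter, CountingWriter, LoggingCommitter and ModelPipeline are hypothetical stand-ins, not the proposed Flink interfaces; the step in ModelPipeline plays the role of the pre-commit topology, converting Integer write results into String committables.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free stand-ins for the proposed interfaces; not the Flink API.
interface ModelWriter<InputT, WriterResultT> {
    void write(InputT element);

    // Counterpart of prepareCommit(): emits WriterResultT rather than CommittableT.
    Collection<WriterResultT> prepareCommit();
}

interface ModelCommitter<CommittableT> {
    void commit(Collection<CommittableT> committables);
}

// Writer whose write result is the number of records written since the last checkpoint.
final class CountingWriter implements ModelWriter<String, Integer> {
    private int pending;

    @Override
    public void write(String element) {
        pending++;
    }

    @Override
    public Collection<Integer> prepareCommit() {
        Collection<Integer> result = Collections.singletonList(pending);
        pending = 0;
        return result;
    }
}

// Committer whose committable type (String) differs from the writer result type (Integer).
final class LoggingCommitter implements ModelCommitter<String> {
    final List<String> committed = new ArrayList<>();

    @Override
    public void commit(Collection<String> committables) {
        committed.addAll(committables);
    }
}

final class ModelPipeline {
    // Stand-in for the pre-commit step: turns Integer write results into String committables.
    static void runCheckpoint(CountingWriter writer, LoggingCommitter committer) {
        int total = 0;
        for (int count : writer.prepareCommit()) {
            total += count;
        }
        committer.commit(Collections.singletonList(total + " records"));
    }
}
```

Keeping the writer result and the committable as separate type parameters means the compiler, rather than a convention on shared POJO fields, enforces which stage sees which type.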

We need to make sure that the existing TwoPhaseCommittingSink interface keeps working, since it is already marked PublicEvolving. The result would look like this:

TwoPhaseCommittingSink
public interface TwoPhaseCommittingSink<InputT, CommT> extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, CommT, CommT> {
    /** The write result type is CommT as well, so the committable serializer is reused. */
    @Override
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}
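A minimal sketch of why the default method keeps existing implementations source compatible. VersionedSerializer, SinkWithTwoTypes, SameTypeSink and LegacyStringSink are hypothetical, heavily simplified stand-ins for SimpleVersionedSerializer and the sink interfaces: a sink that only implements getCommittableSerializer() automatically gets a write result serializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for SimpleVersionedSerializer.
interface VersionedSerializer<T> {
    int getVersion();

    byte[] serialize(T obj);
}

// Simplified model of the new base interface: two independent serializers.
interface SinkWithTwoTypes<WriterResultT, CommittableT> {
    VersionedSerializer<CommittableT> getCommittableSerializer();

    VersionedSerializer<WriterResultT> getWriteResultSerializer();
}

// Simplified model of the backward-compatible TwoPhaseCommittingSink: both type
// arguments are CommT, so a default method can reuse the committable serializer.
interface SameTypeSink<CommT> extends SinkWithTwoTypes<CommT, CommT> {
    @Override
    default VersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

// An "existing" sink that only provides the committable serializer, as today.
final class LegacyStringSink implements SameTypeSink<String> {
    private final VersionedSerializer<String> serializer = new VersionedSerializer<String>() {
        @Override
        public int getVersion() {
            return 1;
        }

        @Override
        public byte[] serialize(String obj) {
            return obj.getBytes(StandardCharsets.UTF_8);
        }
    };

    @Override
    public VersionedSerializer<String> getCommittableSerializer() {
        return serializer;
    }
}
```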

We also need to change the WithPreCommitTopology and WithPostCommitTopology interfaces accordingly. These are Experimental interfaces, so we can change them like this:

WithPreCommitTopology
public interface WithPreCommitTopology<InputT, WriteResultT, CommittableT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommittableT> {

    /**
     * Intercepts and transforms the write results sent on checkpoint or at end of input into
     * committables. Implementers need to ensure that all {@link CommittableMessage}s are
     * transformed appropriately.
     *
     * @param writeResults the stream of write results.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriteResultT>> writeResults);
}
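A pre-commit topology of this shape would typically group the write results of all writer subtasks by checkpoint and emit one committable per checkpoint. The sketch below models that on plain Lists; Message is a hypothetical, simplified stand-in for CommittableMessage, PreCommitByCheckpoint is a hypothetical name, and the choice of String write results and List<String> committables is illustrative only. In a real streaming topology the aggregation must also know when all subtasks have reported for a checkpoint; this sketch sidesteps that by operating on a finite list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a CommittableMessage: a payload tagged with its checkpoint id.
final class Message<T> {
    final long checkpointId;
    final T payload;

    Message(long checkpointId, T payload) {
        this.checkpointId = checkpointId;
        this.payload = payload;
    }
}

final class PreCommitByCheckpoint {
    // Models addPreCommitTopology: Message<WriteResultT> in, Message<CommittableT> out.
    // Here WriteResultT = String (one data file) and CommittableT = List<String>
    // (all data files of one checkpoint).
    static List<Message<List<String>>> apply(List<Message<String>> writeResults) {
        // TreeMap keeps the output ordered by checkpoint id.
        Map<Long, List<String>> byCheckpoint = new TreeMap<>();
        for (Message<String> m : writeResults) {
            byCheckpoint.computeIfAbsent(m.checkpointId, id -> new ArrayList<>()).add(m.payload);
        }
        List<Message<List<String>>> committables = new ArrayList<>();
        for (Map.Entry<Long, List<String>> e : byCheckpoint.entrySet()) {
            committables.add(new Message<>(e.getKey(), e.getValue()));
        }
        return committables;
    }
}
```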
WithPostCommitTopology
public interface WithPostCommitTopology<InputT, WriteResultT, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}
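The idempotency requirement above can be met without keeping operator state by consulting the external system before acting. In this minimal sketch ExternalCatalog and PublishSnapshotStep are hypothetical names and a HashMap stands in for the external system; the check-then-act sequence assumes a single writer to the catalog and would need an atomic conditional write in practice.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical external system: records which checkpoint published which snapshot.
final class ExternalCatalog {
    final Map<Long, String> published = new HashMap<>();
    // Counts real writes, to observe that replays have no effect.
    int writes;
}

// Hypothetical post-commit step. It keeps no state of its own: idempotency comes from
// checking the external system, so a committable replayed after recovery is a no-op.
final class PublishSnapshotStep {
    static void process(ExternalCatalog catalog, long checkpointId, String snapshotId) {
        if (!catalog.published.containsKey(checkpointId)) {
            catalog.published.put(checkpointId, snapshotId);
            catalog.writes++;
        }
    }
}
```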

Proposed Changes

Besides the interface changes, we should modify the SinkTransformationTranslator.addCommittingTopology method so that it follows the new type parameters. When generating the topology for sinks that do not implement WithPreCommitTopology, we need to add a default transformation that casts WriteResults to Committables. This should not be an issue, since the two types are the same for these sinks anyway.
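A minimal sketch of what the default transformation amounts to, modeled on plain Lists rather than a DataStream. DefaultPreCommit and identityCast are hypothetical names; the unchecked cast is only safe because, for these sinks, the write result and committable types coincide.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class DefaultPreCommit {
    // Hypothetical model of the default step for sinks without WithPreCommitTopology:
    // every element is passed through unchanged; only its static type is cast.
    @SuppressWarnings("unchecked")
    static <W, C> Function<List<W>, List<C>> identityCast() {
        return writeResults -> {
            List<C> committables = new ArrayList<>(writeResults.size());
            for (W w : writeResults) {
                // Unchecked: safe only when W and C are the same type at runtime.
                committables.add((C) w);
            }
            return committables;
        };
    }
}
```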

Compatibility, Deprecation, and Migration Plan

After the change, users of the PublicEvolving TwoPhaseCommittingSink interface will see a new identity map transformation in their execution graph, but otherwise they should not be affected.

Sinks implementing the Experimental WithPreCommitTopology or WithPostCommitTopology interfaces will need to update their code to add the new generic type parameters.

Test Plan

We will demonstrate via the new Iceberg SinkV2 that TwoPhaseCommittingSinkWithPreCommitTransformation works as expected and that the types are propagated properly. The KafkaSink can be used to validate that the original usage of TwoPhaseCommittingSink still works.

Rejected Alternatives

We considered changing the existing TwoPhaseCommittingSink interface instead of introducing a new one, but decided against it since it is already marked as PublicEvolving. Another reason is that users of the current Sink API have not complained about the restrictions in WithPreCommitTopology, which suggests that there are many important use cases where the type transformation is not needed in the Sinks.
