...

In the case of the Iceberg Sink, we would like to use WithPreCommitTopology to aggregate the writer results and create a single committable from them. So we would like to change both the type and the number of the messages. With the current WithPreCommitTopology interface, we can work around the issue by using a Tuple or POJO in which some fields are used only before the addPreCommitTopology method and others only after it, but this is closer to abusing the interface than using it.

This appears to be a more generic issue: WithPreCommitTopology should provide a way to transform not only the data, but also the type of the data channelled through it.

...

We need to introduce a new TwoPhaseCommittingSinkWithPreCommitTransformation interface to allow this transformation, because we need to change the types used by the createWriter and createCommitter methods:

Code Block
languagejava
titleTwoPhaseCommittingSinkWithPreCommitTransformation
public interface TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriterResultT, CommittableT> extends Sink<InputT> {

    /**
     * Creates a {@link PrecommittingSinkWriter} that creates write results on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, WriterResultT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();

    /** Returns the serializer of the write result type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, WriterResultT> extends SinkWriter<InputT> {
        /**
         * Prepares for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the two-phase commit protocol.
         * @throws IOException if fail to prepare for a commit.
         */
        Collection<WriterResultT> prepareCommit() throws IOException, InterruptedException;
    }
} 
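Since write results and committables are now separate types, the sink exposes a serializer for each. Presumably this is needed because write results may be transferred to, or checkpointed by, operators of the pre-commit topology. The sketch below shows what a write result serializer could look like. It assumes a hypothetical `VersionedSerializer` stand-in with the same method shape as Flink's `SimpleVersionedSerializer` and a hypothetical `DataFilesResult` write result type; neither is part of the proposal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same method shape as Flink's SimpleVersionedSerializer.
interface VersionedSerializer<E> {
    int getVersion();

    byte[] serialize(E obj) throws IOException;

    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical write result: the data files one writer produced for one checkpoint.
final class DataFilesResult {
    final long checkpointId;
    final List<String> dataFiles;

    DataFilesResult(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = dataFiles;
    }
}

// What getWriteResultSerializer() might return for that type (illustrative only).
final class DataFilesResultSerializer implements VersionedSerializer<DataFilesResult> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(DataFilesResult result) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(result.checkpointId);
            out.writeInt(result.dataFiles.size());
            for (String file : result.dataFiles) {
                out.writeUTF(file);
            }
        }
        return bytes.toByteArray();
    }

    @Override
    public DataFilesResult deserialize(int version, byte[] serialized) throws IOException {
        if (version != getVersion()) {
            throw new IOException("Unknown serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            long checkpointId = in.readLong();
            int count = in.readInt();
            List<String> files = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                files.add(in.readUTF());
            }
            return new DataFilesResult(checkpointId, files);
        }
    }

    // Convenience for the example: serialize and immediately deserialize.
    static DataFilesResult roundTrip(DataFilesResult result) {
        DataFilesResultSerializer serializer = new DataFilesResultSerializer();
        try {
            return serializer.deserialize(serializer.getVersion(), serializer.serialize(result));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The version number is written separately from the payload (as with `SimpleVersionedSerializer`), so a later format change can still read results written by an older version.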

...

Code Block
languagejava
titleTwoPhaseCommittingSink
public interface TwoPhaseCommittingSink<InputT, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, CommT, CommT> {

    /** Returns the serializer of the write result type, which is the committable serializer here. */
    @Override
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

We plan to turn WithPreCommitTopology and WithPostCommitTopology into mixin-like interfaces. Both are Experimental interfaces, so we can change them to achieve a cleaner API.
They would no longer extend the TwoPhaseCommittingSink interface, as they did previously. Instead, they would only contain the methods required for their specific feature and used during Sink creation. It would be the responsibility of the API user to implement all of the required interfaces with matching type parameters.
While previously it was enough to implement WithPreCommitTopology, a sink would now need to implement both TwoPhaseCommittingSinkWithPreCommitTransformation and WithPreCommitTopology.

Before:

Code Block
languagejava
titleBefore
public class IcebergSink implements
    WithPreCommitTopology<InputT, CommittableT>,
    WithPostCommitTopology<InputT, CommittableT> {[..]}

After:

Code Block
languagejava
titleAfter
public class IcebergSink implements
    TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommittableT>,
    WithPreCommitTopology<WriteResultT, CommittableT>,
    WithPostCommitTopology<CommittableT> {[..]}

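Since the type parameters of the separate interfaces are not tied together, non-matching types could cause runtime issues instead of compile errors, but the API will be more flexible, which is more important in the long run.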
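To illustrate the runtime-issue point: the compiler accepts a sink whose pre-commit mixin produces a different committable type than the one the sink declares, because nothing links the two interfaces' type parameters. The sketch below uses hypothetical, type-parameter-only stand-ins for the two interfaces and standard Java reflection (`Class#getGenericInterfaces`, `ParameterizedType#getActualTypeArguments`) to show that such a mismatch is only detectable at runtime. The check is an illustration, not part of this proposal.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for the mixin interfaces; only the type parameters matter here.
interface SinkWithTransformation<InputT, WriteResultT, CommittableT> {}

interface PreCommitMixin<WriteResultT, CommittableT> {}

// Types line up: the pre-commit step turns Long write results into Integer committables.
final class MatchingSink
        implements SinkWithTransformation<String, Long, Integer>, PreCommitMixin<Long, Integer> {}

// Compiles as well, although the pre-commit step produces String instead of Integer committables.
final class MismatchedSink
        implements SinkWithTransformation<String, Long, Integer>, PreCommitMixin<Long, String> {}

final class MixinTypeCheck {
    private MixinTypeCheck() {}

    // Returns the type arguments that sinkClass directly declares for rawInterface.
    static Type[] typeArgumentsOf(Class<?> sinkClass, Class<?> rawInterface) {
        for (Type type : sinkClass.getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) type;
                if (parameterized.getRawType().equals(rawInterface)) {
                    return parameterized.getActualTypeArguments();
                }
            }
        }
        throw new IllegalArgumentException(sinkClass + " does not implement " + rawInterface);
    }

    // Compares WriteResultT and CommittableT across the two mixins at runtime.
    static boolean typesMatch(Class<?> sinkClass) {
        Type[] sink = typeArgumentsOf(sinkClass, SinkWithTransformation.class);
        Type[] preCommit = typeArgumentsOf(sinkClass, PreCommitMixin.class);
        return sink[1].equals(preCommit[0]) && sink[2].equals(preCommit[1]);
    }
}
```

This check only covers type arguments declared directly on the sink class; type variables or arguments inherited through superclasses would need a more complete type resolution.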

Accordingly, WithPreCommitTopology and WithPostCommitTopology change as follows:

Code Block
languagejava
titleWithPreCommitTopology
public interface WithPreCommitTopology<WriteResultT, CommittableT> {

    /** Returns the serializer of the write result type. */
    SimpleVersionedSerializer<WriteResultT> getWriteResultSerializer();

    /**
     * Intercepts the write results sent on checkpoint or at end of input and transforms them into
     * committables. Implementers need to ensure that all {@link CommittableMessage}s are
     * transformed appropriately.
     *
     * @param writeResults the stream of write results produced by the writers.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriteResultT>> writeResults);
}
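The Iceberg use case from the motivation, where many write results are aggregated into a single committable per checkpoint, can be sketched in plain Java. This assumes hypothetical `WriterResult` and `TableCommittable` types and a hypothetical `PreCommitStage` interface that uses `List` in place of `DataStream<CommittableMessage<...>>`; it also ignores the `CommittableSummary` bookkeeping that a real pre-commit topology has to keep consistent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical write result emitted by one writer subtask for one checkpoint.
final class WriterResult {
    final long checkpointId;
    final int subtaskId;
    final List<String> dataFiles;

    WriterResult(long checkpointId, int subtaskId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.dataFiles = dataFiles;
    }
}

// Hypothetical committable: one per checkpoint, covering the files of all writers.
final class TableCommittable {
    final long checkpointId;
    final List<String> dataFiles;

    TableCommittable(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = dataFiles;
    }
}

// Hypothetical stand-in for WithPreCommitTopology<WriteResultT, CommittableT>.
interface PreCommitStage<WriteResultT, CommittableT> {
    List<CommittableT> addPreCommitTopology(List<WriteResultT> writeResults);
}

final class AggregatingPreCommit implements PreCommitStage<WriterResult, TableCommittable> {
    @Override
    public List<TableCommittable> addPreCommitTopology(List<WriterResult> writeResults) {
        // Group the data files by checkpoint; TreeMap keeps checkpoints in ascending order.
        Map<Long, List<String>> filesPerCheckpoint = new TreeMap<>();
        for (WriterResult result : writeResults) {
            filesPerCheckpoint
                    .computeIfAbsent(result.checkpointId, id -> new ArrayList<>())
                    .addAll(result.dataFiles);
        }
        // Emit exactly one committable per checkpoint: both the type and the count change.
        List<TableCommittable> committables = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : filesPerCheckpoint.entrySet()) {
            committables.add(new TableCommittable(entry.getKey(), entry.getValue()));
        }
        return committables;
    }
}
```

Three write results from two subtasks and two checkpoints become two committables, which is exactly the change in type and number of messages that the old single-type interface could not express without a Tuple/POJO workaround.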


Code Block
languagejava
titleWithPostCommitTopology
public interface WithPostCommitTopology<CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}
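The idempotency requirement in the Javadoc above can be illustrated with a stateless post-commit step that only performs upserts keyed by checkpoint id, so replaying an already processed committable after recovery leaves the external system unchanged. `CommittedBatch`, `MetadataStore` and `PostCommitReporter` are hypothetical names used only for this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical committable as received by the post-commit topology.
final class CommittedBatch {
    final long checkpointId;
    final List<String> dataFiles;

    CommittedBatch(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = dataFiles;
    }
}

// Hypothetical external system, e.g. a metadata store that tracks committed files.
final class MetadataStore {
    private final Map<Long, List<String>> filesByCheckpoint = new TreeMap<>();

    // Upsert keyed by checkpoint id: writing the same entry again leaves the state unchanged.
    void upsert(long checkpointId, List<String> files) {
        filesByCheckpoint.put(checkpointId, List.copyOf(files));
    }

    Map<Long, List<String>> snapshot() {
        return new TreeMap<>(filesByCheckpoint);
    }
}

// Stateless post-commit step: it keeps no operator state and only performs upserts,
// so replaying an already processed committable has no additional effect.
final class PostCommitReporter {
    private final MetadataStore store;

    PostCommitReporter(MetadataStore store) {
        this.store = store;
    }

    void process(CommittedBatch batch) {
        store.upsert(batch.checkpointId, batch.dataFiles);
    }
}
```

An append-based variant (for example adding every received file to a list) would duplicate entries on replay, which is the kind of effect on the external system the Javadoc rules out.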

...

After the change, users of the PublicEvolving TwoPhaseCommittingSink interface will have a new identity map transformation in their execution graph, but otherwise they should not be affected.
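Why existing implementers are unaffected can be sketched with heavily simplified, hypothetical stand-ins: `TransformingSink` plays the role of the three-type interface, and `PlainSink` the two-type TwoPhaseCommittingSink, where write results already are committables so the pre-commit step can default to the identity. The real interfaces work with writers, streams and committers rather than single method calls.

```java
// Hypothetical, simplified stand-in for the three-type sink: one method per stage.
interface TransformingSink<InputT, WriteResultT, CommittableT> {
    // Writer side: turns an input record into a write result.
    WriteResultT write(InputT input);

    // Pre-commit side: turns a write result into a committable.
    CommittableT preCommit(WriteResultT writeResult);
}

// Hypothetical stand-in for the two-type sink: WriteResultT == CommT, so identity suffices.
interface PlainSink<InputT, CommT> extends TransformingSink<InputT, CommT, CommT> {
    @Override
    default CommT preCommit(CommT writeResult) {
        return writeResult;
    }
}

final class PlainSinkDemo {
    private PlainSinkDemo() {}

    // An existing-style sink only supplies the writer logic; the identity step is inherited.
    static final PlainSink<String, Integer> LENGTH_SINK = String::length;

    static Integer runThrough(String input) {
        return LENGTH_SINK.preCommit(LENGTH_SINK.write(input));
    }
}
```

Because the identity step is provided by the specialized interface, the implementer's code does not change; only the graph gains the extra pass-through step.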

...

Sinks implementing the Experimental WithPreCommitTopology or WithPostCommitTopology interfaces will need to update the generic type parameters in their classes, and additionally implement either TwoPhaseCommittingSink or TwoPhaseCommittingSinkWithPreCommitTransformation.

Test Plan

We will demonstrate via the new Iceberg SinkV2 that TwoPhaseCommittingSinkWithPreCommitTransformation works as expected and that the types are propagated properly. The KafkaSink can be used to validate that the original usage of TwoPhaseCommittingSink still works.

Rejected Alternatives

Adding the new methods directly to TwoPhaseCommittingSink

We considered changing the existing TwoPhaseCommittingSink interface instead of introducing a new one, but decided against it since it is already marked as PublicEvolving. Another reason is that users of the current Sink API have not complained about the restrictions in WithPreCommitTopology, which suggests that there are many important use cases where the type transformation is not needed in the Sinks.

WithPreCommitTopology and WithPostCommitTopology to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface

In a previous iteration, the FLIP suggested changing the WithPreCommitTopology and WithPostCommitTopology interfaces to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface. This approach would have worked here, but the result would be awkward for cases where WithPostCommitTopology is needed but WithPreCommitTopology is not. This solution is also inflexible and would be hard to extend if more interfaces are needed in the future.

The previously suggested interfaces:

Code Block
languagejava
titleWithPreCommitTopology
public interface WithPreCommitTopology<InputT, WriteResultT, CommittableT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriteResultT>> committables);
}


Code Block
languagejava
titleWithPostCommitTopology
public interface WithPostCommitTopology<InputT, WriteResult, CommT>
        extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResult, CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}