...

This is a more generic issue: the WithPreCommitTopology should provide a way to transform not only the data, but also the type of the data channelled through it.

Public Interfaces

We need to introduce a new TwoPhaseCommittingSinkWithPreCommitTopology to allow this transformation, because we need to change the types used by the createWriter and the createCommitter methods:

Code Block
languagejava
titleTwoPhaseCommittingSinkWithPreCommitTopology
public interface TwoPhaseCommittingSinkWithPreCommitTopology<InputT, WriterResultT, CommittableT> extends Sink<InputT> {

    /**
     * Creates a {@link PrecommittingSinkWriter} that creates write results on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, WriterResultT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, WriterResultT> extends SinkWriter<InputT> {
        /**
         * Prepares for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the two-phase commit protocol.
         * @throws IOException if fail to prepare for a commit.
         */
        Collection<WriterResultT> prepareCommit() throws IOException, InterruptedException;
    }
} 

We need to make sure that the TwoPhaseCommittingSink interface remains working, as it is already marked PublicEvolving. The result would look like this:

Code Block
languagejava
titleTwoPhaseCommittingSink
public interface TwoPhaseCommittingSink<InputT, CommT> extends TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, CommT, CommT> {}

We plan to change the WithPreCommitTopology and the WithPostCommitTopology to mixin-like interfaces. These are Experimental interfaces, so we can change them to achieve a cleaner API.
They would no longer extend the TwoPhaseCommittingSink interface, as they did previously. They would contain only the methods required for their respective features and needed during the Sink creation. It would be the responsibility of the API user to implement all of the correct interfaces with the correct types.
While previously it was enough to implement the WithPreCommitTopology, a sink would now need to implement both the TwoPhaseCommittingSinkWithPreCommitTransformation and the WithPreCommitTopology interfaces.

Before:

Code Block
languagejava
titleBefore
public class IcebergSink implements
    WithPreCommitTopology<InputT, CommittableT>,
    WithPostCommitTopology<InputT, CommittableT> {[..]}

After:

Code Block
languagejava
titleAfter
public class IcebergSink implements
    TwoPhaseCommittingSinkWithPreCommitTransformation<InputT, WriteResultT, CommittableT>,
    WithPreCommitTopology<WriteResultT, CommittableT>,
    WithPostCommitTopology<CommittableT> {[..]}

Non-matching types could cause runtime issues, but the API will be more flexible, which is more important in the long run.

We also need to change the WithPreCommitTopology and the WithPostCommitTopology respectively like this:

Code Block
languagejava
titleWithPreCommitTopology
public interface WithPreCommitTopology<WriteResultT, CommittableT> {  

    /** Returns the serializer of the write result type. */
    SimpleVersionedSerializer<WriteResultT> getWriteResultSerializer();

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriteResultT>> committables);
}

Code Block
languagejava
titleWithPostCommitTopology
public interface WithPostCommitTopology<CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}

Proposed Changes

Other than the interface changes, we should modify the SinkTransformationTranslator.addCommittingTopology method to make sure that the new parameter types are followed. When we generate the topology for sinks which do not implement the WithPreCommitTopology, we need to add a default transformation which casts WriteResults to Committables. This should not be an issue, since these types should be the same for these sinks anyway.
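The default cast described above can be illustrated with a minimal, Flink-independent sketch. The class `IdentityCastSketch` and its methods are hypothetical and only model the idea with `java.util.function.Function` instead of a real stream transformation. The cast is unchecked, so a type mismatch is not caught by the compiler and only surfaces when the value is used:

```java
import java.util.function.Function;

// Hypothetical sketch; not part of the Flink API.
class IdentityCastSketch {

    // Returns a function that passes each write result through unchanged,
    // only reinterpreting its static type as the committable type.
    @SuppressWarnings("unchecked")
    static <WriteResultT, CommittableT> Function<WriteResultT, CommittableT> identityCast() {
        return writeResult -> (CommittableT) writeResult;
    }

    // Demonstrates that mismatching types are only detected at runtime.
    static boolean mismatchFailsAtRuntime() {
        Function<String, Integer> mismatched = identityCast();
        try {
            // The compiler inserts a cast to Integer here, which fails for a String.
            mismatched.apply("not a number").intValue();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Function<String, String> sameType = identityCast();
        System.out.println(sameType.apply("committable"));
        System.out.println(mismatchFailsAtRuntime());
    }
}
```

For sinks where the write result type and the committable type are the same, the function is a plain pass-through.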

Compatibility, Deprecation, and Migration Plan

After the change, the users of the TwoPhaseCommittingSink PublicEvolving interface will have a new identity transformation in their execution graph, but otherwise they should not be affected.

The Sinks implementing the WithPreCommitTopology or WithPostCommitTopology Experimental interfaces will need to change their code to add the new generic types to their classes, and to implement the TwoPhaseCommittingSink or the TwoPhaseCommittingSinkWithPreCommitTransformation interface.

Test Plan

We will clearly demonstrate via the new Iceberg SinkV2 that the TwoPhaseCommittingSinkWithPreCommitTransformation is working as expected and propagated properly. The KafkaSink could be used to validate that the original usage of the TwoPhaseCommittingSink is also working.

The community discussed this topic from several different angles:

  • During the implementation of FLIP-371 we broke the backward compatibility of the Sink API. See the discussion on FLINK-25857
  • The discussion continued on the mailing list in this thread
  • Also the discussion of this FLIP on the mailing list in this thread

The general consensus was to reject the original approach (minimal changes to the Sink API), and to move forward with rewriting the Sink API using mixin interfaces to enhance the extensibility of the API.

Generic approach

When redesigning the API we followed the generic guidelines below:

  • Every new feature should be added with a Supports<FeatureName> interface (similar to the Source API), like
    • SupportsCommitter
    • SupportsWriterState
    • SupportsPreWriteTopology
    • SupportsPreCommitTopology
    • SupportsPostCommitTopology
  • Keep the number of inner interfaces/classes minimal, and expose them as full-fledged classes
    • Committer - no change here
    • CommitterInitContext
    • StatefulSinkWriter
    • WriterInitContext
  • No redefining interface methods during interface inheritance - it would prevent future deprecation
  • Minimal inheritance extension - for more flexibility in the future. Kept only
    • StatefulSinkWriter, CommittingSinkWriter - which inherit from SinkWriter
    • CommitterInitContext, WriterInitContext - which inherit from InitContext
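The guidelines above can be illustrated with a small sketch of how a mixin-style API might be consumed. All types here (`SimpleSink`, `SupportsCommitterSketch`, `SupportsWriterStateSketch`, `MixinSketch`) are simplified, hypothetical stand-ins and not the real Flink interfaces. The point is only that a sink opts into features by implementing the matching interfaces, and the framework can discover them with `instanceof` checks instead of requiring one combined interface per feature combination:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the real interfaces; not the Flink API.
interface SimpleSink<InputT> {
    String createWriter();
}

interface SupportsCommitterSketch<CommittableT> {
    String createCommitter();
}

interface SupportsWriterStateSketch<WriterStateT> {
    String restoreWriter(List<WriterStateT> recoveredState);
}

// A sink opts into a feature by implementing the matching mixin.
class CommittingOnlySink implements SimpleSink<String>, SupportsCommitterSketch<Long> {
    @Override
    public String createWriter() {
        return "writer";
    }

    @Override
    public String createCommitter() {
        return "committer";
    }
}

class MixinSketch {
    // The framework side discovers the supported features at runtime.
    static List<String> features(SimpleSink<?> sink) {
        List<String> result = new ArrayList<>();
        result.add("writer");
        if (sink instanceof SupportsCommitterSketch) {
            result.add("committer");
        }
        if (sink instanceof SupportsWriterStateSketch) {
            result.add("writer-state");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(features(new CommittingOnlySink()));
    }
}
```

Because the base interface in this sketch has a single abstract method, a sink without extra features can still be written as a lambda.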

Public Interfaces

Feature interfaces

TwoPhaseCommittingSink

The TwoPhaseCommittingSink will be replaced by the SupportsCommitter mixin interface:

Code Block
languagejava
titleSupportsCommitter
public interface SupportsCommitter<CommittableT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}

StatefulSink

The StatefulSink will be replaced by the SupportsWriterState mixin interface:

Code Block
languagejava
titleSupportsWriterState
public interface SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link StatefulSinkWriter} from a recovered state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException;

    /**
     * Any stateful sink needs to provide this state serializer and implement {@link
     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
     * #restoreWriter(WriterInitContext, Collection)} on recovery.
     *
     * @return the serializer of the writer's state type.
     */
    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

    /**
     * A mix-in for {@link SupportsWriterState} that allows users to migrate from a sink with a
     * compatible state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState {
        /**
         * A list of state names of sinks from which the state can be restored. For example, the new
         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
         * drop-in replacement when resuming from a checkpoint/savepoint.
         */
        Collection<String> getCompatibleWriterStateNames();
    }
}

WithPreWriteTopology

The WithPreWriteTopology will be replaced by the SupportsPreWriteTopology mixin interface:

Code Block
languagejava
titleSupportsPreWriteTopology
public interface SupportsPreWriteTopology<InputT> {

    /**
     * Adds an arbitrary topology before the writer. The topology may be used to repartition the
     * data.
     *
     * @param inputDataStream the stream of input records.
     * @return the custom topology before {@link SinkWriter}.
     */
    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}

WithPreCommitTopology

The WithPreCommitTopology will be replaced by the SupportsPreCommitTopology mixin interface. The addPreCommitTopology method is changed so that the input stream and the output stream can contain different types, which is needed for the original goal of this FLIP. We also need a separate serializer for the WriteResultT objects, in case they are rebalanced between the steps. The interface extends SupportsCommitter, as it is not usable without a committer:

Code Block
languagejava
titleSupportsPreCommitTopology
public interface SupportsPreCommitTopology<WriterResultT, CommittableT>
        extends SupportsCommitter<CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriterResultT>> committables);

    /** Returns the serializer of the WriteResult type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}
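
To make the type change concrete, here is a minimal sketch that models a pre-commit step with plain Java collections instead of a DataStream. The classes `FileWriteResult`, `CheckpointCommittable` and `PreCommitSketch` are hypothetical and only illustrate that the element type entering the step can differ from the type leaving it, for example when many per-writer results are aggregated into one committable:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical write result: one data file produced by a single writer subtask.
class FileWriteResult {
    final String path;
    final long recordCount;

    FileWriteResult(String path, long recordCount) {
        this.path = path;
        this.recordCount = recordCount;
    }
}

// Hypothetical committable: everything that has to be committed together.
class CheckpointCommittable {
    final List<String> paths;
    final long totalRecords;

    CheckpointCommittable(List<String> paths, long totalRecords) {
        this.paths = paths;
        this.totalRecords = totalRecords;
    }
}

class PreCommitSketch {
    // Plays the role of addPreCommitTopology: input and output element types differ.
    static CheckpointCommittable aggregate(List<FileWriteResult> writeResults) {
        List<String> paths = new ArrayList<>();
        long total = 0;
        for (FileWriteResult result : writeResults) {
            paths.add(result.path);
            total += result.recordCount;
        }
        return new CheckpointCommittable(paths, total);
    }

    public static void main(String[] args) {
        CheckpointCommittable committable = aggregate(Arrays.asList(
                new FileWriteResult("part-0", 10),
                new FileWriteResult("part-1", 32)));
        System.out.println(committable.paths + " " + committable.totalRecords);
    }
}
```

In a real sink the two types would each need their own serializer, which is why the interface exposes getWriteResultSerializer in addition to getCommittableSerializer.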

WithPostCommitTopology

The WithPostCommitTopology will be replaced by the SupportsPostCommitTopology mixin interface. The interface extends SupportsCommitter, as it is not usable without a committer:

Code Block
languagejava
titleSupportsPostCommitTopology
public interface SupportsPostCommitTopology<CommittableT> extends SupportsCommitter<CommittableT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
}

Extracted interfaces

The following interfaces were inner interfaces in the previous iteration of the Sink API. In the case of PrecommittingSinkWriter, which was an inner interface of TwoPhaseCommittingSink, this prevented the evolution of the TwoPhaseCommittingSink, because the generic types of the two were tightly coupled. Also, extracting these interfaces results in a Sink API whose structure is similar to that of the Source API.

CommitterInitContext

CommitterInitContext was extracted from TwoPhaseCommittingSink:


Code Block
languagejava
titleCommitterInitContext
public interface CommitterInitContext extends InitContext {
    /** @return The metric group this committer belongs to. */
    SinkCommitterMetricGroup metricGroup();
}

CommittingSinkWriter

CommittingSinkWriter was extracted from TwoPhaseCommittingSink, where its original name was PrecommittingSinkWriter. The name was changed so that the coupling between the Committer, CommitterInitContext, CommittingSinkWriter and SupportsCommitter interfaces is more pronounced. We accepted the inheritance here, as it has a clear purpose:


Code Block
languagejava
titleCommittingSinkWriter
public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> {
    /**
     * Prepares for a commit.
     *
     * <p>This method will be called after {@link #flush(boolean)} and before {@link
     * StatefulSinkWriter#snapshotState(long)}.
     *
     * @return The data to commit as the second step of the two-phase commit protocol.
     * @throws IOException if fail to prepare for a commit.
     */
    Collection<CommittableT> prepareCommit() throws IOException, InterruptedException;
}
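
As a rough illustration of the writer side of the two-phase commit protocol, the sketch below stages records and hands them over as a committable when prepareCommit is called. `BufferingWriterSketch` is a hypothetical, simplified class: it does not implement the real interface, uses String for both the input and the committable type, and omits flush(boolean) and close():

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified writer; not the Flink CommittingSinkWriter.
class BufferingWriterSketch {
    private final List<String> buffer = new ArrayList<>();
    private int nextBatchId = 0;

    // Corresponds to write: records are only staged, not made visible.
    void write(String element) {
        buffer.add(element);
    }

    // Corresponds to prepareCommit: hand the staged batch over and start a new one.
    Collection<String> prepareCommit() {
        if (buffer.isEmpty()) {
            return Collections.emptyList();
        }
        String committable = "batch-" + nextBatchId++ + ":" + String.join(",", buffer);
        buffer.clear();
        return Collections.singletonList(committable);
    }

    public static void main(String[] args) {
        BufferingWriterSketch writer = new BufferingWriterSketch();
        writer.write("a");
        writer.write("b");
        System.out.println(writer.prepareCommit());
    }
}
```

Making the staged data permanently visible is the job of the Committer, which receives these committables in the second phase.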

WriterInitContext

WriterInitContext was extracted from StatefulSink:

Code Block
languagejava
titleWriterInitContext
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext {
    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but
     * are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /**
     * Returns the mailbox executor that allows to execute {@link Runnable}s inside the task thread
     * in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons in the same
     * way as records should not be sent to the external system individually. Rather, implementers
     * are expected to batch records and only enqueue a single {@link Runnable} per batch to handle
     * the result.
     */
    MailboxExecutor getMailboxExecutor();

    /**
     * Returns a {@link ProcessingTimeService} that can be used to get the current time and register
     * timers.
     */
    ProcessingTimeService getProcessingTimeService();

    /** @return The metric group this writer belongs to. */
    SinkWriterMetricGroup metricGroup();

    /** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */
    SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Creates a serializer for the type of sink's input. */
    <IN> TypeSerializer<IN> createInputSerializer();

    /**
     * Returns a metadata consumer, the {@link SinkWriter} can publish metadata events of type
     * {@link MetaT} to the consumer.
     *
     * <p>It is recommended to use a separate thread pool to publish the metadata because enqueuing
     * a lot of these messages in the mailbox may lead to a performance decrease. The
     * {@link Consumer#accept} method itself is executed very fast.
     */
    @Experimental
    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}

StatefulSinkWriter

StatefulSinkWriter was extracted from StatefulSink. We accepted the inheritance here, as it has a clear purpose:


Code Block
languagejava
titleStatefulSinkWriter
public interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
    /**
     * @return The writer's state.
     * @throws IOException if fail to snapshot writer's state.
     */
    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}
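
The snapshot and restore cycle can be sketched without any Flink dependency. `PendingRecordsWriterSketch` is a hypothetical class whose state is simply the list of records that were not flushed yet. The constructor plays the role of restoreWriter, and snapshotState returns a copy so that later writes do not change an already taken snapshot:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical writer; not the Flink StatefulSinkWriter.
class PendingRecordsWriterSketch {
    private final List<String> pending = new ArrayList<>();

    // Plays the role of restoreWriter: start from the recovered state.
    PendingRecordsWriterSketch(Collection<String> recoveredState) {
        pending.addAll(recoveredState);
    }

    void write(String element) {
        pending.add(element);
    }

    // Returns a defensive copy of the current state.
    List<String> snapshotState(long checkpointId) {
        return new ArrayList<>(pending);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingRecordsWriterSketch writer =
                new PendingRecordsWriterSketch(Collections.emptyList());
        writer.write("a");
        List<String> snapshot = writer.snapshotState(1L);
        PendingRecordsWriterSketch restored = new PendingRecordsWriterSketch(snapshot);
        System.out.println(restored.pendingCount());
    }
}
```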

Other modified interfaces

Sink

As a result, the parameter type of the createWriter method of the Sink interface should be changed to WriterInitContext:

Code Block
languagejava
titleSink
public interface Sink<InputT> extends Serializable {
    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException;
} 

Proposed Changes

Other than the interface changes we should modify the SinkTransformationTranslator.addCommittingTopology method to make sure that new parameter types are followed.

Compatibility, Deprecation, and Migration Plan

Deprecation

We will deprecate the old interfaces, but provide a default implementation based on the new API to allow Sink developers to migrate to the new API.

PublicEvolving interfaces

Some of the changes are affecting PublicEvolving interfaces:

  • StatefulSink
  • TwoPhaseCommittingSink
  • Sink

These interfaces need to be deprecated and kept for at least one minor release.

The proposed solution is to make the old interfaces extend the new ones.

StatefulSink

StatefulSink will extend the SupportsWriterState interface and provide a default implementation for the old methods/interfaces:

Code Block
languagejava
titleStatefulSink
@Deprecated
public interface StatefulSink<InputT, WriterStateT>
        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)");
    }

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        return restoreWriter(new InitContextWrapper(context), recoveredState);
    }

    /**
     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
     * state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {}

    /**
     * A {@link SinkWriter} whose state needs to be checkpointed.
     *
     * @param <InputT> The type of the sink writer's input
     * @param <WriterStateT> The type of the writer's state
     */
    @PublicEvolving
    interface StatefulSinkWriter<InputT, WriterStateT>
            extends org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
}

TwoPhaseCommittingSink

TwoPhaseCommittingSink will extend the SupportsCommitter interface and provide a default implementation for the old methods/interfaces:

Code Block
languagejava
titleTwoPhaseCommittingSink
@Deprecated
public interface TwoPhaseCommittingSink<InputT, CommT>
        extends Sink<InputT>, SupportsCommitter<CommT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     * @deprecated Please use {@link #createCommitter(CommitterInitContext)}
     */
    @Deprecated
    default Committer<CommT> createCommitter() throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use createCommitter(CommitterInitContext)");
    }

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        return createCommitter();
    }

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends CommittingSinkWriter<InputT, CommT> {}
}
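
The delegation pattern used by the deprecated interfaces can be shown in isolation. In the sketch below all names (`CommitterContextSketch`, `NewCommitterApi`, `LegacyCommitterApi`, `LegacySink`, `MigratedSink`, `DeprecationBridgeSketch`) are hypothetical and String stands in for the Committer type. The framework only calls the new method, and a legacy implementation that overrides only the old method keeps working through the default implementation:

```java
// Hypothetical stand-ins; only the delegation pattern mirrors the proposal.
interface CommitterContextSketch {}

interface NewCommitterApi {
    String createCommitter(CommitterContextSketch context);
}

interface LegacyCommitterApi extends NewCommitterApi {
    // Old entry point: legacy sinks override this method.
    default String createCommitter() {
        throw new UnsupportedOperationException(
                "Deprecated, please use createCommitter(CommitterContextSketch)");
    }

    // New entry point: falls back to the old method unless it is overridden.
    @Override
    default String createCommitter(CommitterContextSketch context) {
        return createCommitter();
    }
}

// Not migrated yet: implements only the old method.
class LegacySink implements LegacyCommitterApi {
    @Override
    public String createCommitter() {
        return "legacy committer";
    }
}

// Already migrated: implements only the new method.
class MigratedSink implements LegacyCommitterApi {
    @Override
    public String createCommitter(CommitterContextSketch context) {
        return "migrated committer";
    }
}

class DeprecationBridgeSketch {
    // The framework side only ever calls the new method.
    static String create(NewCommitterApi sink) {
        return sink.createCommitter(new CommitterContextSketch() {});
    }

    public static void main(String[] args) {
        System.out.println(create(new LegacySink()));
        System.out.println(create(new MigratedSink()));
    }
}
```

A sink that overrides neither method would compile but fail with the UnsupportedOperationException when the committer is created, which is the trade-off of bridging with default methods.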

Sink.createWriter

The deprecation of the Sink.createWriter method will be handled like this:

Code Block
languagejava
titleSink
public interface Sink<InputT> extends Serializable {

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward
     *     compatibility reasons - to keep {@link Sink} a functional interface - Flink did not
     *     provide a default implementation. New {@link Sink} implementations should implement this
     *     method, but it will not be used, and it will be removed in 1.20.0 release. Do not use
     *     {@link Override} annotation when implementing this method, to prevent compilation errors
     *     when migrating to 1.20.x release.
     */
    @Deprecated
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
        return createWriter(new InitContextWrapper(context));
    }

    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
    @PublicEvolving
    @Deprecated
    interface InitContext extends org.apache.flink.api.connector.sink2.InitContext {
[..]
    }

    /**
     * Class for wrapping a new {@link WriterInitContext} to an old {@link InitContext} until
     * deprecation.
     *
     * @deprecated Internal, do not use it.
     */
    @Deprecated
    class InitContextWrapper implements InitContext {
        private final WriterInitContext wrapped;

        InitContextWrapper(WriterInitContext wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int getSubtaskId() {
            return wrapped.getSubtaskId();
        }
[..]
    }
}

Experimental interfaces

Some of the changes are affecting Experimental interfaces:

  • WithPreWriteTopology
  • WithPreCommitTopology
  • WithPostCommitTopology

Based on FLIP-321 we could remove the interfaces in a new minor release, but to provide a more graceful migration period, we can keep the old interfaces by making them extend the new ones.

WithPreWriteTopology

WithPreWriteTopology will extend the SupportsPreWriteTopology interface:

Code Block
languagejava
titleWithPreWriteTopology
@Deprecated
public interface WithPreWriteTopology<InputT>
        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}

WithPreCommitTopology

WithPreCommitTopology will extend the SupportsPreCommitTopology interface, and needs to provide a default implementation for the getWriteResultSerializer method:

Code Block
languagejava
titleWithPreCommitTopology
@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> {
    /** Defaults to {@link #getCommittableSerializer} for backward compatibility */
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

WithPostCommitTopology

WithPostCommitTopology will extend the SupportsPostCommitTopology interface:

Code Block
languagejava
titleWithPostCommitTopology
@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {}

Migration Plan

Migrating to the new interfaces should be straightforward by following these steps:

  • StatefulSink implementations:
    • Sink should implement the SupportsWriterState interface - no method name change, just interface name and import change
    • Writer should implement the new StatefulSinkWriter interface - import change only
  • TwoPhaseCommittingSink
    • Sink should implement the SupportsCommitter interface - no method name change, just interface name and import change
    • Writer should implement the new CommittingSinkWriter interface - no method name change, just interface name and import change
  • Sink
    • If the Sink is used as a functional interface, then no change is needed
    • If the Sink createWriter method is implemented, then the context interface name and import change (InitContext becomes WriterInitContext)
  • WithPreWriteTopology
    • interface name and import change
  • WithPreCommitTopology
    • Implementation for getWriteResultSerializer is needed 
  • WithPostCommitTopology
    • interface name and import change

Test Plan

As a first step, we create our own unit tests, but keep the original tests intact. This will allow us to confirm that the changes are backward compatible. The new unit tests will confirm that the new features are working.

We will clearly demonstrate via the new Iceberg SinkV2 that the SupportsPreCommitTopology is working as expected and propagated properly. The KafkaSink could be used to validate that the original usage of the TwoPhaseCommittingSink is also working.

Rejected Alternatives

Adding a new TwoPhaseCommittingSinkWithPreCommitTopology interface

The approach would have kept the changes minimal and stuck to the current Sink API design. We would have introduced the required new combinations of interfaces (TwoPhaseCommittingSinkWithPreCommitTopology, WithPostCommitTopologyWithPreCommitTopology) without changing the API structure.

The advantages of this approach:

  • Minimal change - smaller rewrite on the connector side
  • Type checks happen at compile time

Based on the feedback of the community, we decided to give up these advantages, considering the following advantages of the mixin approach:

  • Easier to evolve - this will become especially important when this API becomes Public
  • Limits the number of required interfaces - as we do not have to create a specific interface for every combination of the possible features
  • The mixin approach is more similar to the current stable Source API interfaces - so developers will have an easier time onboarding to Flink

The disadvantages of the rejected approach:

  • Harder to evolve
  • The number of interfaces increases with the possible combinations
  • Inconsistent API patterns with respect to the Source API - harder for developers to understand

...

Adding the new methods directly to TwoPhaseCommittingSink

...