
Page properties


Discussion thread: https://lists.apache.org/thread/344pzbrqbbb4w0sfj67km25msp7hxlyd
Vote thread: https://lists.apache.org/thread/zjkss3j9d291ldvynspotzjlsl3tmdkl
JIRA: FLINK-33328 (ASF JIRA)
Release: 1.19.0


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

During the migration of the Iceberg Sink to the new Flink Sink V2 API, we found that the current WithPreCommitTopology interface is not sufficient for our use case. The WithPreCommitTopology interface looks like this:

...

The general consensus was to reject the original approach (minimal changes to the Sink API) and to move forward with rewriting the Sink API using mixin interfaces, to improve the extensibility of the API.

Generic approach

When redesigning the API, we followed the general guidelines below:

  • Every new feature should be added with a Supports<FeatureName> interface (similar to the Source API), like
    • SupportsCommitter
    • SupportsWriterState
    • SupportsPreWriteTopology
    • SupportsPreCommitTopology
    • SupportsPostCommitTopology
  • Keep the number of inner interfaces/classes minimal, and expose them as full-fledged classes
    • Committer - no change here
    • CommitterInitContext
    • StatefulSinkWriter
    • WriterInitContext
  • Do not redefine interface methods during interface inheritance - it would prevent future deprecation
  • Use inheritance minimally - for more flexibility in the future. We kept only
    • StatefulSinkWriter, CommittingSinkWriter - which inherit from SinkWriter
    • CommitterInitContext, WriterInitContext - which inherit from InitContext
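
A minimal sketch of the mixin guideline, assuming simplified stand-in types (MiniSink, MiniSupportsCommitter, MiniSupportsWriterState and FeatureDetector are hypothetical and are not the Flink interfaces): each optional feature is its own interface, and the code that builds the topology discovers the features of a sink with instanceof checks instead of relying on one fixed inheritance chain.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT the real Flink interfaces.
interface MiniSink<InputT> {
    String name();
}

// Each optional feature is a separate mixin interface.
interface MiniSupportsCommitter<CommT> {
    String describeCommitter();
}

interface MiniSupportsWriterState<StateT> {
    String describeState();
}

// A sink opts into exactly the features it needs.
class CommittingOnlySink implements MiniSink<String>, MiniSupportsCommitter<String> {
    public String name() { return "committing-only"; }
    public String describeCommitter() { return "commits staged files"; }
}

class StatefulCommittingSink
        implements MiniSink<String>, MiniSupportsCommitter<String>, MiniSupportsWriterState<Long> {
    public String name() { return "stateful-committing"; }
    public String describeCommitter() { return "commits staged files"; }
    public String describeState() { return "tracks offsets"; }
}

final class FeatureDetector {
    // One instanceof check per feature, similar to how a topology translator could inspect a sink.
    static List<String> features(MiniSink<?> sink) {
        List<String> result = new ArrayList<>();
        if (sink instanceof MiniSupportsWriterState) {
            result.add("writer-state");
        }
        if (sink instanceof MiniSupportsCommitter) {
            result.add("committer");
        }
        return result;
    }
}
```

With this structure, a new feature needs only one new interface, instead of one combined interface per feature combination.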

Public Interfaces

Feature interfaces

TwoPhaseCommittingSink

The TwoPhaseCommittingSink will be replaced by the SupportsCommitter mixin interface:

Code Block (Java): SupportsCommitter
public interface SupportsCommitter<CommittableT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommittableT> createCommitter(CommitterInitContext context) throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommittableT> getCommittableSerializer();
}
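
The two-phase commit flow behind SupportsCommitter can be sketched without Flink on the classpath. The types below (SimpleCommitter, SimpleSupportsCommitter, ToyStore, StagingWriter, StagingSink) are hypothetical, simplified stand-ins: the real createCommitter takes a CommitterInitContext and throws IOException, and the committables are serialized with getCommittableSerializer(), all of which is omitted here.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical, simplified stand-ins; NOT the real Flink interfaces.
interface SimpleCommitter<CommT> {
    void commit(Collection<CommT> committables);
}

interface SimpleSupportsCommitter<CommT> {
    SimpleCommitter<CommT> createCommitter();
}

// Toy external system: records become visible only after a commit.
final class ToyStore {
    final List<String> staged = new ArrayList<>();
    final List<String> visible = new ArrayList<>();
}

// First phase: the writer stages records and hands them out as committables.
final class StagingWriter {
    private final ToyStore store;
    private final List<String> pending = new ArrayList<>();

    StagingWriter(ToyStore store) {
        this.store = store;
    }

    void write(String record) {
        store.staged.add(record);
        pending.add(record);
    }

    Collection<String> prepareCommit() {
        List<String> committables = new ArrayList<>(pending);
        pending.clear();
        return committables;
    }
}

final class StagingSink implements SimpleSupportsCommitter<String> {
    final ToyStore store = new ToyStore();

    StagingWriter createWriter() {
        return new StagingWriter(store);
    }

    // Second phase: the committer makes the staged records visible.
    @Override
    public SimpleCommitter<String> createCommitter() {
        return committables -> {
            store.staged.removeAll(committables);
            store.visible.addAll(committables);
        };
    }
}
```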

StatefulSink

The StatefulSink will be replaced by the SupportsWriterState mixin interface:

Code Block (Java): SupportsWriterState
public interface SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link StatefulSinkWriter} from a recovered state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException;

    /**
     * Any stateful sink needs to provide this state serializer and implement {@link
     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
     * #restoreWriter(WriterInitContext, Collection)} on recovery.
     *
     * @return the serializer of the writer's state type.
     */
    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

    /**
     * A mix-in for {@link SupportsWriterState} that allows users to migrate from a sink with a
     * compatible state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState {
        /**
         * A list of state names of sinks from which the state can be restored. For example, the new
         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
         * drop-in replacement when resuming from a checkpoint/savepoint.
         */
        Collection<String> getCompatibleWriterStateNames();
    }
}
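
A minimal sketch of the snapshot/restore contract, assuming hypothetical stand-ins (MiniStatefulWriter, MiniSupportsWriterState, CountingWriter, CountingSink) in place of the Flink types; the WriterInitContext parameter and the state serializer are left out. Because restoreWriter receives a collection of states, the sketch merges them, which is one possible choice when a writer is handed the states of several previous writers, for example after scaling down.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-ins; the real restoreWriter also takes a WriterInitContext,
// and the state is serialized with getWriterStateSerializer().
interface MiniStatefulWriter<InputT, StateT> {
    void write(InputT element);

    List<StateT> snapshotState(long checkpointId);
}

interface MiniSupportsWriterState<InputT, StateT> {
    MiniStatefulWriter<InputT, StateT> restoreWriter(Collection<StateT> recoveredState);
}

// The writer's state is the number of records it has written.
final class CountingWriter implements MiniStatefulWriter<String, Long> {
    private long count;

    CountingWriter(long initialCount) {
        this.count = initialCount;
    }

    @Override
    public void write(String element) {
        count++;
    }

    @Override
    public List<Long> snapshotState(long checkpointId) {
        return Collections.singletonList(count);
    }
}

final class CountingSink implements MiniSupportsWriterState<String, Long> {
    CountingWriter createWriter() {
        return new CountingWriter(0L);
    }

    // A restored writer may receive several states, so they are merged into one count.
    @Override
    public CountingWriter restoreWriter(Collection<Long> recoveredState) {
        long total = 0;
        for (long c : recoveredState) {
            total += c;
        }
        return new CountingWriter(total);
    }
}
```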

WithPreWriteTopology

The WithPreWriteTopology will be replaced by the SupportsPreWriteTopology mixin interface:

Code Block (Java): SupportsPreWriteTopology
public interface SupportsPreWriteTopology<InputT> {

    /**
     * Adds an arbitrary topology before the writer. The topology may be used to repartition the
     * data.
     *
     * @param inputDataStream the stream of input records.
     * @return the custom topology before {@link SinkWriter}.
     */
    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}
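
As an illustration of repartitioning before the writer, the sketch below hashes a key to pick a target writer subtask. It is hypothetical: PreWriteRepartitioner is an invented name, a List of Lists stands in for the per-subtask input of the writers, and the simple modulo scheme is not how Flink assigns keys to subtasks. In a real implementation, addPreWriteTopology would express this with DataStream operations such as keyBy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: each inner List stands in for the input of one writer subtask.
final class PreWriteRepartitioner {
    static <T, K> List<List<T>> repartition(List<T> input, Function<T, K> keySelector, int parallelism) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new ArrayList<>());
        }
        for (T record : input) {
            // floorMod keeps the index non-negative for negative hash codes.
            int target = Math.floorMod(keySelector.apply(record).hashCode(), parallelism);
            partitions.get(target).add(record);
        }
        return partitions;
    }
}
```

All records with the same key end up in the same partition, so a single writer handles them.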

WithPreCommitTopology

The WithPreCommitTopology will be replaced by the SupportsPreCommitTopology mixin interface. The addPreCommitTopology method is changed so that the input and output streams can contain different types, which is needed for the original goal of this FLIP. We also need a separate serializer for the WriterResultT objects in case they are rebalanced between the steps. The interface extends SupportsCommitter, as it is not usable without a committer:

Code Block (Java): SupportsPreCommitTopology
public interface SupportsPreCommitTopology<WriterResultT, CommittableT>
        extends SupportsCommitter<CommittableT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * need to ensure to modify all {@link CommittableMessage}s appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommittableT>> addPreCommitTopology(
            DataStream<CommittableMessage<WriterResultT>> committables);

    /** Returns the serializer of the WriteResult type. */
    SimpleVersionedSerializer<WriterResultT> getWriteResultSerializer();
}
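
The reason for allowing different input and output types can be sketched with a hypothetical example in which every writer emits a WriteResult (one data file) and the pre-commit step combines all results of a checkpoint into one TableCommit. WriteResult, TableCommit and PreCommitAggregator are illustrative names, and Lists stand in for DataStream<CommittableMessage<...>>. Because WriteResult values travel between the writer and the aggregation step, they need their own serializer, which is what getWriteResultSerializer() provides.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-writer result: one data file written for a checkpoint.
final class WriteResult {
    final String dataFile;
    final long checkpointId;

    WriteResult(String dataFile, long checkpointId) {
        this.dataFile = dataFile;
        this.checkpointId = checkpointId;
    }
}

// Hypothetical committable: one commit covering all files of a checkpoint.
final class TableCommit {
    final long checkpointId;
    final List<String> dataFiles;

    TableCommit(long checkpointId, List<String> dataFiles) {
        this.checkpointId = checkpointId;
        this.dataFiles = dataFiles;
    }
}

final class PreCommitAggregator {
    // Models addPreCommitTopology with Lists instead of DataStreams: the input element type
    // (WriteResult) differs from the output element type (TableCommit).
    static List<TableCommit> addPreCommitTopology(List<WriteResult> writeResults) {
        Map<Long, List<String>> filesByCheckpoint = new LinkedHashMap<>();
        for (WriteResult result : writeResults) {
            filesByCheckpoint
                    .computeIfAbsent(result.checkpointId, id -> new ArrayList<>())
                    .add(result.dataFile);
        }
        List<TableCommit> commits = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : filesByCheckpoint.entrySet()) {
            commits.add(new TableCommit(entry.getKey(), entry.getValue()));
        }
        return commits;
    }
}
```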

WithPostCommitTopology

The WithPostCommitTopology will be replaced by the SupportsPostCommitTopology mixin interface. The interface extends SupportsCommitter, as it is not usable without the committer mixin interface:

Code Block (Java): SupportsPostCommitTopology
public interface SupportsPostCommitTopology<CommittableT> extends SupportsCommitter<CommittableT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It is strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     *
     * @param committables the stream of committables.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommittableT>> committables);
}
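
The idempotency requirement can be illustrated with a hypothetical post-commit step that registers committables in an external catalog. ExternalCatalog and PostCommitStep are invented for this sketch, and a List stands in for the committable stream. The duplicate check is made against the external system itself, so a new operator instance created after recovery also ignores committables that are replayed.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical external system that remembers which committables were already registered.
final class ExternalCatalog {
    private final Set<String> registered = new HashSet<>();
    private int writes = 0;

    boolean contains(String committable) {
        return registered.contains(committable);
    }

    void register(String committable) {
        registered.add(committable);
        writes++;
    }

    int writes() {
        return writes;
    }
}

// Hypothetical post-commit step; a List stands in for the stream of committables.
final class PostCommitStep {
    private final ExternalCatalog catalog;

    PostCommitStep(ExternalCatalog catalog) {
        this.catalog = catalog;
    }

    void process(List<String> committables) {
        for (String committable : committables) {
            // Replayed committables are skipped, so they have no further effect on the catalog.
            if (!catalog.contains(committable)) {
                catalog.register(committable);
            }
        }
    }
}
```

Keeping the step stateless (the only state lives in the external system) also matches the recommendation above that batch and streaming modes should not need special cases.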

Extracted interfaces

The following interfaces were inner interfaces in the previous iteration of the Sink API. In the case of PrecommittingSinkWriter, which was an inner interface of TwoPhaseCommittingSink, this prevented the evolution of TwoPhaseCommittingSink, because the generic types of the two interfaces were tightly coupled. Extracting these interfaces also results in a Sink API structure similar to that of the Source API.

CommitterInitContext

CommitterInitContext was extracted from TwoPhaseCommittingSink:

...

Code Block (Java): CommitterInitContext
public interface CommitterInitContext extends InitContext {
    /** @return The metric group this committer belongs to. */
    SinkCommitterMetricGroup metricGroup();
}

CommittingSinkWriter

CommittingSinkWriter was extracted from TwoPhaseCommittingSink; its original name was PrecommittingSinkWriter. It was renamed so that the coupling between the Committer, CommitterInitContext, CommittingSinkWriter and SupportsCommitter interfaces is more pronounced. We accepted the inheritance here, as it has a clear purpose:

...

Code Block (Java): CommittingSinkWriter
public interface CommittingSinkWriter<InputT, CommittableT> extends SinkWriter<InputT> {
    /**
     * Prepares for a commit.
     *
     * <p>This method will be called after {@link #flush(boolean)} and before {@link
     * StatefulSinkWriter#snapshotState(long)}.
     *
     * @return The data to commit as the second step of the two-phase commit protocol.
     * @throws IOException if preparing the commit fails.
     * @throws InterruptedException if the thread is interrupted while preparing the commit.
     */
    Collection<CommittableT> prepareCommit() throws IOException, InterruptedException;
}
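
The documented call order (flush, then prepareCommit, then snapshotState) can be sketched with a hypothetical driver that records the calls. CheckpointingWriter, RecordingWriter and CheckpointDriver are stand-ins for illustration only; in Flink, the sink writer operator drives these calls, and this sketch does not reproduce its implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a writer that is both committing and stateful.
interface CheckpointingWriter {
    void flush(boolean endOfInput);

    Collection<String> prepareCommit();

    List<Long> snapshotState(long checkpointId);
}

// Records the order in which its methods are called.
final class RecordingWriter implements CheckpointingWriter {
    final List<String> calls = new ArrayList<>();

    @Override
    public void flush(boolean endOfInput) {
        calls.add("flush");
    }

    @Override
    public Collection<String> prepareCommit() {
        calls.add("prepareCommit");
        return Collections.emptyList();
    }

    @Override
    public List<Long> snapshotState(long checkpointId) {
        calls.add("snapshotState");
        return Collections.emptyList();
    }
}

// Hypothetical driver following the documented order; not Flink's operator code.
final class CheckpointDriver {
    static Collection<String> onCheckpoint(CheckpointingWriter writer, long checkpointId, boolean endOfInput) {
        writer.flush(endOfInput);
        Collection<String> committables = writer.prepareCommit();
        writer.snapshotState(checkpointId);
        return committables;
    }
}
```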

WriterInitContext

WriterInitContext was extracted from StatefulSink:

Code Block (Java): WriterInitContext
public interface WriterInitContext extends org.apache.flink.api.connector.sink2.InitContext {
    /**
     * Gets the {@link UserCodeClassLoader} to load classes that are not in system's classpath, but
     * are part of the jar file of a user job.
     *
     * @see UserCodeClassLoader
     */
    UserCodeClassLoader getUserCodeClassLoader();

    /**
     * Returns the mailbox executor that allows executing {@link Runnable}s inside the task thread
     * in between record processing.
     *
     * <p>Note that this method should not be used per-record for performance reasons in the same
     * way as records should not be sent to the external system individually. Rather, implementers
     * are expected to batch records and only enqueue a single {@link Runnable} per batch to handle
     * the result.
     */
    MailboxExecutor getMailboxExecutor();

    /**
     * Returns a {@link ProcessingTimeService} that can be used to get the current time and register
     * timers.
     */
    ProcessingTimeService getProcessingTimeService();

    /** @return The metric group this writer belongs to. */
    SinkWriterMetricGroup metricGroup();

    /** Provides a view on this context as a {@link SerializationSchema.InitializationContext}. */
    SerializationSchema.InitializationContext asSerializationSchemaInitializationContext();

    /** Returns whether object reuse has been enabled or disabled. */
    boolean isObjectReuseEnabled();

    /** Creates a serializer for the type of sink's input. */
    <IN> TypeSerializer<IN> createInputSerializer();

    /**
     * Returns a metadata consumer; the {@link SinkWriter} can publish metadata events of type
     * {@code MetaT} to the consumer.
     *
     * <p>It is recommended to use a separate thread pool to publish the metadata, because enqueuing
     * a lot of these messages in the mailbox may lead to a performance decrease. The {@link
     * Consumer#accept} method is expected to execute very fast.
     */
    @Experimental
    default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
        return Optional.empty();
    }
}
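
A minimal sketch of the batching advice for getMailboxExecutor(), assuming a plain java.util.concurrent.Executor as a stand-in for Flink's MailboxExecutor (whose actual methods differ). BatchingWriter is hypothetical: it buffers records and enqueues one Runnable per batch, rather than one per record, to handle the result in the task thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical writer; the Executor stands in for the mailbox executor.
final class BatchingWriter {
    private final Executor mailbox;
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> acknowledgedBatches = new ArrayList<>();
    int enqueuedRunnables = 0;

    BatchingWriter(Executor mailbox, int batchSize) {
        this.mailbox = mailbox;
        this.batchSize = batchSize;
    }

    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flushBatch();
        }
    }

    void flushBatch() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        // One Runnable per batch (not per record) handles the result.
        enqueuedRunnables++;
        mailbox.execute(() -> acknowledgedBatches.add(batch));
    }
}
```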

StatefulSinkWriter

StatefulSinkWriter was extracted from StatefulSink. We accepted the inheritance here, as it has a clear purpose:

...

Code Block (Java): StatefulSinkWriter
public interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
    /**
     * @param checkpointId the ID of the checkpoint for which the state is snapshotted.
     * @return The writer's state.
     * @throws IOException if snapshotting the writer's state fails.
     */
    List<WriterStateT> snapshotState(long checkpointId) throws IOException;
}

Other modified interfaces

Sink

As a result, the parameter type of the Sink.createWriter method should be changed to WriterInitContext:

Code Block (Java): Sink
public interface Sink<InputT> extends Serializable {
    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException;
} 

Proposed Changes

Apart from the interface changes, the SinkTransformationTranslator.addCommittingTopology method should be modified to handle the new parameter types.

Compatibility, Deprecation, and Migration Plan

Deprecation

We will deprecate the old interfaces, but provide default implementations based on the new API so that Sink developers can migrate to the new API conveniently.

PublicEvolving interfaces

Some of the changes affect PublicEvolving interfaces:

...

The old interfaces need to be deprecated and kept for at least one minor release (until 1.20, according to the plans at the time of writing).

The proposed solution is to make the old interfaces extend the new ones.

StatefulSink

StatefulSink will extend the SupportsWriterState interface and provide a default implementation for the old methods/interfaces:

Code Block (Java): StatefulSink
@Deprecated
public interface StatefulSink<InputT, WriterStateT>
        extends Sink<InputT>, SupportsWriterState<InputT, WriterStateT> {

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            Sink.InitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use restoreWriter(WriterInitContext, Collection<WriterStateT>)");
    }

    /**
     * Create a {@link org.apache.flink.api.connector.sink2.StatefulSinkWriter} from a recovered
     * state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default StatefulSinkWriter<InputT, WriterStateT> restoreWriter(
            WriterInitContext context, Collection<WriterStateT> recoveredState) throws IOException {
        return restoreWriter(new InitContextWrapper(context), recoveredState);
    }

    /**
     * A mix-in for {@link StatefulSink} that allows users to migrate from a sink with a compatible
     * state to this sink.
     */
    @PublicEvolving
    interface WithCompatibleState extends SupportsWriterState.WithCompatibleState {}

    /**
     * A {@link SinkWriter} whose state needs to be checkpointed.
     *
     * @param <InputT> The type of the sink writer's input
     * @param <WriterStateT> The type of the writer's state
     */
    @PublicEvolving
    interface StatefulSinkWriter<InputT, WriterStateT>
            extends org.apache.flink.api.connector.sink2.StatefulSinkWriter<InputT, WriterStateT> {}
}
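
The deprecation bridge can be sketched with simplified, hypothetical types: NewContext, OldContext and OldContextWrapper play the roles of WriterInitContext, Sink.InitContext and InitContextWrapper, NewSupportsWriterState and LegacyStatefulSink play the roles of SupportsWriterState and StatefulSink, and the writer is reduced to a String. An existing implementation that overrides only the old method keeps working when the runtime calls the new method.

```java
import java.util.Collection;

// Hypothetical, simplified stand-ins for the deprecation bridge; NOT the Flink classes.
interface NewContext {
    int subtaskId();
}

interface OldContext {
    int getSubtaskId();
}

// Adapter from the new context type to the old one, in the role of InitContextWrapper.
final class OldContextWrapper implements OldContext {
    private final NewContext wrapped;

    OldContextWrapper(NewContext wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public int getSubtaskId() {
        return wrapped.subtaskId();
    }
}

interface NewSupportsWriterState<StateT> {
    String restoreWriter(NewContext context, Collection<StateT> recoveredState);
}

// Deprecated interface: extends the new one and routes new-style calls to the old method.
interface LegacyStatefulSink<StateT> extends NewSupportsWriterState<StateT> {
    default String restoreWriter(OldContext context, Collection<StateT> recoveredState) {
        throw new UnsupportedOperationException("Deprecated, please use restoreWriter(NewContext, Collection)");
    }

    @Override
    default String restoreWriter(NewContext context, Collection<StateT> recoveredState) {
        return restoreWriter(new OldContextWrapper(context), recoveredState);
    }
}

// An existing implementation that only knows the old method keeps working unchanged.
final class ExistingSink implements LegacyStatefulSink<Long> {
    @Override
    public String restoreWriter(OldContext context, Collection<Long> recoveredState) {
        return "writer for subtask " + context.getSubtaskId() + " with " + recoveredState.size() + " states";
    }
}
```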

TwoPhaseCommittingSink

TwoPhaseCommittingSink will extend the SupportsCommitter interface and provide a default implementation for the old methods/interfaces:

Code Block (Java): TwoPhaseCommittingSink
@Deprecated
public interface TwoPhaseCommittingSink<InputT, CommT>
        extends Sink<InputT>, SupportsCommitter<CommT> {

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     * @deprecated Please use {@link #createCommitter(CommitterInitContext)}
     */
    @Deprecated
    default Committer<CommT> createCommitter() throws IOException {
        throw new UnsupportedOperationException(
                "Deprecated, please use createCommitter(CommitterInitContext)");
    }

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @param context The context information for the committer initialization.
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    default Committer<CommT> createCommitter(CommitterInitContext context) throws IOException {
        return createCommitter();
    }

    /** A {@link SinkWriter} that performs the first part of a two-phase commit protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends CommittingSinkWriter<InputT, CommT> {}
}

Sink.createWriter

The deprecation of the Sink.createWriter method will be handled as follows:

Code Block (Java): Sink
public interface Sink<InputT> extends Serializable {

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     * @deprecated Please implement {@link #createWriter(WriterInitContext)}. For backward
     *     compatibility reasons - to keep {@link Sink} a functional interface - Flink did not
     *     provide a default implementation. New {@link Sink} implementations should implement this
     *     method, but it will not be used, and it will be removed in 1.20.0 release. Do not use
     *     {@link Override} annotation when implementing this method, to prevent compilation errors
     *     when migrating to 1.20.x release.
     */
    @Deprecated
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    default SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
        return createWriter(new InitContextWrapper(context));
    }

    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
    @PublicEvolving
    @Deprecated
    interface InitContext extends org.apache.flink.api.connector.sink2.InitContext {
[..]
    }

    /**
     * Class for wrapping a new {@link WriterInitContext} to an old {@link InitContext} until
     * deprecation.
     *
     * @deprecated Internal, do not use it.
     */
    @Deprecated
    class InitContextWrapper implements InitContext {
        private final WriterInitContext wrapped;

        InitContextWrapper(WriterInitContext wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int getSubtaskId() {
            return wrapped.getSubtaskId();
        }
[..]
    }
}
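
Keeping the deprecated createWriter(InitContext) abstract means that Sink stays a functional interface, so lambda-based sinks keep compiling, while the runtime can call the new overload. A hypothetical sketch of that arrangement follows; LambdaFriendlySink, OldInitContext, NewWriterInitContext, Writer and ContextAdapter are stand-ins, not Flink types.

```java
// Hypothetical, simplified stand-ins; NOT the Flink classes.
interface OldInitContext {
    int getSubtaskId();
}

interface NewWriterInitContext {
    int getSubtaskId();
}

interface Writer<InputT> {
    String describe();
}

interface LambdaFriendlySink<InputT> {
    // Deprecated, but still the single abstract method, so the interface stays functional.
    @Deprecated
    Writer<InputT> createWriter(OldInitContext context);

    // New entry point for the runtime; by default it adapts the new context to the old one.
    default Writer<InputT> createWriter(NewWriterInitContext context) {
        return createWriter(new ContextAdapter(context));
    }

    final class ContextAdapter implements OldInitContext {
        private final NewWriterInitContext wrapped;

        ContextAdapter(NewWriterInitContext wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public int getSubtaskId() {
            return wrapped.getSubtaskId();
        }
    }
}
```

A sink written as a lambda against the old method can then be invoked through the new overload without any change to the lambda.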

Experimental interfaces

Some of the changes affect Experimental interfaces:

...

Based on FLIP-321, we could remove these interfaces in a new minor release, but to provide a more graceful migration period, we keep the old interfaces and make them extend the new ones.

WithPreWriteTopology

WithPreWriteTopology will extend the SupportsPreWriteTopology interface:

Code Block (Java): WithPreWriteTopology
@Deprecated
public interface WithPreWriteTopology<InputT>
        extends Sink<InputT>, SupportsPreWriteTopology<InputT> {}

WithPreCommitTopology

WithPreCommitTopology will extend the SupportsPreCommitTopology interface, and needs to provide a default implementation for the getWriteResultSerializer method:

Code Block (Java): WithPreCommitTopology
@Deprecated
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPreCommitTopology<CommT, CommT> {
    /** Defaults to {@link #getCommittableSerializer} for backward compatibility */
    default SimpleVersionedSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}
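
Because the deprecated WithPreCommitTopology uses the same type for writer results and committables, reusing the committable serializer is a safe default. A sketch with hypothetical stand-ins (MiniSerializer mirrors only the basic shape of SimpleVersionedSerializer; the other names are also invented):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for SimpleVersionedSerializer.
interface MiniSerializer<T> {
    int getVersion();

    byte[] serialize(T obj);

    T deserialize(int version, byte[] serialized);
}

interface MiniSupportsPreCommitTopology<WriterResultT, CommT> {
    MiniSerializer<CommT> getCommittableSerializer();

    MiniSerializer<WriterResultT> getWriteResultSerializer();
}

// Deprecated-style interface: writer result and committable share a type,
// so the committable serializer is reused by default.
interface LegacyWithPreCommitTopology<CommT> extends MiniSupportsPreCommitTopology<CommT, CommT> {
    @Override
    default MiniSerializer<CommT> getWriteResultSerializer() {
        return getCommittableSerializer();
    }
}

final class Utf8StringSerializer implements MiniSerializer<String> {
    public int getVersion() {
        return 1;
    }

    public byte[] serialize(String obj) {
        return obj.getBytes(StandardCharsets.UTF_8);
    }

    public String deserialize(int version, byte[] serialized) {
        return new String(serialized, StandardCharsets.UTF_8);
    }
}

// An existing sink only provides the committable serializer.
final class ExistingPreCommitSink implements LegacyWithPreCommitTopology<String> {
    private final Utf8StringSerializer serializer = new Utf8StringSerializer();

    @Override
    public MiniSerializer<String> getCommittableSerializer() {
        return serializer;
    }
}
```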

WithPostCommitTopology

WithPostCommitTopology will extend the SupportsPostCommitTopology interface:

Code Block (Java): WithPostCommitTopology
@Deprecated
public interface WithPostCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT>, SupportsPostCommitTopology<CommT> {}

Migration Plan

Migrating to the new interfaces should be straightforward with the following steps:

  • StatefulSink implementations
    • Sink should implement the SupportsWriterState interface - no method name change, just interface name and import change
    • Writer should implement the new StatefulSinkWriter interface - import change only
  • TwoPhaseCommittingSink implementations
    • Sink should implement the SupportsCommitter interface - no method name change, just interface name and import change
    • Writer should implement the new CommittingSinkWriter interface - no method name change, just interface name and import change
  • Sink
    • If the Sink is used as a functional interface, then no change is needed
    • If the Sink createWriter method is implemented, then the context interface name (InitContext to WriterInitContext) and import change
  • WithPreWriteTopology
    • Interface name and import change
  • WithPreCommitTopology
    • An implementation of getWriteResultSerializer is needed
  • WithPostCommitTopology
    • Interface name and import change

Test Plan

As a first step, we create new unit tests, but keep the original tests intact. The original tests allow us to confirm that the changes are backward compatible, while the new unit tests confirm that the new features are working.

We will clearly demonstrate via the new Iceberg SinkV2 that SupportsPreCommitTopology works as expected and is propagated properly. The KafkaSink can be used to validate that the original usage of TwoPhaseCommittingSink still works.

Rejected Alternatives

Adding a new TwoPhaseCommittingSinkWithPreCommitTopology interface

This approach would have kept the changes minimal and stuck to the current Sink API design: we would introduce the required new combinations of interfaces (TwoPhaseCommittingSinkWithPreCommitTopology, WithPostCommitTopologyWithPreCommitTopology) without changing the API structure.

...

  • Easier to evolve - this will become especially important when this API becomes Public
  • Limits the number of required interfaces - as we do not have to create a specific interface for each combination of features
  • The mixin approach is more similar to the current stable Source API interfaces - so developers will have an easier time onboarding to Flink

Cons:
  • Harder to evolve
  • The number of interfaces increases with the possible combinations
  • Inconsistent API patterns with respect to the Source API - harder for developers to understand

Adding the new methods directly to TwoPhaseCommittingSink

We considered changing the existing TwoPhaseCommittingSink interface instead of introducing a new one, but decided against it since it is already marked as PublicEvolving. Another reason is that users of the current Sink API have not complained about the restrictions in WithPostCommitTopology, so there are clearly several important use cases where the type transformation is not needed in the Sinks.

WithPreCommitTopology and WithPostCommitTopology to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface

In a previous iteration, the FLIP suggested changing the WithPreCommitTopology and WithPostCommitTopology interfaces to extend the new TwoPhaseCommittingSinkWithPreCommitTransformation interface. This approach would have worked in this case, but the result would be awkward for cases where WithPostCommitTopology is needed but WithPreCommitTopology is not. This solution is also not very flexible, and it is hard to apply if more interfaces are needed in the future.

...