...

Released: <Flink Version>

POC: https://github.com/AHeise/flink/tree/custom_sink_sketch

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

With this document, we mainly focus on the small-file-compaction problem, which is one of the scenarios not yet covered. The problem manifests when using the FileSink and you want to guarantee a certain file size rather than ending up with a lot of small files (i.e. one file per parallel subtask). This becomes very important for modern data lake use cases, where the data is usually written in some columnar format (Parquet or ORC) and larger files are recommended to improve read performance. A few examples are the Iceberg sink [1], Delta Lake [2], or Hive. For Hive in particular we already have a sink that supports compaction, but it is not generally applicable and only available in Flink's Table API [3].

The goal of this document is to extend the unified Sink API to broaden the spectrum of supported scenarios and fix the small-file-compaction problem.

Proposed Changes

After collecting more feedback we came up with the following design, which incorporates the design of a Sink V2. Over the last few releases, we learned from our users that the abstraction of having a writer and a committer to implement a two-phase commit is very helpful, but the GlobalCommitter in its current design cannot fulfill all necessary duties. Unfortunately, there is no easy way to replace/remove the GlobalCommitter from the existing interfaces without breaking already built sinks. Therefore we propose Sink V2, which subsumes the current Sink interface and offers the same functionality.

The Sink V2 uses a mix-in approach to reflect the different setup options. We will create a separate package inside flink-core to distinguish between Sink V1 and Sink V2.

Simple Sink

First, we explain how the Sink V2 subsumes the Sink V1. In general, sinks that do not use the GlobalCommitter should find it very easy to migrate to the new interfaces because all methods of the Committer and SinkWriter still exist and are unchanged.

We reduced the implementation overhead in Sink V2 and split the Sink V1 into three mix-in interfaces providing different functionalities. Sink developers can now decide, based on their requirements, which sink interfaces they need to combine to implement their sinks.


[draw.io diagram: Build simple Sink]

Code Block
languagejava
/**
 * Base interface for developing a sink. A basic {@link Sink} is a stateless sink that can flush
 * data on checkpoint to achieve at-least-once consistency. Sinks with additional requirements
 * should implement {@link StatefulSink} or {@link TwoPhaseCommittingSink}.
 *
 * <p>The {@link Sink} needs to be serializable. All configuration should be validated eagerly. The
 * respective sink writers are transient and will only be created in the subtasks on the
 * taskmanagers.
 *
 * @param <InputT> The type of the sink's input
 */
@PublicEvolving
public interface Sink<InputT> extends Serializable {

    /**
     * Create a {@link SinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    SinkWriter<InputT> createWriter(InitContext context) throws IOException;

    /** The interface exposes some runtime info for creating a {@link SinkWriter}. */
    interface InitContext {
        // ... similar to the Sink V1 InitContext
    }
}


Code Block
languagejava
/**
 * A {@link Sink} with a stateful {@link SinkWriter}.
 *
 * <p>The {@link StatefulSink} needs to be serializable. All configuration should be validated
 * eagerly. The respective sink writers are transient and will only be created in the subtasks on
 * the taskmanagers.
 *
 * @param <InputT> The type of the sink's input
 * @param <WriterStateT> The type of the sink writer's state
 */
@PublicEvolving
public interface StatefulSink<InputT, WriterStateT> extends Sink<InputT> {

    /**
     * Create a {@link StatefulSinkWriter}.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> createWriter(InitContext context) throws IOException;

    /**
     * Create a {@link StatefulSinkWriter} from a recovered state.
     *
     * @param context the runtime context.
     * @return A sink writer.
     * @throws IOException for any failure during creation.
     */
    StatefulSinkWriter<InputT, WriterStateT> recoverWriter(
            InitContext context, Collection<WriterStateT> recoveredState) throws IOException;

    /**
     * Any stateful sink needs to provide this state serializer and implement {@link
     * StatefulSinkWriter#snapshotState(long)} properly. The respective state is used in {@link
     * #recoverWriter(InitContext, Collection)} on recovery.
     *
     * @return the serializer of the writer's state type.
     */
    SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer();

    interface StateCompatible {
        /**
         * A list of state names of sinks from which the state can be restored. For example, the new
         * {@code FileSink} can resume from the state of an old {@code StreamingFileSink} as a
         * drop-in replacement when resuming from a checkpoint/savepoint.
         */
        Collection<String> getCompatibleWriterStateNames();
    }

    /**
     * A {@link SinkWriter} whose state needs to be checkpointed.
     *
     * @param <InputT> The type of the sink writer's input
     * @param <WriterStateT> The type of the writer's state
     */
    @PublicEvolving
    interface StatefulSinkWriter<InputT, WriterStateT> extends SinkWriter<InputT> {
        /**
         * @return The writer's state.
         * @throws IOException if fail to snapshot writer's state.
         */
        Collection<WriterStateT> snapshotState(long checkpointId) throws IOException;
    }
}


Code Block
languagejava
/**
 * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
 * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
 * actually commits the data. To facilitate the separation, the {@link SinkWriter} creates
 * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
 *
 * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
 * validated eagerly. The respective sink writers and committers are transient and will only be
 * created in the subtasks on the taskmanagers.
 *
 * @param <InputT> The type of the sink's input
 * @param <CommT> The type of the committables.
 */
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    /**
     * Create a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommT> getCommittableSerializer();


    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        /**
         * Prepare for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the 2pc protocol.
         * @throws IOException if fail to prepare for a commit.
         */
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}

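To illustrate how the mix-ins compose, the following is a minimal sketch of a sink that is both stateful and two-phase committing, comparable to what the FileSink needs. All names (ExampleSink, ExampleWriter, ExampleCommitter) are hypothetical, serializers and imports are omitted as in the listings above, and the Committer is sketched against the commit(Collection) signature referenced there.

Code Block
languagejava
/** Sketch (hypothetical names): one sink class combines the stateful and two-phase mix-ins. */
public class ExampleSink
        implements StatefulSink<String, Long>, TwoPhaseCommittingSink<String, String> {

    @Override
    public ExampleWriter createWriter(InitContext context) {
        // The single writer type implements both writer mix-ins, satisfying both contracts.
        return new ExampleWriter();
    }

    @Override
    public ExampleWriter recoverWriter(InitContext context, Collection<Long> recoveredState) {
        // Restore buffered data or transaction handles from the recovered state here.
        return new ExampleWriter();
    }

    @Override
    public SimpleVersionedSerializer<Long> getWriterStateSerializer() {
        throw new UnsupportedOperationException("Serializer omitted in this sketch");
    }

    @Override
    public Committer<String> createCommitter() {
        return new ExampleCommitter();
    }

    @Override
    public SimpleVersionedSerializer<String> getCommittableSerializer() {
        throw new UnsupportedOperationException("Serializer omitted in this sketch");
    }

    /** One writer class implements both writer mix-ins. */
    private static class ExampleWriter
            implements StatefulSinkWriter<String, Long>,
                    PrecommittingSinkWriter<String, String> {
        private final List<String> pendingElements = new ArrayList<>();

        @Override
        public void write(String element, Context context) {
            pendingElements.add(element);
        }

        @Override
        public void flush(boolean endOfInput) {}

        @Override
        public Collection<String> prepareCommit() {
            // Hand everything written since the last checkpoint to the committer.
            List<String> committables = new ArrayList<>(pendingElements);
            pendingElements.clear();
            return committables;
        }

        @Override
        public Collection<Long> snapshotState(long checkpointId) {
            // Hypothetical state: the number of still-pending elements.
            return Collections.singletonList((long) pendingElements.size());
        }

        @Override
        public void close() {}
    }

    /** Simplified committer making the pre-committed data visible. */
    private static class ExampleCommitter implements Committer<String> {
        @Override
        public void commit(Collection<String> committables) {
            // E.g. move pre-committed files to their final location.
        }

        @Override
        public void close() {}
    }
}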

Advanced Sink

To overcome the limitations of Sink V1 we offer different hooks to insert custom topologies into the sink. In contrast to the basic Sink, we mark these interfaces as experimental because they introduce a new concept.

[draw.io diagram: Advanced Sink]

We also introduce two common wrappers around the Sink Committables to ease the implementation of the topologies based on committables.


[draw.io diagram: Committable Messages]

In these topologies, the committable will always be part of a CommittableWithLineage, which tracks to which checkpoint it belongs and from which subtask it was sent. The sending subtask can be either a SinkWriter or a Committer, depending on which topology is used (pre- or post-commit). Moreover, we introduce the CommittableSummary that is sent once as the last message of a checkpoint. Downstream operators receiving this message can be sure that the subtask the message came from does not send more messages in this checkpoint.

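For illustration, the two wrappers could look like the following sketch. The exact fields and accessors are assumptions of this example, not the final API; only the semantics described above (lineage per committable, one summary per subtask and checkpoint) are given.

Code Block
languagejava
/** Marker interface for the messages flowing through the committable topologies (sketch). */
public interface CommittableMessage<CommT> {}

/** Wraps a single committable together with its origin (fields assumed). */
class CommittableWithLineage<CommT> implements CommittableMessage<CommT> {
    private final CommT committable;
    private final long checkpointId; // the checkpoint this committable belongs to
    private final int subtaskId; // the SinkWriter or Committer subtask that sent it

    CommittableWithLineage(CommT committable, long checkpointId, int subtaskId) {
        this.committable = committable;
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
    }

    CommT getCommittable() {
        return committable;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    int getSubtaskId() {
        return subtaskId;
    }
}

/** Sent once per subtask as the last message of a checkpoint (fields assumed). */
class CommittableSummary<CommT> implements CommittableMessage<CommT> {
    private final long checkpointId;
    private final int subtaskId;
    // Lets downstream operators verify they received all committables of the checkpoint.
    private final int numberOfCommittables;

    CommittableSummary(long checkpointId, int subtaskId, int numberOfCommittables) {
        this.checkpointId = checkpointId;
        this.subtaskId = subtaskId;
        this.numberOfCommittables = numberOfCommittables;
    }

    int getNumberOfCommittables() {
        return numberOfCommittables;
    }
}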

Code Block
languagejava
/** Allows expert users to implement a custom topology before {@link SinkWriter}. */
@Experimental
public interface WithPreWriteTopology<InputT> extends Sink<InputT> {

    /**
     * Adds an arbitrary topology before the writer. The topology may be used to repartition the
     * data.
     *
     * @param inputDataStream the stream of input records.
     * @return the custom topology before {@link SinkWriter}.
     */
    DataStream<InputT> addPreWriteTopology(DataStream<InputT> inputDataStream);
}

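As a usage sketch (the sink and bucketing logic are hypothetical, reusing the ExampleSink from above; imports omitted), a sink could repartition the input so that all records of one bucket reach the same writer, which already reduces the number of files per checkpoint:

Code Block
languagejava
/** Hypothetical sink that repartitions records by bucket before the writers. */
public class BucketedSink extends ExampleSink implements WithPreWriteTopology<String> {

    @Override
    public DataStream<String> addPreWriteTopology(DataStream<String> inputDataStream) {
        // Route all records of one bucket to the same SinkWriter subtask so that each
        // writer produces fewer, larger files per checkpoint.
        return inputDataStream.keyBy(BucketedSink::extractBucket);
    }

    private static String extractBucket(String record) {
        // Hypothetical bucketing scheme: first two characters of the record.
        return record.substring(0, Math.min(2, record.length()));
    }
}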

Code Block
languagejava
/**
 * Allows expert users to implement a custom topology after {@link SinkWriter} and before {@link
 * Committer}.
 */
@Experimental
public interface WithPreCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT> {

    /**
     * Intercepts and modifies the committables sent on checkpoint or at end of input. Implementers
     * must ensure that all {@link CommittableMessage}s are modified appropriately.
     *
     * @param committables the stream of committables.
     * @return the custom topology before {@link Committer}.
     */
    DataStream<CommittableMessage<CommT>> addPreCommitTopology(
            DataStream<CommittableMessage<CommT>> committables);
}

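This is the hook on which the small-file-compaction scenario builds. The following sketch (the operator logic and all names are hypothetical; a real implementation must also emit an updated CommittableSummary and handle multiple checkpoints) gathers all committables on one subtask so small files can be merged before they reach the Committer:

Code Block
languagejava
/** Hypothetical sketch of small-file compaction as a pre-commit topology. */
public class CompactingSink extends ExampleSink
        implements WithPreCommitTopology<String, String> {

    @Override
    public DataStream<CommittableMessage<String>> addPreCommitTopology(
            DataStream<CommittableMessage<String>> committables) {
        // Funnel all committables to one subtask so files can be merged across subtasks.
        return committables
                .global()
                .flatMap(new CompactingFunction())
                .returns(committables.getType());
    }

    private static class CompactingFunction
            implements FlatMapFunction<CommittableMessage<String>, CommittableMessage<String>> {
        private final List<CommittableWithLineage<String>> buffer = new ArrayList<>();

        @Override
        public void flatMap(
                CommittableMessage<String> message,
                Collector<CommittableMessage<String>> out) {
            if (message instanceof CommittableWithLineage) {
                buffer.add((CommittableWithLineage<String>) message);
                return;
            }
            // A CommittableSummary signals that all committables of this subtask and
            // checkpoint have arrived. A real implementation would merge files below the
            // size threshold here and emit an adjusted summary; this sketch forwards the
            // buffered messages unchanged.
            buffer.forEach(out::collect);
            buffer.clear();
            out.collect(message);
        }
    }
}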

Code Block
languagejava
/** Allows expert users to implement a custom topology after {@link Committer}. */
@Experimental
public interface WithPostCommitTopology<InputT, CommT>
        extends TwoPhaseCommittingSink<InputT, CommT> {

    /**
     * Adds a custom post-commit topology where all committables can be processed.
     *
     * <p>It's strongly recommended to keep this pipeline stateless such that batch and streaming
     * modes do not require special cases.
     *
     * <p>All operations need to be idempotent: on recovery, any number of committables may be
     * replayed that have already been committed. It's mandatory that these committables have no
     * effect on the external system.
     */
    void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables);
}

To replace the GlobalCommitter in existing sinks, we offer a utility that simulates the GlobalCommitter with a post-commit topology.

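A sketch of what such a utility amounts to (all names are hypothetical; the committing logic is elided): the committed committables are funneled to a single subtask that invokes the former GlobalCommitter logic, which must be idempotent as stated above.

Code Block
languagejava
/** Hypothetical sketch: emulating a GlobalCommitter with the post-commit topology. */
public class GlobalCommittingSink extends ExampleSink
        implements WithPostCommitTopology<String, String> {

    @Override
    public void addPostCommitTopology(DataStream<CommittableMessage<String>> committables) {
        committables
                .global() // a single subtask sees all committables, like the old GlobalCommitter
                .flatMap(
                        (CommittableMessage<String> message, Collector<Void> out) -> {
                            // Invoke the former GlobalCommitter logic here. It must be
                            // idempotent: committables may be replayed after recovery.
                        })
                .returns(Types.VOID)
                .name("global-committer");
    }
}
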
Limitations

Since the DataStream class is not part of the flink-core module, all advanced Sink interfaces are part of flink-streaming-java. We mitigate the problem by adding sufficient documentation, e.g. in the doc strings, so that users are aware of the split.

Compatibility, Deprecation, and Migration Plan

As part of this FLIP, we will migrate all sinks that are currently part of the Flink main repository to the Sink V2 interfaces (i.e. Async, Kafka, Elasticsearch, File). We will not break the existing Sink V1 interfaces because they are already used within the community, but we will make them @PublicEvolving and immediately deprecate them. Our plan is to keep the Sink V1 interfaces at least until the next major version (2.0) of Flink.

Rejected Alternatives

Alternative 1: Global Sink Coordinator

...

The global sink coordinator solves the small-file-compaction problem by combining committables (files with sizing information) until a certain threshold is reached. Afterward, the combined committable is forwarded to the committers that do the actual file merging and write the result to the final location.

Proposed Changes

Extend the unified Sink interface by allowing the creation of a sink coordinator, similar to the `SourceEnumerator` [4]. The creation is optional, so the forwarding of committables to the Job Manager will only happen if the connector specifies one. In general, the behaviour of sending the committables does not change (they are sent on preSnapshot), and the sink coordinator needs to write them into the checkpoint.

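For illustration, a minimal sketch of this extension (all names are hypothetical, mirroring the description above):

Code Block
languagejava
/** Hypothetical sketch of the rejected coordinator extension of the unified Sink. */
public interface CoordinatedSink<InputT, CommT> {

    /**
     * Optionally creates a coordinator running on the JobManager. Committables are only
     * forwarded to the JobManager if the connector returns a coordinator here.
     */
    default Optional<SinkCoordinator<CommT>> createSinkCoordinator() {
        return Optional.empty();
    }

    /** Receives the committables of all subtasks on preSnapshot, similar to the SourceEnumerator. */
    interface SinkCoordinator<CommT> extends AutoCloseable {
        /** Combines committables, e.g. until a file-size threshold is reached. */
        void handleCommittables(int subtaskId, Collection<CommT> committables);

        /** The coordinator needs to write pending committables into the checkpoint. */
        byte[] snapshotState(long checkpointId) throws IOException;
    }
}
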
Limitations

Unfortunately, checkpointing the sink coordinator is not easily possible because coordinators are checkpointed before the operators in the pipeline. For sources this works well because the source coordinator is basically the first operator of the pipeline. For sinks, it means that the sink coordinator would snapshot its state before the sink writers, leading to potential data loss. Making the sink coordinator stateful would first require changes to the coordinator framework around how coordinators take checkpoints.

...

The committable aggregators are optional operators between the sink writers and committers. The user can define the key for the exchange to the committable aggregator, the parallelism of the committable aggregator, and the key for the exchange from the committable aggregators to the committers.


Proposed Changes

We propose to extend the unified Sink interface with the following changes. The changes to the unified sink should be backward compatible because only a new method with a default implementation is added to the sink interface.

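For illustration, the backward-compatible extension could be sketched as follows (all names are hypothetical); the default implementation is what keeps existing sinks working unchanged:

Code Block
languagejava
/** Hypothetical sketch of the committable-aggregator extension of the unified Sink. */
public interface AggregatingSink<InputT, CommT> {

    /**
     * Optionally creates an aggregator placed between the sink writers and committers.
     * The default keeps existing sinks unchanged, making the change backward compatible.
     */
    default Optional<CommittableAggregator<CommT>> createCommittableAggregator() {
        return Optional.empty();
    }

    interface CommittableAggregator<CommT> {
        /** Key for the exchange from the writers to the aggregators. */
        KeySelector<CommT, ?> getWriterKeySelector();

        /** Key for the exchange from the aggregators to the committers. */
        KeySelector<CommT, ?> getCommitterKeySelector();

        /** Parallelism of the aggregator operators. */
        int getParallelism();

        /** Combines committables, e.g. files, until a size threshold is reached. */
        Collection<CommT> aggregate(Collection<CommT> committables);
    }
}
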
...

A first example POC can be found here.

Limitations

Although the parallelism and the exchanges between the committable aggregator and the sink writer/committer are configurable, new use cases for the unified sink might require more flexibility. In the future, we may want to add a new operator supporting a new use case. Over time this can lead to a fragmented interface with a lot of different options which may or may not be compatible.

...

By providing the full flexibility of building the topology which is executed as part of the sink it should also be easy to build a topology to support small file compaction.

Proposed Changes

The plan is to directly expose the DataStream coming into the sink by extending the Sink interface to create a custom topology. The default still uses the sink writer topology so as not to break existing sinks.

...

Code Block
languagejava
titleDataStreamSink
 protected DataStreamSink(DataStream<T> inputStream, TopologySink<T, ?, ?> sink) {
        Topology<T> topology = sink.createTopology();
        TopologyTranslator translator = TopologyTranslators.getTranslator(topology.getClass());

        try {
            DataStream<byte[]> committableStream = translator.translate(inputStream, topology);

            // ensures the properties are set to the first transformation...
            transformation = findFirstTransformation(inputStream, committableStream);

            SinkCommitterTransformation<?, ?> sinkCommitterTransformation =
                    new SinkCommitterTransformation<>(
                            committableStream.getTransformation(), sink, "committer");
            inputStream.getExecutionEnvironment().addOperator(sinkCommitterTransformation);
        } catch (Exception exception) {
            throw new RuntimeException(exception);
        }
    }
    
  private static class TopologyTranslators {
        private static final Map<Class<?>, TopologyTranslator> TRANSLATOR_MAP =
                new HashMap<Class<?>, TopologyTranslator>() {
                    {
                        this.put(WriterOnlyTopology.class, new WriterOnlyTopologyTranslator());
                        this.put(DataStreamTopology.class, new DataStreamTopologyTranslator());
                    }
                };

        public static TopologyTranslator getTranslator(Class<?> clazz) {
            return TRANSLATOR_MAP.get(clazz);
        }
    }

Limitations

This approach probably allows implementing all sink scenarios because of its high flexibility. Unfortunately, it is also a step backward with respect to batch and stream unification. If users start implementing stateful topologies, they need to be familiar with both concepts to fully support them.

Another problem is that flink-core currently does not know the DataStream classes, which are in flink-streaming-java. To circumvent this issue we could have the implementation in flink-streaming-java and only provide some base interface in flink-core. This has the downside that it adds unnecessary complexity and might confuse users.



References

[1] https://github.com/apache/iceberg/pull/3213

...