Status

...

Page properties


Discussion thread

...

...

Vote thread: https://

...

...

JIRA: FLINK-25555

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Table of Contents

Motivation

With FLIP-143 we introduced the unified Sink API to make it easier for connector developers to support batch and streaming scenarios. While developing the unified Sink API it was already noted that the unified Sink API might not be flexible enough to support all scenarios from the beginning.

...

We reduced the implementation overhead in Sink V2 and split Sink V1 into three mixin interfaces providing different functionalities. Sink developers can now decide, based on their requirements, which sink interfaces they need to combine to implement their sinks.
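
To make the mixin decomposition concrete, the following is a minimal, hypothetical sink that picks only the base Sink interface (no state, no two-phase commit) and simply prints every record. The class name PrintSink is illustrative, and the import package is an assumption about where the Sink V2 interfaces will live.

Code Block
languagejava
import java.io.IOException;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

/** Hypothetical example: a stateless, at-least-once sink that only uses the base Sink mixin. */
public class PrintSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) throws IOException {
        return new SinkWriter<String>() {
            @Override
            public void write(String element, Context context) {
                // Side effect instead of writing to an external system.
                System.out.println(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // Nothing is buffered, so there is nothing to flush.
            }

            @Override
            public void close() {}
        };
    }
}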

...

Code Block
languagejava
/**
 * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
 * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
 * actually commits the data. To facilitate the separation the {@link SinkWriter} creates
 * <i>committables</i> on checkpoint or end of input and sends them to the {@link Committer}.
 *
 * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
 * validated eagerly. The respective sink writers and committers are transient and will only be
 * created in the subtasks on the taskmanagers.
 *
 * @param <InputT> The type of the sink's input
 * @param <CommT> The type of the committables.
 */
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    /**
     * Create a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommT> getCommittableSerializer();


    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        /**
         * Prepare for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the 2pc protocol.
         * @throws IOException if the commit preparation fails.
         */
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}
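
As an illustration of the writer half of this contract, here is a minimal, hypothetical PrecommittingSinkWriter that buffers records in memory and hands them to the committer as one committable per checkpoint. The class name, the List<String> committable type, and the import package are assumptions for this sketch only; a real writer (e.g. the file sink) would stage data externally instead of in memory.

Code Block
languagejava
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;

/** Hypothetical writer sketch: buffers records and emits them as one committable per checkpoint. */
class BufferingPrecommittingWriter
        implements TwoPhaseCommittingSink.PrecommittingSinkWriter<String, List<String>> {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element, Context context) {
        // Stage the record; nothing becomes visible downstream yet.
        buffer.add(element);
    }

    @Override
    public void flush(boolean endOfInput) {
        // All pending data already lives in the in-memory buffer.
    }

    @Override
    public Collection<List<String>> prepareCommit() {
        // Called after flush() on checkpoint or end of input: hand the staged data to the committer.
        if (buffer.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> committable = new ArrayList<>(buffer);
        buffer.clear();
        return Collections.singletonList(committable);
    }

    @Override
    public void close() {}
}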

...

We envision that certain connectors can be implemented with the following interfaces.

Connector     | Sink | StatefulSink | TwoPhaseCommittingSink | WithPreWriteTopology | WithPreCommitTopology | WithPostCommitTopology
Kafka         | ✓    | ✓            | ✓                      |                      |                       |
File          | ✓    | ✓            | ✓                      |                      |                       |
Elasticsearch | ✓    |              |                        |                      |                       |
Async         | ✓    | ✓            |                        |                      |                       |
Hive          | ✓    | ✓            | ✓                      | (✓)                  | ✓                     |
Iceberg       | ✓    | ✓            | ✓                      | ✓                    |                       | ✓

Hive and Iceberg will leverage small-file compaction, though both connectors implement it differently. Hive will try to do a best-effort compaction before actually making the files visible in the metastore because it does not support updates. The Iceberg connector, on the other hand, writes the files immediately, and the post-commit topology will take care of compacting the already written files and updating the file log after the compaction.

...

As part of this FLIP, we will migrate all sinks that are currently part of the Flink main repository to the Sink V2 interfaces (i.e. Async, Kafka, Elasticsearch, File). We will not break the existing Sink V1 interfaces because they are already used within the community, but we will make them @PublicEvolving and immediately deprecate them. Our plan is to keep the Sink V1 interfaces at least until the next major version (2.0) of Flink.

Rejected Alternatives

Alternative 1 Global Sink Coordinator:

The idea is similar to the SourceCoordinator which was implemented as part of FLIP-27 for the unified Source interface. In contrast to the unified source interface, the sink coordinator is optional and not all sinks must implement it.

The global sink coordinator solves the small-file-compaction problem by combining committables (files with sizing information) until a certain threshold is reached. Afterward, the combined committable is forwarded to the committers, which do the actual file merging and write the result to the final location.
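
A minimal sketch of that coordinator-side bookkeeping follows, assuming hypothetical names (FileCommittable, TARGET_SIZE_BYTES) and a purely size-based threshold; a real coordinator would additionally have to track checkpoints and handle failover.

Code Block
languagejava
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical coordinator-side helper: collects file committables until a size threshold is hit. */
final class CompactingCoordinatorSketch {

    /** Hypothetical committable describing one written file and its size. */
    static final class FileCommittable {
        final String path;
        final long sizeBytes;

        FileCommittable(String path, long sizeBytes) {
            this.path = path;
            this.sizeBytes = sizeBytes;
        }
    }

    private static final long TARGET_SIZE_BYTES = 128L * 1024 * 1024; // aim for ~128 MB output files

    private final List<FileCommittable> pending = new ArrayList<>();
    private long pendingBytes;

    /** Returns a combined batch once enough data has accumulated; the committers then merge the files. */
    Optional<List<FileCommittable>> add(FileCommittable committable) {
        pending.add(committable);
        pendingBytes += committable.sizeBytes;
        if (pendingBytes < TARGET_SIZE_BYTES) {
            return Optional.empty();
        }
        List<FileCommittable> batch = new ArrayList<>(pending);
        pending.clear();
        pendingBytes = 0;
        return Optional.of(batch);
    }
}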

Proposed Changes

Extend the unified Sink interface by allowing the creation of a sink coordinator similar to the `SourceEnumerator` [4]. The creation is optional, so the committables are only forwarded to the Job Manager if the connector specifies a coordinator. In general, the behaviour of sending the committables does not change (they are sent on preSnapshot), and the sink coordinator needs to write them into the checkpoint.

Limitations

Unfortunately, checkpointing the sink coordinator is not easily possible because coordinators are checkpointed before the operators in the pipeline. In the case of the sources, this works well because the source coordinator is basically the first operator of the pipeline. For sinks, it means that the sink coordinator would snapshot its state before the sink writers, leading to potential data loss. Making the sink coordinator stateful would first require changes to the coordinator framework around how coordinators are checkpointed.

Another limitation is that committables are sent to the sink coordinator via RPC calls, which restricts the payload size. If the committables contain actual data, this approach will probably no longer be possible.

Alternative 2 Committable Aggregator Operator:

This approach is comparable to the current implementation in Flink’s Table API to solve the small-file-compaction problem [5] but we make it more extensible.

The committable aggregators are optional operators between the sink writers and committers. The user can define the key for the exchange to the committable aggregator, the parallelism of the committable aggregator, and the key for the exchange from the committable aggregators to the committers.


Proposed Changes

We propose to extend the unified Sink interface with the following changes. The changes to the unified sink should be backward compatible because only a new method with a default implementation is added to the sink interface.
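
To illustrate why this stays backward compatible, here is a hypothetical sketch: because the new hook has a default implementation, existing sinks keep compiling unchanged. The method name createCommittableAggregator and the CommittableAggregator type are assumptions for this sketch, not the proposed API.

Code Block
languagejava
import java.util.List;
import java.util.Optional;

/** Hypothetical aggregator hook: combines committables before they reach the committers. */
interface CommittableAggregator<CommT> {
    List<CommT> aggregate(List<CommT> committables);
}

/** Hypothetical extension of the sink interface; existing implementations are unaffected. */
interface AggregatingSink<InputT, CommT> {

    /** Default: no aggregator, committables flow directly from the writers to the committers. */
    default Optional<CommittableAggregator<CommT>> createCommittableAggregator() {
        return Optional.empty();
    }
}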

...

A first example POC can be found here.

Limitations

Although the parallelism and the exchanges between the committable aggregator and the sink writer/committer are configurable, new use cases for the unified sink might require more flexibility. In the future, we may want to add a new operator supporting a new use case. Over time this can lead to a fragmented interface with a lot of different options which may or may not be compatible.

Alternative 3 Custom sink topology:

In general, the most important part of a sink is how the commit mechanism works, because data which is already committed is usually hard to revert and may lead to problems on the consumer side. The custom sink topology isolates the committer and global committer as the only components which the Sink offers; the former sink writers become only syntactic sugar. Connector developers can implement their own custom topologies which generate committables that are passed to the committers.

...

By providing the full flexibility of building the topology which is executed as part of the sink it should also be easy to build a topology to support small file compaction.

Proposed Changes

The plan is to directly expose the DataStream entering the sink by extending the Sink interface with the ability to create a custom topology. The default still uses the sink writer topology so as not to break existing sinks.

...

Code Block
languagejava
titleDataStreamSink.java
    // Translates the sink-provided topology into a committable stream and appends the committer.
    protected DataStreamSink(DataStream<T> inputStream, TopologySink<T, ?, ?> sink) {
        Topology<T> topology = sink.createTopology();
        TopologyTranslator translator = TopologyTranslators.getTranslator(topology.getClass());

        try {
            DataStream<byte[]> committableStream = translator.translate(inputStream, topology);

            // ensures the properties are set to the first transformation...
            transformation = findFirstTransformation(inputStream, committableStream);

            SinkCommitterTransformation<?, ?> sinkCommitterTransformation =
                    new SinkCommitterTransformation<>(
                            committableStream.getTransformation(), sink, "committer");
            inputStream.getExecutionEnvironment().addOperator(sinkCommitterTransformation);
        } catch (Exception exception) {
            throw new RuntimeException(exception);
        }
    }
    
    /** Registry mapping topology implementations to their DataStream translators. */
    private static class TopologyTranslators {
        private static final Map<Class<?>, TopologyTranslator> TRANSLATOR_MAP =
                new HashMap<Class<?>, TopologyTranslator>() {
                    {
                        this.put(WriterOnlyTopology.class, new WriterOnlyTopologyTranslator());
                        this.put(DataStreamTopology.class, new DataStreamTopologyTranslator());
                    }
                };

        public static TopologyTranslator getTranslator(Class<?> clazz) {
            return TRANSLATOR_MAP.get(clazz);
        }
    }

Limitations

This approach probably allows implementing all sink scenarios because of its high flexibility. Unfortunately, it is also a step back regarding batch and stream unification. If users start implementing stateful topologies, they need to be familiar with both concepts to fully support them.

Another problem is that flink-core currently does not know the DataStream classes, which live in flink-streaming-java. To circumvent this issue we could keep the implementation in flink-streaming-java and only provide some base interface in flink-core. The downside is that this adds unnecessary complexity and might confuse users.

...

[1] https://github.com/apache/iceberg/pull/3213

...