Status

...

Page properties


Discussion thread

...

...

Vote threadhttps://

...

...

9jq2fbo3r9lrm9y7cvgyw4h64vqxvjy7
JIRA: FLINK-25555

Release: 1.15


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In Sink V2 we reduced the implementation overhead and split Sink V1 into three mixin interfaces that provide different functionalities. Sink developers can now decide, based on their requirements, which sink interfaces they need to combine to implement their sinks.

...

/**
 * A {@link Sink} for exactly-once semantics using a two-phase commit protocol. The {@link Sink}
 * consists of a {@link SinkWriter} that performs the precommits and a {@link Committer} that
 * actually commits the data. To facilitate the separation, the {@link SinkWriter} creates
 * <i>committables</i> on checkpoint or end of input and then sends them to the {@link Committer}.
 *
 * <p>The {@link TwoPhaseCommittingSink} needs to be serializable. All configuration should be
 * validated eagerly. The respective sink writers and committers are transient and will only be
 * created in the subtasks on the taskmanagers.
 *
 * @param <InputT> The type of the sink's input
 * @param <CommT> The type of the committables.
 */
@PublicEvolving
public interface TwoPhaseCommittingSink<InputT, CommT> extends Sink<InputT> {

    /**
     * Create a {@link PrecommittingSinkWriter} that creates committables on checkpoint or end of
     * input.
     *
     * @param context the runtime context.
     * @return A sink writer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    PrecommittingSinkWriter<InputT, CommT> createWriter(InitContext context) throws IOException;

    /**
     * Creates a {@link Committer} that permanently makes the previously written data visible
     * through {@link Committer#commit(Collection)}.
     *
     * @return A committer for the two-phase commit protocol.
     * @throws IOException for any failure during creation.
     */
    Committer<CommT> createCommitter() throws IOException;

    /** Returns the serializer of the committable type. */
    SimpleVersionedSerializer<CommT> getCommittableSerializer();


    /** A {@link SinkWriter} that performs the first part of a 2pc protocol. */
    @PublicEvolving
    interface PrecommittingSinkWriter<InputT, CommT> extends SinkWriter<InputT> {
        /**
         * Prepare for a commit.
         *
         * <p>This method will be called after {@link #flush(boolean)} and before {@link
         * StatefulSinkWriter#snapshotState(long)}.
         *
         * @return The data to commit as the second step of the 2pc protocol.
         * @throws IOException if preparing the commit fails.
         */
        Collection<CommT> prepareCommit() throws IOException, InterruptedException;
    }
}
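
To make the call order described in the Javadoc above concrete, the following is a minimal sketch of one checkpoint cycle: write, flush, prepareCommit, commit. It deliberately does not use the Flink classes. `PrecommittingWriter`, `BufferingWriter`, and `VisibleStoreCommitter` are hypothetical, simplified stand-ins introduced only for illustration, and the sketch ignores serialization, state snapshots, and the wait for checkpoint completion.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Toy model of the two-phase commit flow; simplified stand-ins, NOT the real Flink API. */
public class TwoPhaseCommitSketch {

    /** Stand-in for SinkWriter combined with PrecommittingSinkWriter (assumption). */
    interface PrecommittingWriter<InputT, CommT> {
        void write(InputT element) throws IOException;

        void flush(boolean endOfInput) throws IOException;

        Collection<CommT> prepareCommit() throws IOException;
    }

    /** Stand-in for Committer (assumption). */
    interface Committer<CommT> {
        void commit(Collection<CommT> committables) throws IOException;
    }

    /** Hypothetical writer: buffers elements and hands them over as one batch per checkpoint. */
    static class BufferingWriter implements PrecommittingWriter<String, List<String>> {
        private final List<String> pending = new ArrayList<>();
        private final List<String> flushed = new ArrayList<>();

        @Override
        public void write(String element) {
            pending.add(element);
        }

        @Override
        public void flush(boolean endOfInput) {
            // Pre-commit step: move buffered elements to the "written but not visible" area.
            flushed.addAll(pending);
            pending.clear();
        }

        @Override
        public Collection<List<String>> prepareCommit() {
            // Emit everything flushed so far as a single committable.
            List<List<String>> committables = new ArrayList<>();
            if (!flushed.isEmpty()) {
                committables.add(new ArrayList<>(flushed));
                flushed.clear();
            }
            return committables;
        }
    }

    /** Hypothetical committer: makes committed batches visible in an in-memory list. */
    static class VisibleStoreCommitter implements Committer<List<String>> {
        final List<String> visible = new ArrayList<>();

        @Override
        public void commit(Collection<List<String>> committables) {
            for (List<String> batch : committables) {
                visible.addAll(batch);
            }
        }
    }

    public static void main(String[] args) {
        BufferingWriter writer = new BufferingWriter();
        VisibleStoreCommitter committer = new VisibleStoreCommitter();

        writer.write("a");
        writer.write("b");
        writer.flush(false);                                              // called first
        Collection<List<String>> committables = writer.prepareCommit();  // called after flush
        System.out.println("visible before commit: " + committer.visible);
        committer.commit(committables);                                   // second phase
        System.out.println("visible after commit: " + committer.visible);
    }
}
```

The point of the split is that nothing becomes visible until the committer runs, so a failure between `prepareCommit` and `commit` leaves only uncommitted data behind.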

...

We envision that certain connectors can be implemented with the following interfaces.

| Connector | Sink | StatefulSink | TwoPhaseCommittingSink | WithPreWriteTopology | WithPreCommitTopology | WithPostCommitTopology |
|---|---|---|---|---|---|---|
| Kafka | ✓ | ✓ | ✓ | | | |
| File | ✓ | ✓ | ✓ | | | |
| Elasticsearch | ✓ | | | | | |
| Async | ✓ | ✓ | | | | |
| Hive | ✓ | ✓ | ✓ | (✓) | ✓ | |
| Iceberg | ✓ | ✓ | ✓ | ✓ | | ✓ |

Hive and Iceberg will both leverage small-file compaction, although the two connectors implement it differently. Hive will perform a best-effort compaction before making the files visible in the metastore because it does not support updates. The Iceberg connector, on the other hand, writes the files immediately, and the post-commit topology takes care of compacting the already written files and updating the file log after the compaction.
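
The difference between the two compaction placements can be illustrated with a small toy model. It is only a sketch under stated assumptions: files are plain strings, `compact`, `preCommitStyle`, and `postCommitStyle` are hypothetical helpers that do not exist in Flink, Hive, or Iceberg, and each method returns the sequence of states a reader of the table could observe.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of compaction before vs. after the commit; not connector code. */
public class CompactionPlacementSketch {

    /** Hypothetical compaction: merges all given small files into one file name. */
    static List<String> compact(List<String> files) {
        if (files.isEmpty()) {
            return List.of();
        }
        return List.of("compacted(" + String.join("+", files) + ")");
    }

    /**
     * Hive-style placement: compaction runs between writer and committer, so only the
     * compacted files are ever committed and become visible.
     */
    static List<List<String>> preCommitStyle(List<String> writtenFiles) {
        List<List<String>> visibleStates = new ArrayList<>();
        List<String> compacted = compact(writtenFiles); // pre-commit step
        visibleStates.add(compacted);                   // commit makes the result visible
        return visibleStates;
    }

    /**
     * Iceberg-style placement: files are committed immediately, and a later step compacts
     * them and updates the file log, which yields a second visible state.
     */
    static List<List<String>> postCommitStyle(List<String> writtenFiles) {
        List<List<String>> visibleStates = new ArrayList<>();
        visibleStates.add(new ArrayList<>(writtenFiles)); // commit of the small files
        visibleStates.add(compact(writtenFiles));         // post-commit rewrite of the file log
        return visibleStates;
    }

    public static void main(String[] args) {
        List<String> files = List.of("f1", "f2");
        System.out.println("pre-commit style:  " + preCommitStyle(files));
        System.out.println("post-commit style: " + postCommitStyle(files));
    }
}
```

In the pre-commit variant readers never see the small files, which suits a target that cannot update committed data. In the post-commit variant the data is visible earlier, at the cost of a second update to the table.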

...