Status

...

Discussion thread

...

...

...

JIRA: FLINK-19510

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The document has three parts. The first part describes the semantics that the unified sink API should support. Based on those semantics, the second part proposes a new unified sink API. The last part introduces two open questions related to the API.

...

In summary, the sink API should be responsible for producing What to commit and providing How to commit. Flink should be responsible for guaranteeing the exact semantics. Flink could “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.
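The split of responsibilities summarized above can be sketched in a few lines. This is a minimal, self-contained illustration and not Flink code: `StagingWriter`, `RenamingCommitter`, and `MiniRuntime` are hypothetical names, and `MiniRuntime` only stands in for whatever the framework does internally. The sink side says what to commit and how; the stand-in runtime chooses when, assuming a commit per checkpoint in streaming mode and a single commit at the end of input in batch mode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink side: decides WHAT to commit (names of staged "files").
class StagingWriter {
    private final List<String> staged = new ArrayList<>();
    private int counter = 0;

    void write(String record) {
        // Each record is staged as its own "file" to keep the sketch small.
        staged.add("part-" + (counter++) + ":" + record);
    }

    // Hands over everything staged so far and forgets it.
    List<String> prepareCommit() {
        List<String> ready = new ArrayList<>(staged);
        staged.clear();
        return ready;
    }
}

// Hypothetical sink side: decides HOW to commit (here: make the files visible).
class RenamingCommitter {
    final List<String> visible = new ArrayList<>();
    int commitCalls = 0;

    void commit(List<String> committables) {
        commitCalls++;
        visible.addAll(committables);
    }
}

// Hypothetical stand-in for the framework: decides WHEN to commit.
class MiniRuntime {
    static RenamingCommitter run(List<String> input, boolean streaming, int checkpointEvery) {
        StagingWriter writer = new StagingWriter();
        RenamingCommitter committer = new RenamingCommitter();
        int seen = 0;
        for (String record : input) {
            writer.write(record);
            seen++;
            // Assumption: streaming mode commits at every simulated checkpoint.
            if (streaming && seen % checkpointEvery == 0) {
                committer.commit(writer.prepareCommit());
            }
        }
        // End of input: commit whatever has not been committed yet.
        List<String> rest = writer.prepareCommit();
        if (!rest.isEmpty()) {
            committer.commit(rest);
        }
        return committer;
    }
}
```

Running the same input through `MiniRuntime.run` in both modes makes the same files visible; only the number of commit calls differs, which is the sense in which the timing is transparent to the sink code.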

...

Sink API


Code Block
languagejava
themeConfluence
firstline0
titleSink
linenumberstrue
/**
 * This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once
 * semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}.
 * 1. The {@link SinkWriter} is responsible for producing the committable.
 * 2. The {@link Committer} is responsible for committing a single committable.
 * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global
 *    committable. The {@link GlobalCommitter} is always executed with a parallelism of 1.
 * Note: Developers need to ensure the idempotence of {@link Committer} and {@link GlobalCommitter}.
 *
 * @param <InputT>        The type of the sink's input
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <WriterStateT>  The type of the sink writer's state
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

	/**
	 * Create a {@link SinkWriter}.
	 *
	 * @param context the runtime context.
	 * @param states the writer's state.
	 *
	 * @return A sink writer.
	 *
	 * @throws IOException if creating the writer fails.
	 */
	SinkWriter<InputT, CommT, WriterStateT> createWriter(
			InitContext context,
			List<WriterStateT> states) throws IOException;

	/**
	 * Creates a {@link Committer}.
	 *
	 * @return A committer.
	 *
	 * @throws IOException if creating the committer fails.
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * Creates a {@link GlobalCommitter}.
	 *
	 * @return A global committer.
	 *
	 * @throws IOException if creating the global committer fails.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/**
	 * Returns the serializer of the committable type.
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * Returns the serializer of the aggregated committable type.
	 */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/**
	 * Returns the serializer of the writer's state type.
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/**
	 * The interface exposes some runtime info for creating a {@link SinkWriter}.
	 */
	interface InitContext {

		/**
		 * Returns a {@link ProcessingTimeService} that can be used to
		 * get the current time and register timers.
		 */
		ProcessingTimeService getProcessingTimeService();

		/**
		 * @return The id of the subtask where the writer runs.
		 */
		int getSubtaskId();

		/**
		 * @return The metric group this writer belongs to.
		 */
		MetricGroup metricGroup();
	}

	/**
	 * A service that allows to get the current processing time and register timers that
	 * will execute the given {@link ProcessingTimeCallback} when firing.
	 */
	interface ProcessingTimeService {

		/** Returns the current processing time. */
		long getCurrentProcessingTime();

		/**
		 * Invokes the given callback at the given timestamp.
		 *
		 * @param time Time at which the callback is invoked
		 * @param processingTimerCallback The callback to be invoked.
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * A callback that can be registered via {@link #registerProcessingTimer(long,
		 * ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * This method is invoked with the time for which the callback was registered.
			 *
			 * @param time The time this callback was registered for.
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}
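One plausible use of the processing-time service exposed through `InitContext` is time-based rolling inside a writer. The sketch below is a self-contained illustration under stated assumptions: `TimerCallbackSketch` and `TimeServiceSketch` are local mirrors of the proposed callback and service, redeclared so the example compiles without Flink, while `ManualTimeService` (a hand-driven clock) and `RollingBuffer` are hypothetical helpers that are not part of the proposal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local mirror of the proposed ProcessingTimeCallback.
interface TimerCallbackSketch {
    void onProcessingTime(long time) throws IOException;
}

// Local mirror of the proposed ProcessingTimeService.
interface TimeServiceSketch {
    long getCurrentProcessingTime();
    void registerProcessingTimer(long time, TimerCallbackSketch callback);
}

// Hypothetical manual clock: time only moves when advanceTo is called.
class ManualTimeService implements TimeServiceSketch {
    private long now = 0L;
    private final TreeMap<Long, List<TimerCallbackSketch>> timers = new TreeMap<>();

    @Override
    public long getCurrentProcessingTime() { return now; }

    @Override
    public void registerProcessingTimer(long time, TimerCallbackSketch callback) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(callback);
    }

    // Fires every timer due at or before `time`, in timestamp order.
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<TimerCallbackSketch>> due = timers.pollFirstEntry();
            now = Math.max(now, due.getKey());
            for (TimerCallbackSketch callback : due.getValue()) {
                try {
                    callback.onProcessingTime(due.getKey());
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
        now = Math.max(now, time);
    }
}

// Hypothetical writer fragment: rolls its in-progress records every `interval` ms.
class RollingBuffer {
    final List<List<String>> rolled = new ArrayList<>();
    private List<String> inProgress = new ArrayList<>();
    private final TimeServiceSketch timeService;
    private final long interval;

    RollingBuffer(TimeServiceSketch timeService, long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.timeService = timeService;
        this.interval = interval;
        scheduleNext();
    }

    void add(String record) { inProgress.add(record); }

    private void scheduleNext() {
        timeService.registerProcessingTimer(
                timeService.getCurrentProcessingTime() + interval, this::onTimer);
    }

    // Timer callback: close the current batch if it has data, then re-arm the timer.
    private void onTimer(long time) {
        if (!inProgress.isEmpty()) {
            rolled.add(inProgress);
            inProgress = new ArrayList<>();
        }
        scheduleNext();
    }
}
```

Driving the clock by hand (`advanceTo`) keeps the rolling logic deterministic in tests; a real runtime would supply its own service through `InitContext`.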



Code Block
languagejava
themeConfluence
firstline0
titleWriter
linenumberstrue
/**
 * The {@code SinkWriter} is responsible for writing data and handling any potential tmp area used to write yet un-staged
 * data, e.g. in-progress files. The data (or metadata pointing to where the actual data is staged) ready to commit is
 * returned to the system by the {@link #prepareCommit(boolean)}.
 *
 * @param <InputT>         The type of the sink writer's input
 * @param <CommT>          The type of information needed to commit data staged by the sink
 * @param <WriterStateT>   The type of the writer's state
 */
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

	/**
	 * Add an element to the writer.
	 *
	 * @param element The input record
	 * @param context The additional information about the input record
	 *
	 * @throws IOException if fail to add an element.
	 */
	void write(InputT element, Context context) throws IOException;

	/**
	 * Prepare for a commit.
	 *
	 * <p>This will be called before we checkpoint the Writer's state in Streaming execution mode.
	 *
	 * @param flush Whether to flush the un-staged data
	 * @return The data that is ready to commit.
	 *
	 * @throws IOException if fail to prepare for a commit.
	 */
	List<CommT> prepareCommit(boolean flush) throws IOException;

	/**
	 * @return The writer's state.
	 *
	 * @throws IOException if fail to snapshot writer's state.
	 */
	List<WriterStateT> snapshotState() throws IOException;


	/**
	 * Context that {@link #write} can use for getting additional data about an input record.
	 */
	interface Context {

		/**
		 * Returns the current event-time watermark.
		 */
		long currentWatermark();

		/**
		 * Returns the timestamp of the current input record or {@code null} if the element does not
		 * have an assigned timestamp.
		 */
		Long timestamp();
	}
}
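The interplay of `prepareCommit(boolean)` and `snapshotState()` can be sketched as follows. This is a simplified, self-contained illustration, not an implementation of `SinkWriter`: `BatchingWriterSketch` is a hypothetical class, records are plain strings, a committable is a full batch of records, and the writer state is the not-yet-staged remainder. It also assumes that `flush = true` is what the runtime passes when no more input will arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer: a committable is a batch of `batchSize` records.
class BatchingWriterSketch {
    private final int batchSize;
    private List<String> inProgress = new ArrayList<>();
    private final List<List<String>> staged = new ArrayList<>();

    // `restoredStates` plays the role of the List<WriterStateT> given to createWriter.
    BatchingWriterSketch(int batchSize, List<List<String>> restoredStates) {
        this.batchSize = batchSize;
        for (List<String> state : restoredStates) {
            inProgress.addAll(state);
        }
    }

    void write(String element) {
        inProgress.add(element);
        if (inProgress.size() >= batchSize) {
            stageInProgress();
        }
    }

    // Returns the staged batches; with flush = true the partial batch is staged first.
    List<List<String>> prepareCommit(boolean flush) {
        if (flush && !inProgress.isEmpty()) {
            stageInProgress();
        }
        List<List<String>> ready = new ArrayList<>(staged);
        staged.clear();
        return ready;
    }

    // Called after prepareCommit: only the un-staged remainder is writer state.
    List<List<String>> snapshotState() {
        List<List<String>> state = new ArrayList<>();
        state.add(new ArrayList<>(inProgress));
        return state;
    }

    private void stageInProgress() {
        staged.add(inProgress);
        inProgress = new ArrayList<>();
    }
}
```

Because staged batches leave the writer through `prepareCommit`, the snapshot only has to carry the in-progress records, and a restored writer continues the partial batch where the previous one stopped.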


Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
 * The {@code Committer} is responsible for committing the data staged by the sink.
 *
 * @param <CommT> The type of information needed to commit the staged data
 */
public interface Committer<CommT> extends AutoCloseable {

	/**
	 * Commit the given list of {@link CommT}.
	 * @param committables A list of information needed to commit data staged by the sink.
	 * @return A list of {@link CommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<CommT> commit(List<CommT> committables) throws IOException;
}
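The "commit-with-retry" pattern mentioned in the `commit` contract can be sketched as a loop that feeds the returned list back into the committer. The example is self-contained and hedged: `CommitterSketch` is a local mirror of the `commit` signature, `FlakyCommitter` is a hypothetical committer whose target rejects some committables once, and `RetryDriver` with its attempt limit is an assumption about how a caller might drive retries, not a description of the Flink runtime.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Local mirror of the proposed commit signature.
interface CommitterSketch<CommT> {
    List<CommT> commit(List<CommT> committables) throws IOException;
}

// Hypothetical committer: committables starting with "flaky" fail on their first attempt.
class FlakyCommitter implements CommitterSketch<String> {
    final Set<String> committed = new HashSet<>();
    private final Set<String> failedOnce = new HashSet<>();

    @Override
    public List<String> commit(List<String> committables) {
        List<String> retry = new ArrayList<>();
        for (String c : committables) {
            if (committed.contains(c)) {
                continue; // idempotence: committing twice has no further effect
            }
            if (c.startsWith("flaky") && failedOnce.add(c)) {
                retry.add(c); // first attempt fails, ask the caller to retry
            } else {
                committed.add(c);
            }
        }
        return retry;
    }
}

// Hypothetical driver: re-submits whatever the committer hands back.
class RetryDriver {
    // Returns the number of commit calls that were needed.
    static <T> int commitWithRetry(CommitterSketch<T> committer, List<T> committables, int maxAttempts) {
        List<T> pending = committables;
        int attempts = 0;
        while (!pending.isEmpty()) {
            if (attempts == maxAttempts) {
                throw new IllegalStateException("gave up with " + pending.size() + " pending");
            }
            try {
                pending = committer.commit(pending);
            } catch (IOException e) {
                // Per the contract, an IOException means: do not retry any more.
                throw new UncheckedIOException(e);
            }
            attempts++;
        }
        return attempts;
    }
}
```

Submitting the same committables a second time succeeds in one call and changes nothing, which is the idempotence the `Sink` documentation asks committer developers to ensure.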



Code Block
languagejava
themeConfluence
titleGlobalCommitter
linenumberstrue
/**
 * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
 * which we call global committable (see {@link #combine}).
 *
 * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
 *
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

	/**
	 * Find out which global committables need to be retried when recovering from a failure.
	 * @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
	 *                              which ones were successfully committed and which ones were not.
	 *
	 * @return A list of {@link GlobalCommT} that should be committed again.
	 *
	 * @throws IOException if filtering the recovered committables fails.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Compute an aggregated committable from a list of committables.
	 * @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
	 *
	 * @return an aggregated committable
	 *
	 * @throws IOException if combining the given committables fails.
	 */
	GlobalCommT combine(List<CommT> committables) throws IOException;

	/**
	 * Commit the given list of {@link GlobalCommT}.
	 *
	 * @param globalCommittables a list of {@link GlobalCommT}.
	 *
	 * @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 *
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Signals that there is no committable any more.
	 *
	 * @throws IOException if handling this notification fails.
	 */
	void endOfInput() throws IOException;
}
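How `combine`, `commit`, and `filterRecoveredCommittables` might fit together can be sketched with an in-memory stand-in for the external system. Everything here is hypothetical and chosen for illustration only: `Manifest` is the global committable, `InMemoryTable` is the external system, and `ManifestGlobalCommitter` mirrors the three method shapes without implementing the real interface. The sketch assumes the external system can tell whether a manifest id was already committed, which is what makes recovery filtering possible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical global committable: a manifest listing the files of one commit.
class Manifest {
    final String id;
    final List<String> files;

    Manifest(String id, List<String> files) {
        this.id = id;
        this.files = files;
    }
}

// Hypothetical external system: records committed manifests by id.
class InMemoryTable {
    final Map<String, List<String>> snapshots = new LinkedHashMap<>();
}

class ManifestGlobalCommitter {
    private final InMemoryTable table;

    ManifestGlobalCommitter(InMemoryTable table) {
        this.table = table;
    }

    // After a failure: keep only the manifests the table has not seen yet.
    List<Manifest> filterRecoveredCommittables(List<Manifest> recovered) {
        List<Manifest> toRetry = new ArrayList<>();
        for (Manifest m : recovered) {
            if (!table.snapshots.containsKey(m.id)) {
                toRetry.add(m);
            }
        }
        return toRetry;
    }

    // Aggregates per-writer committables (file names) into one manifest.
    // The id is derived from the sorted content so that it is deterministic.
    Manifest combine(List<String> committables) {
        List<String> sorted = new ArrayList<>(committables);
        Collections.sort(sorted);
        return new Manifest(String.join("+", sorted), sorted);
    }

    // Idempotent commit: a manifest that is already present is left untouched.
    List<Manifest> commit(List<Manifest> manifests) {
        for (Manifest m : manifests) {
            table.snapshots.putIfAbsent(m.id, m.files);
        }
        return new ArrayList<>(); // nothing needs to be re-committed in this sketch
    }
}
```

Since this component runs with a parallelism of 1, it sees the committables of all writers, which is why a single manifest per commit is possible in the sketch.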


Open Questions

There are still two open questions related to the unified sink API

...