Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

JIRA:

...

ASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-19510

Release1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
themeConfluence
firstline0
titleSink
linenumberstrue
/**
 * This interface lets the sink developer to build a simple sink topology pattern, which could satisfiesguarantee the HDFS/S3/Iceberg sink.exactly once
 * Thissemantics sink topology includes one {@link Writer} + onein both batch and stream execution mode if there is a {@link Committer} + oneor {@link GlobalCommitter}.
 * 1. The {@link WriterSinkWriter} is responsible for producing the committable.
 * 2. The {@link Committer} is responsible for committing a single committable.
 * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we calledcall the global committable.
 * And the parallelism ofcommittable. theThe {@link GlobalCommitter} is always executed with a parallelism of 1.
 * BothNote: theDevelopers {@link need to ensure the idempotence of {@link Committer} and the {@link GlobalCommitter} are optional..
 *
 * @param <InputT>        The type of the sink's input
 * @param <CommT>         The type of information theneeded to committablecommit data staged by the sink
 * @param <GlobalCommT><WriterStateT>  The type of the sink aggregatedwriter's committablestate
 * @param <GlobalCommT>  <WriterStateT> The type of the writer'saggregated statecommittable
 */
public interface Sink<InputT, CommT, GlobalCommTWriterStateT, GlobalCommT> WriterStateT>extends {Serializable {

	/**
	 * Create a {@link WriterSinkWriter}.
	 *
	 * @param context the runtime context.
	 * @param states the previous writerswriter's state.
	 *
	 * @return A sink writer.
	 *
	 * @throws IOException if fail to create a writer.
	 */
	Writer<InputTSinkWriter<InputT, CommT, WriterStateT> createWriter(
			InitContext context, 
			List<WriterStateT> states) throws IOException;

	/**
	 * @returnCreates a {@link Committer}.
	 */
	Optional<Committer<CommT>> createCommitter() * @return A committer.
	 *
	 * @throws IOException if fail to create a committer.
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * @returnCreates a {@link GlobalCommitter}.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();

	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	interface InitContext {

		int getSubtaskId();

		MetricGroup metricGroup();
	}
}
Code Block
languagejava
themeConfluence
firstline0
titleWriter
linenumberstrue
/**
 * The interface is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
 * As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
 *
 * @param <InputT>       The type of the writer's input
 * @param <CommT>        The type of the committable data
 * @param <WriterStateT> The type of the writer's state
 */
interface Writer<InputT, CommT, WriterStateT> { * @return A global committer.
	 *
	 * @throws IOException if fail to create a global committer.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/**
	 * Returns the serializer of the committable type.
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * Returns the serializer of the aggregated committable type.
	 */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/**
	 * Return the serializer of the writer's state type.
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/**
	 * The interface exposes some runtime info for creating a {@link SinkWriter}.
	 */
	interface InitContext {

		/**
		 * Returns a {@link ProcessingTimeService} that can be used to
		 * get the current time and register timers.
		 */
		ProcessingTimeService getProcessingTimeService();

		/**
		 * @return The id of task where the writer is.
		 */
		int getSubtaskId();

		/**
		 * @return The metric group this writer belongs to.
		 */
		MetricGroup metricGroup();
	}

	/**
	 * AddA service anthat elementallows to get the writer.
	 * @param element The input recordcurrent processing time and register timers that
	 * @paramwill ctxexecute Thethe additionalgiven information{@link aboutProcessingTimeCallback} the input recordwhen firing.
	 */
	voidinterface write(InputT element, Context ctx);ProcessingTimeService {

		/**
	 *Returns Preparethe forcurrent aprocessing committime.
	 * @param flush  whether flushing the un-staged committable or not
	 */
	List<CommT> prepareCommit(boolean flush/
		long getCurrentProcessingTime();

		/**
		 * @returnInvokes the writer's state.
 given callback at the given timestamp.
		 */
	List<WriterStateT> snapshotState();

	interface Context {

		long currentProcessingTime();

		long currentWatermark();

		Long timestamp();
	}
}
Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
 * Commits data to the external system.
 *
 * @param <CommT> type of the committable data
 */
public interface Committer<CommT> {

	/**
	 * Commits a single committable.
	 *
	 * @param committable the committable to commit
	 * @throws Exception declared by this signature; the failure conditions are left to implementations
	 */
	void commit(CommT committable) throws Exception;
}
		 * @param time Time when the callback is invoked at
		 * @param processingTimerCallback The callback to be invoked.
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * A callback that can be registered via {@link #registerProcessingTimer(long,
		 * ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * This method is invoked with the time which the callback register for.
			 *
			 * @param time The time this callback was registered for.
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}



Code Block
languagejava
themeConfluence
firstline0
titleWriter
linenumberstrue
/**
 * Writes incoming records for a sink and manages whatever temporary area holds data that has not
 * been staged yet (for example, in-progress files). Once data, or metadata pointing to where the
 * actual data is staged, is ready to be committed, it is handed back to the system through
 * {@link #prepareCommit(boolean)}.
 *
 * @param <InputT>       type of the records this sink writer accepts
 * @param <CommT>        type of the information needed to commit data staged by the sink
 * @param <WriterStateT> type of the writer's state
 */
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

	/**
	 * Adds one element to this writer.
	 *
	 * @param element the input record
	 * @param context additional information about the input record
	 * @throws IOException if the element cannot be added
	 */
	void write(InputT element, Context context) throws IOException;

	/**
	 * Prepares for a commit.
	 *
	 * <p>In streaming execution mode this is called before the writer's state is checkpointed.
	 *
	 * @param flush whether the un-staged data should be flushed
	 * @return the data that is ready to be committed
	 * @throws IOException if preparing for the commit fails
	 */
	List<CommT> prepareCommit(boolean flush) throws IOException;

	/**
	 * Snapshots the state of this writer.
	 *
	 * @return the writer's state
	 * @throws IOException if the writer's state cannot be snapshotted
	 */
	List<WriterStateT> snapshotState() throws IOException;


	/**
	 * Per-record context handed to {@link #write}, giving access to additional data about the
	 * input record.
	 */
	interface Context {

		/**
		 * @return the current event-time watermark
		 */
		long currentWatermark();

		/**
		 * @return the timestamp of the current input record, or {@code null} when the element has
		 *         no assigned timestamp
		 */
		Long timestamp();
	}
}


Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
 * Commits the data that the sink has staged.
 *
 * @param <CommT> type of the information needed to commit the staged data
 */
public interface Committer<CommT> extends AutoCloseable {

	/**
	 * Commits every {@code CommT} in the given list.
	 *
	 * @param committables the information needed to commit data staged by the sink
	 * @return the {@code CommT} instances that need to be committed again, which supports
	 *         implementing a "commit-with-retry" pattern
	 * @throws IOException if the commit operation fails and should not be retried any more
	 */
	List<CommT> commit(List<CommT> committables) throws IOException;
}



Code Block
languagejava
themeConfluence
titleGlobalCommitter
linenumberstrue
/**
 * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
 * which we call global committable (see {@link #combine}).
 *
 * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
 *
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

	/**
	 * Find out which global committables need to be retried when recovering from the failure.
	 * @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
	 *                              which ones were successfully committed and which ones did not.
	 *
	 * @return A list of {@link GlobalCommT} that should be committed again.
	 *
	 * @throws IOException if fail to filter the recovered committables.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Compute an aggregated committable from a list of committables.
	 * @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
	 *
	 * @return an aggregated committable
	 *
	 * @throws IOException if fail to combine the given committables.
	 */
	GlobalCommT combine(List<CommT> committables) throws IOException;

	/**
	 * Commit the given list of {@link GlobalCommT}.
	 *
	 * @param globalCommittables a list of {@link GlobalCommT}.
	 *
	 * @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 *
	 * @throws IOException if the commit operation fail and do not want to retry any more.
	 */
	List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException
Code Block
languagejava
themeConfluence
titleGlobalCommitter
/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committables.
 *
 * @param <CommT>   The type of the committable data
 * @param <GlobalCommT>  The type of the aggregated committable
 */
interface GlobalCommitter<CommT, GlobalCommT> {

	/**
	 * This method is called when restoring from a failover.
	 * @param globalCommittables the global committable that are not committed in the previous session.
	 * @return the global committables that should be committed again in the current session.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);

	/**
	 * Compute an aggregated committable from a collection of committables.
	 * @param committables a collection of committables that are needed to combine
	 * @return an aggregated committable
	 */
	GlobalCommT combine(List<CommT> committables);

	CommitResult commit(GlobalCommT globalCommittable);

	/**
	 * Signals that Therethere is no committable any more.
	 *
	 * @throws IOException if fail to handle this notification.
	 */
	void endOfInput();

	enum CommitResult {
		SUCCESS, FAILURE, RETRY
	}throws IOException;
}


Open Questions

There are still two open questions related to the unified sink API:

...