Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

...

Jira
FLINK-19510 (server: ASF JIRA; serverId: 5aa69414-a9e9-3523-82ec-879b028fb15b)

...

Release
1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
themeConfluence
firstline0
titleSink
linenumberstrue
/**
 * Describes a sink as a small topology of pluggable parts. As long as a {@link Committer} or a
 * {@link GlobalCommitter} is provided, the topology is meant to be able to guarantee exactly-once
 * semantics in both batch and streaming execution mode.
 *
 * <ol>
 *   <li>The {@link SinkWriter} produces committables.</li>
 *   <li>The {@link Committer} commits one committable at a time.</li>
 *   <li>The {@link GlobalCommitter} commits an aggregated committable (the "global committable");
 *       it always runs with a parallelism of 1.</li>
 * </ol>
 *
 * <p>Note: implementers must make their {@link Committer} and {@link GlobalCommitter} idempotent.
 *
 * @param <InputT>       type of the records the sink consumes
 * @param <CommT>        type of the information needed to commit data staged by the sink
 * @param <WriterStateT> type of the sink writer's state
 * @param <GlobalCommT>  type of the aggregated committable
 */
@Experimental
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

	/**
	 * Builds the {@link SinkWriter} that consumes this sink's input.
	 *
	 * @param context runtime information available while the writer is created
	 * @param states  writer states to initialize the writer with
	 *                (NOTE(review): presumably empty on a fresh start — confirm)
	 * @return the new sink writer
	 * @throws IOException if the writer cannot be created
	 */
	SinkWriter<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states)
			throws IOException;

	/**
	 * Builds the {@link Committer} for this sink, if it has one.
	 *
	 * @return the committer wrapped in an {@link Optional}; may be empty
	 * @throws IOException if the committer cannot be created
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * Builds the {@link GlobalCommitter} for this sink, if it has one.
	 *
	 * @return the global committer wrapped in an {@link Optional}; may be empty
	 * @throws IOException if the global committer cannot be created
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/** Returns the serializer for {@code CommT} committables, if one is provided. */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/** Returns the serializer for aggregated {@code GlobalCommT} committables, if one is provided. */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/** Returns the serializer for {@code WriterStateT} writer states, if one is provided. */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/** Runtime information exposed while building a {@link SinkWriter}. */
	interface InitContext {

		/** Returns a service for reading the current processing time and registering timers. */
		ProcessingTimeService getProcessingTimeService();

		/** Returns the id of the subtask that hosts the writer. */
		int getSubtaskId();

		/** Returns the metric group that the writer belongs to. */
		MetricGroup metricGroup();
	}

	/**
	 * Gives access to the current processing time and lets callers register timers that run a
	 * {@link ProcessingTimeService.ProcessingTimeCallback} when they fire.
	 */
	interface ProcessingTimeService {

		/** Returns the current processing time. */
		long getCurrentProcessingTime();

		/**
		 * Schedules {@code processingTimerCallback} to be invoked at {@code time}.
		 *
		 * @param time                    the time at which the callback is invoked
		 * @param processingTimerCallback the callback to invoke
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * Callback that can be registered through
		 * {@link #registerProcessingTimer(long, ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * Called when the timer this callback was registered with fires.
			 *
			 * @param time the time the callback was registered for
			 * @throws IOException if handling the timer fails
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}

...

Code Block
languagejava
themeConfluence
firstline0
titleWriter
linenumberstrue
/**
 * The {@code SinkWriter} is responsible for writing data and handling any potential tmp area used to write
 * yet un-staged data, e.g. in-progress files. The data (or metadata pointing to where the actual data is
 * staged) that is ready to commit is returned to the system by {@link #prepareCommit(boolean)}.
 *
 * @param <InputT>       The type of the sink writer's input
 * @param <CommT>        The type of information needed to commit data staged by the sink
 * @param <WriterStateT> The type of the writer's state
 */
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

	/**
	 * Adds an element to the writer.
	 *
	 * @param element The input record
	 * @param context The additional information about the input record
	 * @throws IOException if adding the element fails.
	 */
	void write(InputT element, Context context) throws IOException;

	/**
	 * Prepares for a commit.
	 *
	 * <p>This will be called before we checkpoint the writer's state in streaming execution mode.
	 *
	 * @param flush Whether to flush the un-staged data or not
	 * @return The data that is ready to commit.
	 * @throws IOException if preparing for the commit fails.
	 */
	List<CommT> prepareCommit(boolean flush) throws IOException;

	/**
	 * Snapshots the writer's current state.
	 *
	 * @return The writer's state.
	 * @throws IOException if snapshotting the writer's state fails.
	 */
	List<WriterStateT> snapshotState() throws IOException;

	/**
	 * Context that {@link #write} can use for getting additional data about an input record.
	 */
	interface Context {

		/**
		 * Returns the current event-time watermark.
		 */
		long currentWatermark();

		/**
		 * Returns the timestamp of the current input record or {@code null} if the element does not
		 * have an assigned timestamp.
		 */
		Long timestamp();
	}
}


Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
 * The {@code Committer} is responsible for committing the data staged by the sink.
 *
 * @param <CommT> The type of information needed to commit the staged data
 */
public interface Committer<CommT> extends AutoCloseable {

	/**
	 * Commits the given list of {@code CommT} committables.
	 *
	 * @param committables A list of information needed to commit data staged by the sink.
	 * @return The {@code CommT} committables that need to be committed again; this supports a
	 *         "commit-with-retry" pattern.
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<CommT> commit(List<CommT> committables) throws IOException;
}



Code Block
languagejava
themeConfluence
titleGlobalCommitter
linenumberstrue
/**
 * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
 * which we call global committable (see {@link #combine}).
 *
 * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
 *
 * @param <CommT>       The type of information needed to commit data staged by the sink
 * @param <GlobalCommT> The type of the aggregated committable
 */
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

	/**
	 * Finds out which global committables need to be retried when recovering from a failure.
	 *
	 * @param globalCommittables A list of {@code GlobalCommT} for which we want to verify
	 *                           which ones were successfully committed and which ones were not.
	 * @return A list of {@code GlobalCommT} that should be committed again.
	 * @throws IOException if filtering the recovered committables fails.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Computes an aggregated committable from a list of committables.
	 *
	 * @param committables A list of {@code CommT} to be combined into a {@code GlobalCommT}.
	 * @return An aggregated committable.
	 * @throws IOException if combining the given committables fails.
	 */
	GlobalCommT combine(List<CommT> committables) throws IOException;

	/**
	 * Commits the given list of {@code GlobalCommT}.
	 *
	 * @param globalCommittables A list of {@code GlobalCommT}.
	 * @return A list of {@code GlobalCommT} that need to be committed again; this supports a
	 *         "commit-with-retry" pattern.
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Signals that no more committables will arrive.
	 *
	 * @throws IOException if handling this notification fails.
	 */
	void endOfInput() throws IOException;
}


Open Questions

There are still two open questions related to the unified sink API:

...