Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

...

Page properties


Discussion thread

...

...

...

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-19510

...

Release1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
 * This interface knows how to commitThe {@code Committer} is responsible for committing the data tostaged by the external systemsink.
 *
 
* @param <CommT> The type of information needed to commit the committablestaged data.
 */
public interface Committer<CommT> extends AutoCloseable {

	void commit(CommT committable) throws Exception;
}
Code Block
languagejava
themeConfluence
titleGlobalCommitter
/**
 * The/**
	 * Commit the given list of {@link GlobalCommitterCommT}.
	 is* responsible@param forcommittables committingA anlist aggregatedof committable,information whichneeded weto calledcommit globaldata staged by the sink.
	 * @return A list of {@link CommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 * @throws IOException if the commit operation fail and do not want to retry any more.
	 */
	List<CommT> commit(List<CommT> committables) throws IOException;
}



Code Block
languagejava
themeConfluence
titleGlobalCommitter
linenumberstrue
/**
 * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
 * which we call global committable (see {@link #combine}).
 *
 * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
 *
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

	/**
	 * Find out which global committables need to be retried when recovering from the failure.
	 * @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
	 *                              which ones were successfully committed and which ones did not.
	 *
	 * @return A list of {@link GlobalCommT} that should be committed again.
	 *
	 * @throws IOException if fail to filter the recovered committables.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Compute an aggregated committable from a list of committables.
	 * @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
	 *
	 * @return an aggregated committable
	 *
	 * @throws IOException if fail to combine the given committables.
	 */
	GlobalCommT combine(List<CommT> committables) throws IOException;

	/**
	 * Commit the given list of {@link GlobalCommT}.
	 *
	 * @param globalCommittables a list of {@link GlobalCommT}.
	 *
	 * @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 *
	 * @throws IOException if the commit operation fail and do not want to retry any more.
	 */
	List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOExceptioncommittables.
 *
 * @param <CommT>   The type of the committable data
 * @param <GlobalCommT>  The type of the aggregated committable
 */
interface GlobalCommitter<CommT, GlobalCommT> {

	/**
	 * This method is called when restoring from a failover.
	 * @param globalCommittables the global committable that are not committed in the previous session.
	 * @return the global committables that should be committed again in the current session.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);

	/**
	 * Compute an aggregated committable from a collection of committables.
	 * @param committables a collection of committables that are needed to combine
	 * @return an aggregated committable
	 */
	GlobalCommT combine(List<CommT> committables);

	CommitResult commit(GlobalCommT globalCommittable);

	/**
	 * Signals Therethat there is no committable any more.
	 *
	 * @throws IOException if fail to handle this notification.
	 */
	void endOfInput();

	enum CommitResult {
		SUCCESS, FAILURE, RETRY
	}throws IOException;
}


Open Questions

There are still two open questions related to the unified sink API

...