
Status

...

Discussion thread

...

...

...

JIRA: FLINK-19510

Release: 1.12


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We hope these things (SDK/execution mode) are transparent to the sink API.

The document includes three parts. The first part describes the semantics the unified sink API should support. Based on the first part, the second part proposes two versions of a new unified sink API. The last part introduces two open questions related to the API.

Semantics

This section describes which semantics the unified sink API should support. It has two parts. The first part describes how the sink API supports exactly-once semantics. The second part argues that drain and snapshot are two general semantics, valid for both bounded and unbounded scenarios.

Consistency

This section works out which parts of ensuring consistency the sink developer is responsible for and which parts Flink is responsible for. From this we can derive which semantics the sink API needs in order to support exactly-once semantics.

...

In summary, the sink API should be responsible for producing What to commit and providing How to commit. Flink should be responsible for guaranteeing exactly-once semantics. Flink could “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.
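To make this split of responsibilities concrete, here is a minimal Java sketch. `SimpleWriter`, `SimpleCommitter`, the `Mode` enum and the `ToyRuntime` driver are hypothetical simplifications, not the Flink API. The sink code supplies What (`prepareCommit`) and How (`commit`), while the toy runtime alone decides When: it commits at every simulated checkpoint in streaming mode and only once at the end of input in batch mode.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed Writer/Committer (not the actual Flink API).
interface SimpleWriter<InputT, CommT> {
    void write(InputT element);

    // WHAT to commit: the committables that are ready; flush = also emit un-staged data.
    List<CommT> prepareCommit(boolean flush);
}

interface SimpleCommitter<CommT> {
    // HOW to commit: publish the committables to the external system.
    void commit(List<CommT> committables);
}

// Hypothetical execution modes used only by this toy runtime.
enum Mode { STREAMING, BATCH }

// A toy "runtime" that decides WHEN to commit; the sink code is the same in both modes.
final class ToyRuntime {
    static <InputT, CommT> void run(Mode mode, List<InputT> input, int checkpointInterval,
                                    SimpleWriter<InputT, CommT> writer, SimpleCommitter<CommT> committer) {
        int processed = 0;
        for (InputT element : input) {
            writer.write(element);
            processed++;
            // Streaming: commit whatever is ready at every simulated checkpoint.
            if (mode == Mode.STREAMING && processed % checkpointInterval == 0) {
                committer.commit(writer.prepareCommit(false));
            }
        }
        // End of input in either mode: drain the writer and commit the rest.
        committer.commit(writer.prepareCommit(true));
    }
}

// Toy writer: every buffered record is considered ready at prepareCommit time.
final class BufferingWriter implements SimpleWriter<String, String> {
    private final StringBuilder buffer = new StringBuilder();

    @Override
    public void write(String element) {
        buffer.append(element);
    }

    @Override
    public List<String> prepareCommit(boolean flush) {
        List<String> ready = new ArrayList<>();
        if (buffer.length() > 0) {
            ready.add(buffer.toString());
            buffer.setLength(0);
        }
        return ready;
    }
}

// Toy committer that records each non-empty commit call.
final class RecordingCommitter implements SimpleCommitter<String> {
    final List<List<String>> batches = new ArrayList<>();

    @Override
    public void commit(List<String> committables) {
        if (!committables.isEmpty()) {
            batches.add(new ArrayList<>(committables));
        }
    }
}
```

With five records and a checkpoint every two records, the streaming run produces three commits and the batch run produces one, while `BufferingWriter` and `RecordingCommitter` are identical in both runs.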

Sink API

From the above analysis, the sink API should support four semantics:

  1. What to commit
  2. How to commit
  3. SnapshotState
  4. Drain(Flush)

Below we propose two versions of the sink API. Both versions express the four semantics in the same way:

  1. `Sink` is responsible for creating the `Writer` and `Committer`.
  2. The `Writer` is responsible for writing data and handling any potential tmp area used to write yet unstaged data, e.g. in-progress files. As soon as some data is ready to commit, it (or metadata pointing to where the actual data is staged) is shipped to an operator that knows when to commit it.
  3. The `Writer` also responds to the progress notifications: Snapshot and Flush.
  4. The `Committer` knows how to commit the data to the destination storage system.
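The roles listed above can be sketched with a toy file sink. Everything here is a hypothetical stand-in (`ToySink`, `ToyWriter`, `ToyCommitter`, `FakeStorage`); unlike the proposed `Sink`, `ToySink` is not `Serializable`. The writer stages records as a temporary file and ships only the file name, i.e. metadata pointing at the staged data. The committer moves staged files to the published area and skips names it has already published, which keeps replays idempotent.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical external storage with a temporary ("staged") and a visible ("published") area.
final class FakeStorage {
    final Map<String, String> staged = new LinkedHashMap<>();
    final Map<String, String> published = new LinkedHashMap<>();
}

// Simplified stand-ins for Sink / Writer / Committer.
interface ToySink<InputT, CommT> {
    ToyWriter<InputT, CommT> createWriter(int subtaskId);
    ToyCommitter<CommT> createCommitter();
}

interface ToyWriter<InputT, CommT> {
    void write(InputT element);
    List<CommT> prepareCommit(boolean flush);
}

interface ToyCommitter<CommT> {
    void commit(List<CommT> committables);
}

// The sink acts as a factory; the committable is only the name of a staged file.
final class StagingFileSink implements ToySink<String, String> {
    private final FakeStorage storage;

    StagingFileSink(FakeStorage storage) {
        this.storage = storage;
    }

    @Override
    public ToyWriter<String, String> createWriter(int subtaskId) {
        return new ToyWriter<String, String>() {
            private final List<String> inProgress = new ArrayList<>();
            private int fileCounter = 0;

            @Override
            public void write(String element) {
                inProgress.add(element);
            }

            @Override
            public List<String> prepareCommit(boolean flush) {
                // This toy stages everything on every call, so `flush` makes no difference here.
                List<String> ready = new ArrayList<>();
                if (!inProgress.isEmpty()) {
                    String name = "part-" + subtaskId + "-" + fileCounter++;
                    storage.staged.put(name, String.join(",", inProgress));
                    inProgress.clear();
                    ready.add(name);
                }
                return ready;
            }
        };
    }

    @Override
    public ToyCommitter<String> createCommitter() {
        return committables -> {
            for (String name : committables) {
                // A name that is no longer staged was already committed: replay is a no-op.
                String data = storage.staged.remove(name);
                if (data != null) {
                    storage.published.put(name, data);
                }
            }
        };
    }
}
```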

The only difference between the two versions is how the state is exposed to the user. The pros and cons are as follows.

First Alternative

  1. PROS
    1. This alternative has the advantage of not exposing anything about internal state handling to the user. We require the sink to provide us with the state to be persisted, and it is up to the underlying runtime to handle it. For example, instead of using union state for the shared state, we could leverage a potential operator coordinator.
    2. We do not expose complex classes such as StateDescriptor or TypeSerializer (only a simplified version: SimpleVersionedSerializer).
    3. The intent of #snapshotState is clearer: its signature already tells the user that the method is expected (or allowed) to return some state.
  2. CONS
    1. It exposes more methods.
    2. Because we get the state to snapshot as the return value of a method, the state must fit into memory. Moreover, we must put the whole state into the state backend on each checkpoint instead of putting only the updated values during element processing.
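The second con can be illustrated with a toy comparison, assuming a writer whose state is a pending byte count per bucket. `ReturnedStateWriter` follows the first alternative and hands back its whole state on each snapshot. `IncrementalStateWriter` models a hypothetical store that is updated during element processing, so a checkpoint only needs the entries touched since the previous one. Whether a real state backend persists state incrementally depends on the backend; this only shows the shape of the difference.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// First alternative: the state leaves the writer as a return value of snapshotState(),
// so every checkpoint materializes (and must persist) the complete state.
final class ReturnedStateWriter {
    private final Map<String, Long> pendingBytesPerBucket = new HashMap<>();

    void write(String bucket, long bytes) {
        pendingBytesPerBucket.merge(bucket, bytes, Long::sum);
    }

    List<String> snapshotState() {
        List<String> state = new ArrayList<>();
        pendingBytesPerBucket.forEach((bucket, bytes) -> state.add(bucket + "=" + bytes));
        return state;
    }
}

// Contrast: the writer updates a hypothetical state store while processing elements,
// so a checkpoint only has to persist the entries touched since the previous checkpoint.
final class IncrementalStateWriter {
    private final Map<String, Long> store = new HashMap<>();
    private final Set<String> dirtyBuckets = new HashSet<>();

    void write(String bucket, long bytes) {
        store.merge(bucket, bytes, Long::sum);
        dirtyBuckets.add(bucket);
    }

    // Returns the number of entries that would be written at this checkpoint.
    int checkpoint() {
        int updated = dirtyBuckets.size();
        dirtyBuckets.clear();
        return updated;
    }
}
```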
Code Block: Sink

import java.io.Serializable;
import java.util.List;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;

/**
* The sink interface acts like a factory to create the {@link Writer} and {@link Committer}.
*
* @param <IN>      The type of the sink's input
* @param <CommT>   The type of the committable data
* @param <WriterS> The type of the writer's state
* @param <SharedS> The type of the writer's shared state
*/
public interface Sink<IN, CommT, WriterS, SharedS> extends Serializable {

	/**
	* Create a {@link Writer}.
	* @param context the runtime context
	* @return A new sink writer.
	*/
	Writer<IN, CommT, WriterS, SharedS> createWriter(InitContext context);

	/**
	* Restore a {@link Writer} from the state.
	* @param context the runtime context
	* @param state the previous writers' state
	* @param share the previous writers' shared state
	* @return A sink writer
	*/
	Writer<IN, CommT, WriterS, SharedS> restoreWriter(InitContext context, List<WriterS> state, List<SharedS> share);

	/**
	* Create a {@link Committer}.
	* @return A new sink committer
	*/
	Committer<CommT> createCommitter();

	/**
	* @return a committable serializer
	*/
	SimpleVersionedSerializer<CommT> getCommittableSerializer();

	/**
	* @return a writer's state serializer
	*/
	SimpleVersionedSerializer<WriterS> getStateSerializer();

	/**
	* @return a writer's shared state serializer
	*/
	SimpleVersionedSerializer<SharedS> getSharedStateSerializer();

	/**
	* Providing the runtime information.
	*/
	interface InitContext {

		int getSubtaskId();

		int getAttemptID();

		MetricGroup metricGroup();
	}
}
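One possible use of the shared state in `restoreWriter` is sketched below. It assumes (as the union-state remark in the pros suggests, but the proposal does not specify) that after a restore every writer receives the shared state of all previous writers, possibly with a different parallelism. The shared state here is the highest transaction id a writer has used; `TxnIdWriter` is a hypothetical class, not part of the proposal.

```java
import java.util.List;

// Hypothetical writer whose shared state is the highest transaction id it has used.
final class TxnIdWriter {
    private final int parallelism;
    private long nextTxnId;
    private long lastUsed = -1L;

    private TxnIdWriter(int parallelism, long firstTxnId) {
        this.parallelism = parallelism;
        this.nextTxnId = firstTxnId;
    }

    // Fresh start: writer i uses ids i, i + p, i + 2p, ...
    static TxnIdWriter create(int subtaskId, int parallelism) {
        return new TxnIdWriter(parallelism, subtaskId);
    }

    // Restore: every writer sees ALL previous shared states and starts above their maximum,
    // so no id used before the failure is reused, whatever the new parallelism is.
    static TxnIdWriter restore(int subtaskId, int parallelism, List<Long> sharedStates) {
        long maxUsed = sharedStates.stream().mapToLong(Long::longValue).max().orElse(-1L);
        return new TxnIdWriter(parallelism, maxUsed + 1 + subtaskId);
    }

    long nextTransactionId() {
        lastUsed = nextTxnId;
        nextTxnId += parallelism;
        return lastUsed;
    }

    // Shared state to snapshot: the highest id this writer has used (-1 if none).
    long snapshotSharedState() {
        return lastUsed;
    }
}
```

Two writers at parallelism 2 use ids 0, 2, 4 and 1, 3; after restoring at parallelism 3 the new writers start at 5, 6 and 7.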


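Code Block: Sink

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
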
/**
 * This interface lets the sink developer build a simple sink topology, which can guarantee exactly-once
 * semantics in both batch and streaming execution modes if there is a {@link Committer} or {@link GlobalCommitter}.
 * 1. The {@link SinkWriter} is responsible for producing the committable.
 * 2. The {@link Committer} is responsible for committing a single committable.
 * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global
 *    committable. The {@link GlobalCommitter} is always executed with a parallelism of 1.
 * Note: Developers need to ensure the idempotence of {@link Committer} and {@link GlobalCommitter}.
 *
 * @param <InputT>        The type of the sink's input
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <WriterStateT>  The type of the sink writer's state
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

	/**
	 * Create a {@link SinkWriter}.
	 *
	 * @param context the runtime context.
	 * @param states the writer's state.
	 *
	 * @return A sink writer.
	 *
	 * @throws IOException if the writer cannot be created.
	 */
	SinkWriter<InputT, CommT, WriterStateT> createWriter(
			InitContext context,
			List<WriterStateT> states) throws IOException;

	/**
	 * Creates a {@link Committer}.
	 *
	 * @return A committer.
	 *
	 * @throws IOException if the committer cannot be created.
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * Creates a {@link GlobalCommitter}.
	 *
	 * @return A global committer.
	 *
	 * @throws IOException if the global committer cannot be created.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/**
	 * Returns the serializer of the committable type.
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * Returns the serializer of the aggregated committable type.
	 */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/**
	 * Returns the serializer of the writer's state type.
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/**
	 * The interface exposes some runtime info for creating a {@link SinkWriter}.
	 */
	interface InitContext {

		/**
		 * Returns a {@link ProcessingTimeService} that can be used to
		 * get the current time and register timers.
		 */
		ProcessingTimeService getProcessingTimeService();

		/**
		 * @return The id of the subtask the writer runs in.
		 */
		int getSubtaskId();

		/**
		 * @return The metric group this writer belongs to.
		 */
		MetricGroup metricGroup();
	}

	/**
	 * A service that allows getting the current processing time and registering timers that
	 * will execute the given {@link ProcessingTimeCallback} when firing.
	 */
	interface ProcessingTimeService {

		/** Returns the current processing time. */
		long getCurrentProcessingTime();

		/**
		 * Invokes the given callback at the given timestamp.
		 *
		 * @param time Time when the callback is invoked at
		 * @param processingTimerCallback The callback to be invoked.
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * A callback that can be registered via {@link #registerProcessingTimer(long,
		 * ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * This method is invoked with the time for which the callback was registered.
			 *
			 * @param time The time this callback was registered for.
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}
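A typical use of `InitContext#getProcessingTimeService` is time-based rolling of in-progress files. The sketch below uses local copies of the proposed `ProcessingTimeService`/`ProcessingTimeCallback` shapes plus a hypothetical manually advanced clock (`ManualTimeService`); in a real job the runtime owns the clock and fires the timers. `RollingWriter` (also hypothetical) closes its in-progress file whenever the timer fires and re-arms the timer from inside the callback.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Local copies of the ProcessingTimeService / ProcessingTimeCallback shapes proposed above.
interface ProcessingTimeCallback {
    void onProcessingTime(long time) throws IOException;
}

interface ProcessingTimeService {
    long getCurrentProcessingTime();
    void registerProcessingTimer(long time, ProcessingTimeCallback callback);
}

// Hypothetical manually advanced clock for testing.
final class ManualTimeService implements ProcessingTimeService {
    private static final class Timer {
        final long time;
        final ProcessingTimeCallback callback;
        Timer(long time, ProcessingTimeCallback callback) { this.time = time; this.callback = callback; }
    }

    private final PriorityQueue<Timer> timers = new PriorityQueue<Timer>((a, b) -> Long.compare(a.time, b.time));
    private long now;

    @Override
    public long getCurrentProcessingTime() { return now; }

    @Override
    public void registerProcessingTimer(long time, ProcessingTimeCallback callback) {
        timers.add(new Timer(time, callback));
    }

    // Fires every timer due up to `time` in timestamp order; the clock reads each timer's time while it fires.
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.peek().time <= time) {
            Timer timer = timers.poll();
            now = timer.time;
            try {
                timer.callback.onProcessingTime(timer.time);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        now = time;
    }
}

// Hypothetical writer that closes its in-progress "file" every rolloverMs of processing time.
final class RollingWriter {
    private final ProcessingTimeService timeService;
    private final long rolloverMs;
    private List<String> inProgress = new ArrayList<>();
    private final List<List<String>> finished = new ArrayList<>();

    RollingWriter(ProcessingTimeService timeService, long rolloverMs) {
        this.timeService = timeService;
        this.rolloverMs = rolloverMs;
        scheduleNextRoll();
    }

    void write(String element) { inProgress.add(element); }

    List<List<String>> prepareCommit(boolean flush) {
        if (flush) { roll(); }
        List<List<String>> ready = new ArrayList<>(finished);
        finished.clear();
        return ready;
    }

    private void scheduleNextRoll() {
        long due = timeService.getCurrentProcessingTime() + rolloverMs;
        // The callback rolls the current file and re-arms the timer.
        timeService.registerProcessingTimer(due, time -> {
            roll();
            scheduleNextRoll();
        });
    }

    private void roll() {
        if (!inProgress.isEmpty()) {
            finished.add(inProgress);
            inProgress = new ArrayList<>();
        }
    }
}
```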



Code Block: Writer

import java.io.IOException;
import java.util.List;

/**
 * The {@code SinkWriter} is responsible for writing data and handling any potential tmp area used to write yet un-staged
 * data, e.g. in-progress files. The data (or metadata pointing to where the actual data is staged) ready to commit is
 * returned to the system by the {@link #prepareCommit(boolean)}.
 *
 * @param <InputT>         The type of the sink writer's input
 * @param <CommT>          The type of information needed to commit data staged by the sink
 * @param <WriterStateT>   The type of the writer's state
 */
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

	/**
	 * Add an element to the writer.
	 *
	 * @param element The input record
	 * @param context The additional information about the input record
	 *
	 * @throws IOException if the element cannot be added.
	 */
	void write(InputT element, Context context) throws IOException;

	/**
	 * Prepare for a commit.
	 *
	 * <p>This will be called before we checkpoint the Writer's state in Streaming execution mode.
	 *
	 * @param flush Whether to flush the un-staged data
	 * @return The data that is ready to commit.
	 *
	 * @throws IOException if preparing the commit fails.
	 */
	List<CommT> prepareCommit(boolean flush) throws IOException;

	/**
	 * @return The writer's state.
	 *
	 * @throws IOException if the writer's state cannot be snapshotted.
	 */
	List<WriterStateT> snapshotState() throws IOException;


	/**
	 * Context that {@link #write} can use for getting additional data about an input record.
	 */
	interface Context {

		/**
		 * Returns the current event-time watermark.
		 */
		long currentWatermark();

		/**
		 * Returns the timestamp of the current input record or {@code null} if the element does not
		 * have an assigned timestamp.
		 */
		Long timestamp();
	}
}
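The following sketch shows one way to implement the `prepareCommit(flush)` / `snapshotState()` contract, under simplifying assumptions: the `Context` parameter is dropped, a "file" is just a list of strings, and restored states are concatenated into a single in-progress file. With `flush == false` only finished files are returned; with `flush == true` (for example at the end of bounded input) the in-progress file is closed and returned too. `SimpleSinkWriter` and `SizeRollingWriter` are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified local copy of the proposed SinkWriter (Context parameter omitted).
interface SimpleSinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {
    void write(InputT element) throws IOException;
    List<CommT> prepareCommit(boolean flush) throws IOException;
    List<WriterStateT> snapshotState() throws IOException;
}

// Hypothetical writer: a "file" is finished once it holds maxRecords records.
final class SizeRollingWriter implements SimpleSinkWriter<String, List<String>, List<String>> {
    private final int maxRecords;
    private List<String> inProgress = new ArrayList<>();
    private final List<List<String>> finished = new ArrayList<>();

    SizeRollingWriter(int maxRecords, List<List<String>> restoredStates) {
        this.maxRecords = maxRecords;
        // On recovery, continue the in-progress data captured by snapshotState().
        for (List<String> state : restoredStates) {
            inProgress.addAll(state);
        }
    }

    @Override
    public void write(String element) {
        inProgress.add(element);
        if (inProgress.size() >= maxRecords) {
            finished.add(inProgress);
            inProgress = new ArrayList<>();
        }
    }

    @Override
    public List<List<String>> prepareCommit(boolean flush) {
        // flush == true: the in-progress file is closed and handed over as well.
        if (flush && !inProgress.isEmpty()) {
            finished.add(inProgress);
            inProgress = new ArrayList<>();
        }
        List<List<String>> ready = new ArrayList<>(finished);
        finished.clear();
        return ready;
    }

    @Override
    public List<List<String>> snapshotState() {
        // Only un-staged data is state; staged data already left via prepareCommit().
        List<String> copy = new ArrayList<>(inProgress);
        return List.of(copy);
    }

    @Override
    public void close() {
    }
}
```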


Code Block: Committer

import java.io.IOException;
import java.util.List;

/**
 * The {@code Committer} is responsible for committing the data staged by the sink.
 *
 * @param <CommT> The type of information needed to commit the staged data
 */
public interface Committer<CommT> extends AutoCloseable {

	/**
	 * Commit the given list of {@link CommT}.
	 * @param committables A list of information needed to commit data staged by the sink.
	 * @return A list of {@link CommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<CommT> commit(List<CommT> committables) throws IOException;
}
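The "commit-with-retry" pattern mentioned in the `commit` javadoc can be sketched as follows. `FlakyCommitter` is a hypothetical committer whose storage fails transiently; instead of throwing, it returns the committables that should be retried, and a hypothetical runtime loop (`RetryLoop`) re-submits them up to a fixed number of attempts. An `IOException` is treated as a final failure, as the javadoc describes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local copy of the proposed Committer shape.
interface RetryingCommitter<CommT> {
    List<CommT> commit(List<CommT> committables) throws IOException;
}

// Hypothetical committer: each committable fails a configured number of times before it succeeds.
final class FlakyCommitter implements RetryingCommitter<String> {
    private final Map<String, Integer> remainingFailures;
    final List<String> committed = new ArrayList<>();

    FlakyCommitter(Map<String, Integer> failures) {
        this.remainingFailures = new HashMap<>(failures);
    }

    @Override
    public List<String> commit(List<String> committables) {
        List<String> retry = new ArrayList<>();
        for (String committable : committables) {
            int failuresLeft = remainingFailures.getOrDefault(committable, 0);
            if (failuresLeft > 0) {
                // Transient failure: hand the committable back so it is retried.
                remainingFailures.put(committable, failuresLeft - 1);
                retry.add(committable);
            } else if (!committed.contains(committable)) {
                // Idempotent: committing the same committable twice has no extra effect.
                committed.add(committable);
            }
        }
        return retry;
    }
}

// Hypothetical runtime loop: re-submit whatever the committer hands back.
final class RetryLoop {
    static <CommT> List<CommT> commitWithRetry(RetryingCommitter<CommT> committer,
                                               List<CommT> committables, int maxAttempts) {
        List<CommT> pending = committables;
        try {
            for (int attempt = 0; attempt < maxAttempts && !pending.isEmpty(); attempt++) {
                pending = committer.commit(pending);
            }
        } catch (IOException e) {
            // An IOException means the committer does not want to retry any more.
            throw new UncheckedIOException(e);
        }
        return pending; // non-empty: attempts exhausted
    }
}
```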



Code Block: GlobalCommitter

import java.io.IOException;
import java.util.List;

/**
 * The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
 * which we call global committable (see {@link #combine}).
 *
 * <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
 *
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <GlobalCommT>   The type of the aggregated committable
 */
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {

	/**
	 * Find out which global committables need to be retried when recovering from the failure.
	 *
	 * @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
	 *                              which ones were successfully committed and which ones did not.
	 *
	 * @return A list of {@link GlobalCommT} that should be committed again.
	 *
	 * @throws IOException if the recovered committables cannot be filtered.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Compute an aggregated committable from a list of committables.
	 *
	 * @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
	 *
	 * @return an aggregated committable
	 *
	 * @throws IOException if the given committables cannot be combined.
	 */
	GlobalCommT combine(List<CommT> committables) throws IOException;

	/**
	 * Commit the given list of {@link GlobalCommT}.
	 *
	 * @param globalCommittables a list of {@link GlobalCommT}.
	 *
	 * @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
	 *
	 * @throws IOException if the commit operation fails and should not be retried any more.
	 */
	List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException;

	/**
	 * Signals that there is no committable any more.
	 *
	 * @throws IOException if this notification cannot be handled.
	 */
	void endOfInput() throws IOException;
}

Code Block: Writer

import java.util.List;

/**
 * The Writer is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
 * As soon as some data is ready to commit, it (or metadata pointing to where the actual data is staged) is shipped to an operator that knows when to commit it.
 *
 * @param <IN>           The type of the writer's input
 * @param <CommT>        The type of the committable data
 * @param <StateT>       The type of the writer's state
 * @param <SharedStateT> The type of the writer's shared state
 */
public interface Writer<IN, CommT, StateT, SharedStateT> {

	/**
	 * Add an element to the writer.
	 * @param element The input record.
	 * @param ctx     The additional information about the input record
	 * @param output  Used to ship committable data to the committer.
	 * @throws Exception if the element cannot be written.
	 */
	void write(IN element, Context ctx, WriterOutput<CommT> output) throws Exception;

	/**
	 * Snapshot the writer's state.
	 * @param output  Used to ship committable data to the committer.
	 * @return The writer's state.
	 */
	List<StateT> snapshotState(WriterOutput<CommT> output) throws Exception;

	/**
	 * Snapshot the writer's shared state.
	 * @return The writer's shared state.
	 */
	SharedStateT snapshotSharedState() throws Exception;

	/**
	 * Flush all the data which is un-staged to the committer.
	 * @param output  Used to ship the flushed committable data to the committer.
	 */
	void flush(WriterOutput<CommT> output) throws Exception;

	interface Context {

		long currentProcessingTime();

		long currentWatermark();

		Long timestamp();
	}
}

Code Block: Committer

/**
 * This interface knows how to commit the data to the external system.
 *
 * @param <CommT> The type of the committable data.
 */
public interface Committer<CommT> {
	void commit(CommT committable) throws Exception;
}

Code Block: WriterOutput

import java.io.IOException;

/**
 * The {@link Writer} uses this interface to send the committable data to the operator that knows when to commit to the external system.
 *
 * @param <CommT> The type of the committable data.
 */
public interface WriterOutput<CommT> {

	/**
	 * Send the committable data to the operator that knows when to commit.
	 *
	 * @param committable The data that is ready for committing.
	 */
	void sendToCommit(CommT committable) throws IOException;
}
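The `GlobalCommitter` contract (combine, commit, and `filterRecoveredCommittables` on recovery) can be illustrated with a toy table format in which all data files of one commit become a single manifest. The names (`SimpleGlobalCommitter`, `ManifestCommitter`, `FakeTable`) are hypothetical. The manifest id is derived deterministically from the sorted file names, which is what lets `filterRecoveredCommittables` recognize manifests that already reached the table before a failure, in line with the idempotence requirement on `GlobalCommitter`.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified local copy of the GlobalCommitter shape (exceptions and endOfInput omitted).
interface SimpleGlobalCommitter<CommT, GlobalCommT> {
    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);
    GlobalCommT combine(List<CommT> committables);
    List<GlobalCommT> commit(List<GlobalCommT> globalCommittables);
}

// Hypothetical table that records which manifests have been committed.
final class FakeTable {
    final Set<String> committedManifests = new LinkedHashSet<>();
}

final class ManifestCommitter implements SimpleGlobalCommitter<String, String> {
    private final FakeTable table;

    ManifestCommitter(FakeTable table) {
        this.table = table;
    }

    @Override
    public String combine(List<String> dataFiles) {
        // Deterministic id: the sorted file names, so re-combining the same files gives the same manifest.
        return String.join("+", new TreeSet<>(dataFiles));
    }

    @Override
    public List<String> commit(List<String> manifests) {
        for (String manifest : manifests) {
            table.committedManifests.add(manifest); // a set: committing twice has no extra effect
        }
        return new ArrayList<>(); // nothing to retry in this toy
    }

    @Override
    public List<String> filterRecoveredCommittables(List<String> recovered) {
        // After a failure, only manifests the table has not seen yet must be committed again.
        List<String> toRetry = new ArrayList<>();
        for (String manifest : recovered) {
            if (!table.committedManifests.contains(manifest)) {
                toRetry.add(manifest);
            }
        }
        return toRetry;
    }
}
```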


Open Questions

There are still two open questions related to the unified sink API.

...

One of the special requirements of Hive is that the data partitioning keys of the first two steps might be different. For example, the first step needs to partition by order.id and the second step needs to partition by order.created_at, because using the same key for both steps would introduce data skew. The unified sink API uses the `Writer` to produce the committable data, which implies that there is only one partition key. So this API does not directly support the above scenario.
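A rough illustration of the skew argument, under assumptions: all orders are created on the same day and records are hash-partitioned over four subtasks. `Order` and `PartitionSkew` are hypothetical names. Partitioning by `id` spreads the records evenly, while partitioning by `created_at` sends every record of that day to one subtask, which is one plausible reading of why the two steps want different keys.

```java
import java.util.List;

// Hypothetical order record; only the two keys from the Hive example are modelled.
final class Order {
    final long id;
    final String createdAt; // a day such as "2020-10-01"

    Order(long id, String createdAt) {
        this.id = id;
        this.createdAt = createdAt;
    }
}

final class PartitionSkew {
    // Counts how many records each of `parallelism` subtasks receives under hash partitioning.
    static int[] recordsPerSubtask(List<Order> orders, int parallelism, boolean byCreatedAt) {
        int[] counts = new int[parallelism];
        for (Order order : orders) {
            Object key = byCreatedAt ? order.createdAt : order.id;
            counts[Math.floorMod(key.hashCode(), parallelism)]++;
        }
        return counts;
    }
}
```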

Based on the discussion, we will not support the HiveSink in the first version.

Is the sink an operator or a topology?

The scenario could be even more complicated. For example, some users want to merge the files in one bucket before committing to the HMS. Where should this logic be placed? Should all of it go into one operator, or into a sink topology?

Based on the discussion, in the long run we should give the sink developer the ability to build “arbitrary” topologies. For Flink 1.12, however, we should focus only on satisfying the S3/HDFS/Iceberg sinks.

Compatibility, Deprecation, and Migration Plan

  • We do not change the current streaming and batch style sink APIs, so there is no compatibility issue for existing sink implementations.
  • In the long run we might need to deprecate the old streaming and batch style sink APIs.
  • As a first step, we plan to migrate the StreamingFileSink to this new API.

Rejected Alternatives

The only difference between the rejected version and the accepted version is how the state is exposed to the user. The accepted version could give the framework more opportunity to optimize state handling.

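Code Block: Sink

import java.io.Serializable;

import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
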
/**
* The sink interface acts like a factory to create the {@link Writer} and {@link Committer}.
*
* @param <T>       The type of the sink's input.
* @param <CommT>   The type of the committable data.
*/
public interface Sink<T, CommT> extends Serializable {

	/**
	* Create a {@link Writer}.
	* @param context the runtime context
	* @return A new sink writer.
	*/
	Writer<T, CommT> createWriter(InitialContext context) throws Exception;

	/**
	* Create a {@link Committer}.
	* @return A new sink committer
	*/
	Committer<CommT> createCommitter();

	/**
	* @return a committable serializer
	*/
	SimpleVersionedSerializer<CommT> getCommittableSerializer();

	/**
	* Providing the runtime information.
	*/
	interface InitialContext {

		boolean isRestored();

		int getSubtaskIndex();

		OperatorStateStore getOperatorStateStore();

		MetricGroup metricGroup();

	}
}

...