Status

Current state:  Under Discussion

Discussion thread:  https://lists.apache.org/thread.html/rf09dfeeaf35da5ee98afe559b5a6e955c9f03ade0262727f6b5c4c1e%40%3Cdev.flink.apache.org%3E

JIRA:

Released: 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

As discussed in FLIP-131, Flink will deprecate the DataSet API in favor of the DataStream API and the Table API. Users should be able to use the DataStream API to write jobs that support both bounded and unbounded execution modes. However, Flink does not provide a sink API that guarantees exactly-once semantics in both bounded and unbounded scenarios, which blocks the unification.

So we want to introduce a new unified sink API that lets the user develop a sink once and run it everywhere. Specifically, Flink allows the user to:

  1. Choose between different SDKs (SQL/Table/DataStream)
  2. Choose between different execution modes (Batch/Stream) according to the scenario (bounded/unbounded)

We hope these things (SDK/execution mode) are transparent to the sink API.

This document has three parts. The first part describes the semantics the unified sink API should support. Based on those semantics, the second part proposes a transactional sink API. The last part introduces two open questions related to the API.

Semantics

This section works out which parts of ensuring consistency the sink developer is responsible for and which parts Flink is responsible for. From that we can derive the semantics the sink API needs in order to support exactly-once.

From a high-level perspective, a job can always be abstracted as reading data from one external system and writing the processed data to another external system. The sink's responsibility is to write data to the external system. In general, writing data to an external system comes down to answering four questions: What, How, When, and Where to commit the data. We explain each of these four questions below and then return to the question posed at the beginning of this section.

What to Commit. The data generated by the job goes through two stages before it can be committed to the external system. The first stage is preparation. Normally the data produced by the job cannot be written/committed to the external system immediately, because some conditions have not been met yet. For example, in the `StreamingFileSink` the data is first written to an in-progress file before it can be committed to the filesystem. The second stage is the commit. In this stage, the data can be committed to the external system. The data in the second stage is what we call What to commit.

How to Commit. When data is ready to be committed, we need a system-specific way to commit it to the external system. Sometimes the sink commits the data by renaming a file. Sometimes the sink needs to submit some metadata to make the data visible to end users.
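The rename style of commit mentioned above can be sketched as follows. This is only an illustration under stated assumptions: `PendingFile`, `RenameCommitter` and `RenameCommitDemo` are hypothetical names that are not part of the proposal, and a local filesystem stands in for the external system. The committer is written to be idempotent, so a commit that is replayed after a failover does no harm.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Small driver: stages one file, commits it twice, and checks the outcome. */
final class RenameCommitDemo {

    /** Returns true if only the final file exists after a commit plus a replayed commit. */
    static boolean run() {
        try {
            Path dir = Files.createTempDirectory("sink-demo");
            Path inProgress = dir.resolve(".part-0.inprogress");
            Files.write(inProgress, "a,b,c".getBytes(StandardCharsets.UTF_8));

            PendingFile pending = new PendingFile(inProgress, dir.resolve("part-0"));
            RenameCommitter committer = new RenameCommitter();
            committer.commit(pending);
            committer.commit(pending); // simulated replay after a failover
            return Files.exists(pending.target) && !Files.exists(pending.inProgress);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("only the committed file remains: " + run());
    }
}

/** Hypothetical committable: an in-progress file plus its final, visible name. */
final class PendingFile {
    final Path inProgress;
    final Path target;

    PendingFile(Path inProgress, Path target) {
        this.inProgress = inProgress;
        this.target = target;
    }
}

/** Hypothetical committer that makes data visible by renaming the in-progress file. */
final class RenameCommitter {

    void commit(PendingFile file) {
        try {
            // A previous attempt already renamed the file: nothing left to do.
            if (!Files.exists(file.inProgress) && Files.exists(file.target)) {
                return;
            }
            Files.move(file.inProgress, file.target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A metadata-style commit would keep the same shape: the committable would carry whatever the external system needs in order to publish the data, and `commit` would submit it.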

When to Commit. From the users’ perspective, one of the most important requirements is correctness. Normally the user wants the data generated by the job to be committed to the external system once and only once. If we commit the data at the wrong time, there might be duplicated data. For example, Flink currently handles failover by restarting the job, which always replays some elements.

Where to Commit. This is about how many resources we need in order to guarantee exactly-once when the time to commit arrives. For example, if we assume that the commit operation happens where the committable data is produced, then the lifecycle of the component that produces the committable data has to last until the commit operation finishes.

Actually, When and Where are both about how to guarantee exactly-once semantics. Currently Flink exposes its internal implementation to the sink developer through two interfaces, `CheckpointedFunction` and `CheckpointListener`. All exactly-once sinks are coupled to the internal exactly-once implementation because they implement these two interfaces to guarantee exactly-once semantics.

When we look at supporting the bounded scenario, we find that the mechanism of `CheckpointedFunction` and `CheckpointListener` is not in line with it. In a bounded scenario Flink should guarantee exactly-once semantics:

  1. Even if there is only one resource. (Where)
  2. Even if there is no normal checkpoint at all. (When)

This means the current streaming-style sink implementation is not suitable for the bounded scenario. Flink must introduce a new mechanism for the bounded scenario. However, we do not want to couple the sink API to yet another internal mechanism that guarantees consistency in the bounded scenario. These internal mechanisms should be decoupled from the sink API.

In summary, the sink API should be responsible for producing What to commit and providing How to commit. Flink should be responsible for guaranteeing exactly-once semantics. Flink could “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.
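The division of responsibility described in this summary can be illustrated with a small sketch. Everything here is an assumption made for illustration: `SimpleCommitter` is a reduced stand-in for a committer, and `CommitDriver` is a hypothetical model of the framework side, not Flink's actual runtime. The point is that the sink supplies only the commit logic, while the driver picks the moment to call it depending on the execution mode.

```java
import java.util.ArrayList;
import java.util.List;

/** Small driver that feeds the same committer through both execution modes. */
final class CommitDriverDemo {

    /** Returns {committables visible right after a checkpoint, committables visible at end of input}. */
    static int[] run(CommitDriver.Mode mode) {
        List<String> externalSystem = new ArrayList<>();
        CommitDriver<String> driver = new CommitDriver<>(mode, externalSystem::add);

        driver.onCommittable("file-1");
        driver.onCheckpointComplete();
        int afterCheckpoint = externalSystem.size();

        driver.onCommittable("file-2");
        driver.onEndOfInput();
        return new int[] {afterCheckpoint, externalSystem.size()};
    }

    public static void main(String[] args) {
        for (CommitDriver.Mode mode : CommitDriver.Mode.values()) {
            int[] visible = run(mode);
            System.out.println(mode + ": " + visible[0] + " after checkpoint, "
                    + visible[1] + " at end of input");
        }
    }
}

/** Reduced stand-in for a committer: the sink only says HOW to commit. */
interface SimpleCommitter<CommT> {
    void commit(CommT committable);
}

/** Hypothetical framework-side driver: it, not the sink, decides WHEN to commit. */
final class CommitDriver<CommT> {

    enum Mode { STREAMING, BATCH }

    private final Mode mode;
    private final SimpleCommitter<CommT> committer;
    private final List<CommT> pending = new ArrayList<>();

    CommitDriver(Mode mode, SimpleCommitter<CommT> committer) {
        this.mode = mode;
        this.committer = committer;
    }

    /** The writer hands over a committable; it is only buffered here. */
    void onCommittable(CommT committable) {
        pending.add(committable);
    }

    /** Streaming: commit once a checkpoint covering the buffered committables completes. */
    void onCheckpointComplete() {
        if (mode == Mode.STREAMING) {
            commitPending();
        }
    }

    /** Both modes: commit whatever is left when the input ends; in batch this is the only commit. */
    void onEndOfInput() {
        commitPending();
    }

    private void commitPending() {
        for (CommT committable : pending) {
            committer.commit(committable);
        }
        pending.clear();
    }
}
```

The committer passed in is identical for both modes, which is the sense in which When and Where stay transparent to the sink developer.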

Transactional Sink API


Sink
/**
 * This interface lets the sink developer build a simple transactional sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} + one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global committable.
 * And the parallelism of the {@link GlobalCommitter} is always 1.
 * Both the {@link Committer} and the {@link GlobalCommitter} are optional.
 * @param <InputT>       The type of the sink's input
 * @param <CommT>        The type of the committable data
 * @param <GlobalCommT>  The type of the aggregated committable
 * @param <WriterStateT> The type of the writer's state
 */
interface TransactionalSink<InputT, CommT, GlobalCommT, WriterStateT> {

	/**
	 * Create a {@link Writer}.
	 * @param context the runtime context
	 * @param states the previous writers' state
	 * @return A sink writer
	 */
	Writer<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states);

	/**
	 * @return a {@link Committer}
	 */
	Optional<Committer<CommT>> createCommitter();

	/**
	 * @return a {@link GlobalCommitter}
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();

	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	interface InitContext {

		int getSubtaskId();

		MetricGroup metricGroup();
	}
}


Writer
/**
 * The interface is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
 * As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
 *
 * @param <InputT>       The type of the writer's input
 * @param <CommT>        The type of the committable data
 * @param <WriterStateT> The type of the writer's state
 */
interface Writer<InputT, CommT, WriterStateT> {

	/**
	 * Add an element to the writer.
	 * @param element The input record
	 * @param ctx The additional information about the input record
	 * @param output The committable data could be shipped to the committer by this
	 */
	void write(InputT element, Context ctx, WriterOutput<CommT> output);

	/**
	 * Prepare for a commit.
	 * @param flush  whether flushing the un-staged committable or not
	 * @param output The committable data could be shipped to the committer by this
	 */
	void prepareCommit(boolean flush, WriterOutput<CommT> output);

	/**
	 * @return the writer's state.
	 */
	List<WriterStateT> snapshotState();

	interface Context {

		long currentProcessingTime();

		long currentWatermark();

		Long timestamp();
	}
}
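A writer built against this interface might look like the sketch below. It is a hedged illustration, not the proposed implementation: the interfaces are reduced copies so the example compiles on its own (the `Context` parameter and the `IOException` on `sendToCommit` are left out), `BatchingWriter` and `BatchingWriterDemo` are hypothetical, and the sketch assumes that `flush == true` means no more input will arrive, so even a partial batch has to be staged.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Small driver: counts the committables emitted at three points in time. */
final class BatchingWriterDemo {

    /** Returns counts after three writes, after prepareCommit(false), and after prepareCommit(true). */
    static int[] run() {
        List<List<String>> emitted = new ArrayList<>();
        WriterOutput<List<String>> output = emitted::add;
        BatchingWriter writer = new BatchingWriter(2, new ArrayList<>());

        writer.write("a", output);
        writer.write("b", output); // batch of two is full and gets emitted
        writer.write("c", output); // stays un-staged
        int afterWrites = emitted.size();

        writer.prepareCommit(false, output);
        int afterPrepare = emitted.size();

        writer.prepareCommit(true, output);
        return new int[] {afterWrites, afterPrepare, emitted.size()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + ", " + counts[1] + ", " + counts[2]);
    }
}

// Reduced copies of the proposed interfaces (assumption: simplified for the sketch).
interface WriterOutput<CommT> {
    void sendToCommit(CommT committable);
}

interface Writer<InputT, CommT, WriterStateT> {
    void write(InputT element, WriterOutput<CommT> output);

    void prepareCommit(boolean flush, WriterOutput<CommT> output);

    List<WriterStateT> snapshotState();
}

/** Hypothetical writer: every full batch of records becomes one committable. */
final class BatchingWriter implements Writer<String, List<String>, List<String>> {

    private final int batchSize;
    private List<String> inProgress = new ArrayList<>();

    BatchingWriter(int batchSize, List<List<String>> restoredStates) {
        this.batchSize = batchSize;
        // Resume with the un-staged records saved by previous writers.
        for (List<String> state : restoredStates) {
            inProgress.addAll(state);
        }
    }

    @Override
    public void write(String element, WriterOutput<List<String>> output) {
        inProgress.add(element);
        if (inProgress.size() >= batchSize) {
            roll(output);
        }
    }

    @Override
    public void prepareCommit(boolean flush, WriterOutput<List<String>> output) {
        if (flush && !inProgress.isEmpty()) {
            roll(output);
        }
    }

    @Override
    public List<List<String>> snapshotState() {
        // The un-staged records are the only state this writer needs to restore.
        return Collections.singletonList(new ArrayList<>(inProgress));
    }

    private void roll(WriterOutput<List<String>> output) {
        output.sendToCommit(inProgress);
        inProgress = new ArrayList<>();
    }
}
```

Note that the writer never commits anything itself. It only produces committables and reports its un-staged data as state, which leaves the decision about when to commit with the framework.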


Committer
/**
* This interface knows how to commit the data to the external system.
* 
* @param <CommT> The type of the committable data.
*/
public interface Committer<CommT> {
	void commit(CommT committable) throws Exception;
}


GlobalCommitter
/**
 * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global committable.
 *
 * @param <CommT>   The type of the committable data
 * @param <GlobalCommT>  The type of the aggregated committable
 */
interface GlobalCommitter<CommT, GlobalCommT> {

	/**
	 * This method is called when restoring from a failover.
	 * @param globalCommittables the global committables that were not committed in the previous session.
	 * @return the global committables that should be committed again in the current session.
	 */
	List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);

	/**
	 * Compute an aggregated committable from a collection of committables.
	 * @param committables a collection of committables that are needed to combine
	 * @return an aggregated committable
	 */
	GlobalCommT combine(List<CommT> committables);

	CommitResult commit(GlobalCommT globalCommittable);

	/**
	 * There is no committable any more.
	 */
	void endOfInput();

	enum CommitResult {
		SUCCESS, FAILURE, RETRY
	}
}
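To make the recovery contract of `filterRecoveredCommittables` more concrete, here is a minimal sketch under stated assumptions. The interface is copied from above without Javadoc so the example compiles on its own. `ManifestCommitter` and `ManifestCommitterDemo` are hypothetical, and an in-memory set of manifests stands in for the external system's metadata; a real sink would query the external system instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Small driver: commits one manifest, "fails", then recovers with two manifests. */
final class ManifestCommitterDemo {

    /** Returns {manifests re-committed after recovery, manifests published in total}. */
    static int[] run() {
        Set<List<String>> table = new HashSet<>();

        ManifestCommitter first = new ManifestCommitter(table);
        List<String> m1 = first.combine(Arrays.asList("part-1", "part-0"));
        List<String> m2 = first.combine(Arrays.asList("part-2"));
        first.commit(m1);
        // Simulated failure here: m2 was never committed.

        ManifestCommitter restored = new ManifestCommitter(table);
        List<List<String>> retry = restored.filterRecoveredCommittables(Arrays.asList(m1, m2));
        for (List<String> manifest : retry) {
            restored.commit(manifest);
        }
        return new int[] {retry.size(), table.size()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println(result[0] + " re-committed, " + result[1] + " published");
    }
}

// Copy of the proposed interface, Javadoc omitted.
interface GlobalCommitter<CommT, GlobalCommT> {
    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);

    GlobalCommT combine(List<CommT> committables);

    CommitResult commit(GlobalCommT globalCommittable);

    void endOfInput();

    enum CommitResult { SUCCESS, FAILURE, RETRY }
}

/** Hypothetical global committer: a sorted list of file names acts as the global committable. */
final class ManifestCommitter implements GlobalCommitter<String, List<String>> {

    private final Set<List<String>> publishedManifests; // stand-in for external metadata

    ManifestCommitter(Set<List<String>> publishedManifests) {
        this.publishedManifests = publishedManifests;
    }

    @Override
    public List<List<String>> filterRecoveredCommittables(List<List<String>> recovered) {
        // Keep only the manifests the external system has not seen yet.
        List<List<String>> toCommitAgain = new ArrayList<>();
        for (List<String> manifest : recovered) {
            if (!publishedManifests.contains(manifest)) {
                toCommitAgain.add(manifest);
            }
        }
        return toCommitAgain;
    }

    @Override
    public List<String> combine(List<String> committables) {
        List<String> manifest = new ArrayList<>(committables);
        Collections.sort(manifest); // deterministic order makes manifests comparable
        return manifest;
    }

    @Override
    public CommitResult commit(List<String> manifest) {
        publishedManifests.add(manifest);
        return CommitResult.SUCCESS;
    }

    @Override
    public void endOfInput() {
        // Nothing is buffered in this sketch.
    }
}
```

This sketch always returns `SUCCESS`. How the framework reacts to `FAILURE` and `RETRY` is not specified in this section, so it is left out here.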


WriterOutput
/**
* The {@link Writer} uses this interface to send the committable data to the operator who knows when to commit to the external system.
*
* @param <CommT> The type of the committable data.
*/
public interface WriterOutput<CommT> {

	/**
	* Send the committable data to the operator who knows when to commit.
	*
	* @param committable The data that is ready for committing.
	*/
	void sendToCommit(CommT committable) throws IOException;
}


Open Questions

There are still two open questions related to the unified sink API.

How does the sink API support writing to Hive?

In general, the HiveSink needs three steps before the data is committed to Hive:

  1. Write the data to the FileSystem.
  2. Compute which directories can be committed.
  3. Commit the directories to the HMS.

One special requirement of Hive is that the data partitioning keys of the first two steps might be different. For example, the first step needs to partition by order.id and the second step needs to partition by order.created_at, because using the same key for both steps would introduce data skew. The unified sink API uses the `Writer` to produce the committable data, which implies that there is only one partition key. So this API does not directly support the above scenario.

Based on the discussion, we will not support the HiveSink in the first version.

Is the sink an operator or a topology?

The scenario could be more complicated. For example, some users want to merge the files in one bucket before committing to the HMS. Where should this logic be placed? Do we want to put all of this logic into one operator or into a sink topology?

Based on the discussion, in the long run we should give the sink developer the ability to build “arbitrary” topologies. But for Flink 1.12 we should focus on satisfying only the S3/HDFS/Iceberg sinks.

Compatibility, Deprecation, and Migration Plan

  • We do not change the current streaming and batch style sink APIs, so there is no compatibility issue for existing sink implementations.
  • In the long run we might need to deprecate the old streaming and batch style sink APIs.
  • At first we plan to migrate the StreamingFileSink to this new API.

Rejected Alternatives

The difference between the rejected version and the accepted version is how state is exposed to the user. The accepted version gives the framework a greater opportunity to optimize state handling.

Sink
/**
* The sink interface acts like a factory to create the {@link Writer} and {@link Committer}.
*
* @param <T>       The type of the sink's input.
* @param <CommT>   The type of the committable data.
*/
public interface Sink<T, CommT> extends Serializable {

    /**
	* Create a {@link Writer}.
	* @param context the runtime context
	* @return A new sink writer.
	*/
	Writer<T, CommT> createWriter(InitialContext context) throws Exception;

	/**
	* Create a {@link Committer}.
	* @return A new sink committer
	*/
	Committer<CommT> createCommitter();

	/**
	* @return a committable serializer
	*/
	SimpleVersionedSerializer<CommT> getCommittableSerializer();

    /**
	* Providing the runtime information.
	*/
	interface InitialContext {

		boolean isRestored();

		int getSubtaskIndex();

		OperatorStateStore getOperatorStateStore();

		MetricGroup metricGroup();

	}
}


Writer
/**
* The Writer is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
* As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
*
* @param <T> 		The type of writer's input
* @param <CommT> 	The type of the committable data.
*/
public interface Writer<T, CommT> extends AutoCloseable {

	/**
	* Add an element to the writer.
	* @param t          The input record
	* @param context    The additional information about the input record
	* @param output     The committable data could be shipped to the committer by this
	* @throws Exception
	*/
	void write(T t, Context context, WriterOutput<CommT> output) throws Exception;

	/**
	* Snapshot the state of the writer.
	* @param output The committable data could be shipped to the committer by this.
	*/
	void snapshotState(WriterOutput<CommT> output) throws Exception;

	/**
	* Flush all the data which is un-staged to the committer.
	*/
	void flush(WriterOutput<CommT> output) throws IOException;

    interface Context {

		long currentProcessingTime();

		long currentWatermark();
		
		Long timestamp();
	}
}