...

We hope these concerns (SDK / execution mode) are transparent to the sink API.

The document includes three parts: the first part describes the semantics the unified sink API should support; based on the first part, the second part proposes two versions of the unified sink API; the last part introduces two open questions related to the API.

Semantics

This section discusses which semantics the unified sink API should support. It includes two parts: the first part describes how the sink API supports exactly-once semantics; the second part explains that drain and snapshot are two general semantics, valid for both bounded and unbounded scenarios. 

Consistency

This section tries to figure out which parts the sink developer is responsible for and which parts Flink is responsible for in terms of ensuring consistency. From this we can derive which semantics the sink API needs in order to support exactly-once semantics.

...

In summary, the sink API should be responsible for producing What to commit and providing How to commit. Flink should be responsible for guaranteeing the exactly-once semantics. Flink can “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.

Drain and Snapshot

It seems that

  1. In the unbounded scenario the sink has to deal with Snapshot/Drain but does not have to deal with Termination.
  2. In the bounded scenario the sink does not need to care about Snapshot/Drain and only cares about Termination.

If this were true, the sink API could not be unified, because it implies that the sink developer has to be aware of whether the scenario is bounded or unbounded. However, one goal of the unified sink API is to make bounded/unbounded transparent to the sink developer. 

In general these three concepts (Snapshot/Drain/Termination) are all related to the progress of a job. The progress is a “position” in the dataset that a job has received; it means that the job has already received all the data before that “position”. A job is executed by Flink, so from Flink’s point of view the progress means that all of Flink’s components have received the data before the “position”.  

Drain means that all the components need to flush all the data before the “position”. For an internal operator/UDF this means flushing all the data before the “position” to the downstream operator. For the sink operator this means flushing all the data before the “position” to the external system. 

Termination is a special Drain. The requirement is the same; the difference is whether the “position” is specified by the user or by the end of the source.

Snapshot is the state of a component at a “position”. Checkpoint is a collection of the snapshots of all the components at a “position”. Compared to Termination/Drain, we can see the Snapshot as a fine-grained notion of the progress of a job. In general the developer might not care about when a Snapshot happens, because that is an implementation detail of optimizing failover. In other words, in theory we could choose not to expose the Snapshot to the sink API. However, from the perspective of compatibility and performance, exposing it to the sink API is more realistic. Even if we expose the Snapshot to the sink API, users should not assume how fine the granularity is. For example, in the bounded scenario + streaming mode there might be no regular checkpoint at all because the job finishes too fast. In general, how many times a snapshot happens is a runtime concern that depends on the specific implementation, time, or scenario.

From the above analysis we can see that 

  1. Both the bounded and the unbounded scenario have to deal with Drain. Termination is not a concept unique to the bounded scenario; it is just a special case of Drain in the bounded scenario. So we should provide the Drain semantics in the sink API. 
  2. Snapshot is also applicable to both the bounded and the unbounded scenario, but users should not assume how many snapshots happen at runtime.

Sink API

From the above analysis we know that there are four semantics the sink API should support:

  1. What to commit
  2. How to commit
  3. SnapshotState
  4. Drain(Flush)

In the following we propose two versions of the sink API. There is no difference between the two versions in how the four semantics are expressed: 

  1. `Sink` is responsible for creating the `Writer` and `Committer`
  2. The `Writer` is responsible for writing data and handling any potential tmp area used to write yet unstaged data, e.g. in-progress files. As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
  3. The `Writer` also responds to the progress notifications: Snapshot and Flush.
  4. The `Committer` knows how to commit the data to the destination storage system.

The only difference between the two versions is how the state is exposed to the user. The pros & cons are as follows. 

First Alternative

  1. PROS
    1. This alternative has the advantage of not exposing anything about internal state handling to the user. We require the sink to provide us with the state to be persisted, and it is up to the underlying runtime to handle it. For example, instead of using union state for the shared state, we could leverage a potential operator coordinator.
    2. We do not expose complex classes such as StateDescriptors, TypeSerializer... (only a simplified version: SimpleVersionedSerializer)
    3. The intent of #snapshotState is clearer: the signature already tells the user that some state is expected/allowed out of this method.
  2. CONS
    1. It exposes more methods
    2. Because we get the state to snapshot via a method's return value, the state must fit into memory. Moreover, we must put the whole state into the state backend on each checkpoint, instead of putting only the updated values during element processing.


Code Block (Java): Sink
/**
* The sink interface acts like a factory to create the {@link Writer} and {@link Committer}.
*
* @param <IN>      The type of the sink's input
* @param <CommT>   The type of the committable data
* @param <WriterS> The type of the writer's state
* @param <SharedS> The type of the writer's shared state
*/
public interface Sink<IN, CommT, WriterS, SharedS> extends Serializable {

	/**
	* Create a {@link Writer}.
	* @param context the runtime context
	* @return A new sink writer.
	*/
	Writer<IN, CommT, WriterS, SharedS> createWriter(InitContext context);

	/**
	* Restore a {@link Writer} from the state.
	* @param context the runtime context
	* @param state the previous writers' state
	* @param share the previous writers' shared state
	* @return A sink writer
	*/
	Writer<IN, CommT, WriterS, SharedS> restoreWriter(InitContext context, List<WriterS> state, List<SharedS> share);

	/**
	* Create a {@link Committer}.
	* @return A new sink committer
	*/
	Committer<CommT> createCommitter();

	/**
	* @return a committable serializer
	*/
	SimpleVersionedSerializer<CommT> getCommittableSerializer();

	/**
	* @return a writer's state serializer
	*/
	SimpleVersionedSerializer<WriterS> getStateSerializer();

	/**
	* @return a writer's shared state serializer
	*/
	SimpleVersionedSerializer<SharedS> getSharedStateSerializer();

	/**
	* Providing the runtime information.
	*/
	interface InitContext {

		int getSubtaskId();

		int getAttemptID();

		MetricGroup metricGroup();
	}
}



Code Block (Java): Writer
/**
* The Writer is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files. 
* As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
*
* @param <IN> 			The type of the writer's input
* @param <CommT> 		The type of the committable data
* @param <StateT> 		The type of the writer's state
* @param <SharedStateT> The type of the writer's shared state
*/
public interface Writer<IN, CommT, StateT, SharedStateT> {

	/**
	* Add an element to the writer.
	* @param element The input record.
	* @param ctx     The additional information about the input record
	* @param output  The committable data could be shipped to the committer by this.
	* @throws Exception
	*/
	void write(IN element, Context ctx, WriterOutput<CommT> output) throws Exception;

	/**
	* Snapshot the writer's state.
	* @param output  The committable data could be shipped to the committer by this.
	* @return The writer's state.
	*/
	List<StateT> snapshotState(WriterOutput<CommT> output) throws Exception;

	/**
	* Snapshot the writer's shared state.
	* @return The writer's shared state.
	*/
	SharedStateT snapshotSharedState() throws Exception;

	/**
	* Flush all the data which is un-staged to the committer.
	* @param output
	*/
	void flush(WriterOutput<CommT> output) throws Exception;

	interface Context {

		long currentProcessingTime();

		long currentWatermark();

		Long timestamp();
	}
}


Code Block (Java): Committer
/**
* This interface knows how to commit the data to the external system.
* 
* @param <CommT> The type of the committable data.
*/
public interface Committer<CommT> {
	void commit(CommT committable) throws Exception;
}


Code Block (Java): WriterOutput
/**
* The {@link Writer} uses this interface to send the committable data to the operator who knows when to commit to the external system.
*
* @param <CommT> The type of the committable data.
*/
public interface WriterOutput<CommT> {

	/**
	* Send the committable data to the operator who knows when to commit.
	*
	* @param committable The data that is ready for committing.
	*/
	void sendToCommit(CommT committable) throws IOException;
}
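
To make the first alternative more concrete, here is a minimal, hypothetical sketch of a text-file sink written against the interfaces above. The `TxtFileWriter`/`TxtFileCommitter` names, the in-memory buffer, the roll-over threshold and the staging-directory layout are all illustrative assumptions and not part of the proposal; the sketch only shows how "What to commit" (the staged file's path) flows through the `WriterOutput` to the `Committer`, which knows "How to commit" (renaming the file).

Code Block (Java): Example (hypothetical)
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;

/** A hypothetical Writer: buffers records in memory and rolls them into staged files. */
class TxtFileWriter implements Writer<String, Path, List<String>, Void> {

	private final Path stagingDir;                          // directory holding not-yet-committed files
	private final List<String> buffer = new ArrayList<>();  // records that are not staged yet

	TxtFileWriter(Path stagingDir, List<List<String>> restoredState) {
		this.stagingDir = stagingDir;
		restoredState.forEach(buffer::addAll);              // resume from a previous snapshot
	}

	@Override
	public void write(String element, Context ctx, WriterOutput<Path> output) throws Exception {
		buffer.add(element);
		if (buffer.size() >= 1000) {                        // roll a staged file once "enough" data arrived
			output.sendToCommit(stageBuffer());             // "What to commit": the staged file's path
		}
	}

	@Override
	public List<List<String>> snapshotState(WriterOutput<Path> output) {
		// The un-staged buffer is the writer's state; the framework persists the returned value.
		return Collections.singletonList(new ArrayList<>(buffer));
	}

	@Override
	public Void snapshotSharedState() {
		return null;                                        // no shared state in this sketch
	}

	@Override
	public void flush(WriterOutput<Path> output) throws Exception {
		// Drain: everything received so far has to leave the writer.
		if (!buffer.isEmpty()) {
			output.sendToCommit(stageBuffer());
		}
	}

	private Path stageBuffer() throws Exception {
		Path staged = stagingDir.resolve("part-" + UUID.randomUUID() + ".inprogress");
		Files.write(staged, buffer);                        // stage the buffered records
		buffer.clear();
		return staged;
	}
}

/** A hypothetical Committer: "How to commit" is renaming the staged file to its final name. */
class TxtFileCommitter implements Committer<Path> {

	@Override
	public void commit(Path staged) throws Exception {
		Path committed = staged.resolveSibling(
				staged.getFileName().toString().replace(".inprogress", ""));
		Files.move(staged, committed, StandardCopyOption.ATOMIC_MOVE);  // publish atomically
	}
}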

Second Alternative

  1. PROS
    1. It is more consistent with the development of other UDFs. Whether to use state, and how many states, is up to the sink developer.
    2. We can e.g. append a single value to the state.
  2. CONS
    1. All sinks depend on the operator state backend, and all the improvements depend on the state backend (see the sketch after this list). 
    2. Exposes more complex classes (see approach 1)
    3. Less clean contracts on the restore operations and the number of supported states; potential for dangling states when updating the writer implementation.
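
For contrast, the sketch below illustrates how the un-staged buffer from the earlier example would be handled under this alternative, assuming Flink's existing `OperatorStateStore`, `ListStateDescriptor` and `ListState` classes; the `TxtFileWriterV2` name and its fields are illustrative assumptions. Single values can be appended to the state as they arrive, but the writer (and every other sink) is now coupled to the operator state backend.

Code Block (Java): Example (hypothetical)
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;

/** Hypothetical state handling in the second alternative: the writer owns a ListState directly. */
class TxtFileWriterV2 {

	private final ListState<String> pendingRecords;  // un-staged records live in operator state

	TxtFileWriterV2(OperatorStateStore stateStore) throws Exception {
		// The writer itself decides whether and how to use state.
		pendingRecords = stateStore.getListState(
				new ListStateDescriptor<>("pending-records", String.class));
	}

	void write(String element) throws Exception {
		// A single value can be appended instead of re-writing the whole state on every checkpoint ...
		pendingRecords.add(element);
	}

	void snapshotState() {
		// ... and there is nothing to return here: the operator state backend snapshots the
		// ListState for us, which is exactly the state-backend dependency listed under CONS.
	}
}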

...



Open Questions

There are still two open questions related to the unified sink API.

How does the sink API support writing to Hive?  

In general the HiveSink needs three steps before committing the data to Hive:

  1. The first step is writing the data to the FileSystem. 
  2. The second step is computing which directories could be committed. 
  3. The third step is committing the directories to the HMS.

One of the special requirements of Hive is that the data partitioning keys of the first two steps might be different. For example, the first step needs to partition by order.id while the second step needs to partition by order.created_at, because using the same key for both would introduce data skew. The unified sink API uses the `Writer` to produce the committable data, which implies that there is only one partition key. So this API does not meet the above scenario directly.

Is the sink an operator or a topology?

The scenario could be more complicated. For example, some users want to merge the files in one bucket before committing to the HMS. Where should this logic be placed? Do we want to put all this logic into one operator or into a sink topology?

Compatibility, Deprecation, and Migration Plan

  • We do not change the current streaming- and batch-style sink APIs, so there is no compatibility issue for existing sink implementations.
  • In the long run we might need to deprecate the old streaming- and batch-style sink APIs. 
  • At first we plan to migrate the StreamingFileSink to this new API.

Rejected Alternatives

The only difference between the rejected version and the accepted version is how the state is exposed to the user. The accepted version gives the framework a greater opportunity to optimize the state handling.

...

Code Block (Java): Sink
/**
* The sink interface acts like a factory to create the {@link Writer} and {@link Committer}.
*
* @param <T>        The type of the sink's input.
* @param <CommT>    The type of the committable data.
*/
public interface Sink<T, CommT> extends Serializable {

    /**
	* Create a {@link Writer}.
	* @param context the runtime context
	* @return A new sink writer.
	*/
	Writer<T, CommT> createWriter(InitialContext context) throws Exception;

	/**
	* Create a {@link Committer}.
	* @return A new sink committer
	*/
	Committer<CommT> createCommitter();

	/**
	* @return a committable serializer
	*/
	SimpleVersionedSerializer<CommT> getCommittableSerializer();

    /**
	* Providing the runtime information.
	*/
	interface InitialContext {

		boolean isRestored();

		int getSubtaskIndex();

		OperatorStateStore getOperatorStateStore();

		MetricGroup metricGroup();

	}
}


Code Block (Java): Writer
/**
* The Writer is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
* As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
*
* @param <T> 		The type of writer's input
* @param <CommT> 	The type of the committable data.
*/
public interface Writer<T, CommT> extends AutoCloseable {

	/**
	* Add an element to the writer.
	* @param t          The input record
	* @param context    The additional information about the input record
	* @param output     The committable data could be shipped to the committer by this
	* @throws Exception
	*/
	void write(T t, Context context, WriterOutput<CommT> output) throws Exception;

	/**
	* Snapshot the state of the writer.
	* @param output The committable data could be shipped to the committer by this.
	*/
	void snapshotState(WriterOutput<CommT> output) throws Exception;

	/**
	* Flush all the data which is un-staged to the committer.
	*/
	void flush(WriterOutput<CommT> output) throws IOException;

    interface Context {

		long currentProcessingTime();

		long currentWatermark();
		
		Long timestamp();
	}
}