
Status

Current state:  Under Discussion

...


Motivation

What does the unified sink API mean? The short answer is: write a sink once and use it everywhere. The long answer is: Flink allows the user to 

...

This document has three parts. The first part describes the semantics that the unified sink API should support. Based on the first part, the second part proposes two versions of the unified sink API. The last part introduces two open questions related to the API.

Semantics

This section discusses which semantics the unified sink API should support. It has two parts. The first part describes how the sink API supports exactly-once semantics. The second part argues that drain and snapshot are two general semantics, valid in both the bounded and unbounded scenarios.

Consistency

This section works out which parts of ensuring consistency the sink developer is responsible for and which parts Flink is responsible for. From this, we can derive the semantics the sink API needs in order to support exactly-once semantics.

...

In summary, the sink API is responsible for producing What to commit and providing How to commit. Flink is responsible for guaranteeing exactly-once semantics. Flink could “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.
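A minimal, Flink-independent sketch of this division of labor, under stated assumptions: the hypothetical `SketchCommitter` stands for the sink-provided How, `receive` accepts the What produced by the writer, and the hypothetical `CheckpointingCommitScheduler` plays the runtime that decides When (after a checkpoint completes in streaming, or once at the end of input in batch). None of these names are part of the proposed API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// "How to commit": supplied by the sink developer (hypothetical interface).
interface SketchCommitter<CommT> {
    void commit(List<CommT> committables);
}

// "When to commit": owned by the runtime, not the sink (hypothetical class).
final class CheckpointingCommitScheduler<CommT> {
    private final SketchCommitter<CommT> committer;
    // Committables sealed per checkpoint id, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<CommT>> pending = new TreeMap<>();
    private List<CommT> current = new ArrayList<>();

    CheckpointingCommitScheduler(SketchCommitter<CommT> committer) {
        this.committer = committer;
    }

    // "What to commit": produced by the sink's writer.
    void receive(CommT committable) {
        current.add(committable);
    }

    // Streaming: seal everything received so far under this checkpoint.
    void onCheckpoint(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // Streaming: once a checkpoint completes, commit it and all earlier ones.
    void onCheckpointComplete(long checkpointId) {
        Map<Long, List<CommT>> ready = pending.headMap(checkpointId, true);
        List<CommT> batch = new ArrayList<>();
        for (List<CommT> committables : ready.values()) {
            batch.addAll(committables);
        }
        ready.clear(); // the head-map view also removes these entries from `pending`
        if (!batch.isEmpty()) {
            committer.commit(batch);
        }
    }

    // Batch: the runtime may simply commit everything once at the end of input.
    void onEndOfInput() {
        onCheckpoint(Long.MAX_VALUE);
        onCheckpointComplete(Long.MAX_VALUE);
    }
}

final class ResponsibilitySplitDemo {
    public static void main(String[] args) {
        List<String> committed = new ArrayList<>();
        CheckpointingCommitScheduler<String> scheduler =
                new CheckpointingCommitScheduler<String>(committed::addAll);
        scheduler.receive("file-1");
        scheduler.onCheckpoint(1L);
        scheduler.receive("file-2");
        scheduler.onCheckpointComplete(1L); // only file-1 belongs to checkpoint 1
        System.out.println(committed); // [file-1]
        scheduler.onEndOfInput();
        System.out.println(committed); // [file-1, file-2]
    }
}
```

Because the scheduler owns the timing, switching from checkpoint-driven commits to a single end-of-input commit requires no change to the committer, which is the transparency the paragraph above asks for.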

Drain and Snapshot

It seems that

  1. In the unbounded scenario, the sink has to deal with Snapshot/Drain but not with Termination.
  2. In the bounded scenario, the sink does not need to care about Snapshot/Drain and only cares about Termination.

...

  1. Both the bounded and unbounded scenarios have to deal with Drain. Termination is not a concept unique to the bounded scenario; in the bounded scenario it is just a special case of Drain. So the sink API should provide Drain semantics.
  2. Snapshot also applies to both the bounded and unbounded scenarios. However, users should not make any assumption about how many snapshots will happen at runtime.
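The two conclusions can be illustrated with a small sketch that does not depend on Flink. The hypothetical `DrainableWriter` reduces the contract to Snapshot and Drain; the hypothetical driver snapshots at an arbitrary interval (or never) and treats end of input as a final drain. A correct writer commits the same data regardless of how many snapshots occurred. In this sketch both snapshot and drain emit pending data; a real sink may distinguish them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer contract reduced to the two general semantics.
interface DrainableWriter<T, CommT> {
    void write(T element);

    // May be called any number of times, including zero; the writer must not rely on it.
    List<CommT> snapshot();

    // Emits everything still pending; end of input in a bounded job is just a final drain.
    List<CommT> drain();
}

// Buffers elements and emits them as one committable (a batch) per snapshot or drain.
final class BatchingWriter implements DrainableWriter<String, List<String>> {
    private List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element) {
        buffer.add(element);
    }

    @Override
    public List<List<String>> snapshot() {
        return emitPending();
    }

    @Override
    public List<List<String>> drain() {
        return emitPending();
    }

    private List<List<String>> emitPending() {
        List<List<String>> out = new ArrayList<>();
        if (!buffer.isEmpty()) {
            out.add(buffer);
            buffer = new ArrayList<>();
        }
        return out;
    }
}

final class DrainSnapshotDemo {
    // Snapshots every `interval` elements (never, if interval <= 0) and always drains at the end.
    static List<String> run(List<String> input, int interval) {
        BatchingWriter writer = new BatchingWriter();
        List<String> committed = new ArrayList<>();
        int seen = 0;
        for (String element : input) {
            writer.write(element);
            seen++;
            if (interval > 0 && seen % interval == 0) {
                writer.snapshot().forEach(committed::addAll);
            }
        }
        writer.drain().forEach(committed::addAll);
        return committed;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", "b", "c", "d", "e");
        // The committed data is the same no matter how many snapshots happened.
        System.out.println(run(input, 0).equals(run(input, 2))); // true
    }
}
```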

Sink API

From the above analysis, we can conclude that the sink API should support four semantics.

...

The only difference between the two versions is how state is exposed to the user. Their pros and cons are as follows.

First Alternative

  1. PROS
    1. This alternative has the advantage of not exposing anything about internal state handling to the user. We require the sink to provide us with the state to be persisted, and it is up to the underlying runtime to handle it. For example, instead of using union state for the shared state, we could leverage a potential operator coordinator.
    2. We do not expose complex classes such as StateDescriptor or TypeSerializer; only a simplified version, SimpleVersionedSerializer, is exposed.
    3. The intent of #snapshotState is clearer: its signature already tells the user that the method is expected (or allowed) to return some state.
  2. CONS
    1. It exposes more methods.
    2. Because the state to snapshot is obtained as a method's return value, the whole state must fit into memory. Moreover, the whole state must be written to the state backend on each checkpoint, instead of only the values updated during element processing.
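A sketch of the first alternative, assuming a hypothetical file-like writer whose state is its in-progress file path and byte offset. `VersionedSerializer` is a local stand-in that mirrors the shape of Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize(version, bytes)`) so the example compiles without Flink; `InProgressFile` and `FirstAlternativeWriter` are illustrative names. The writer only returns its state; serializing and persisting it is left to the runtime.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;

// Local stand-in mirroring the shape of Flink's SimpleVersionedSerializer.
interface VersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical writer state: an in-progress file and how far it has been written.
final class InProgressFile {
    final String path;
    final long offset;

    InProgressFile(String path, long offset) {
        this.path = path;
        this.offset = offset;
    }
}

final class InProgressFileSerializer implements VersionedSerializer<InProgressFile> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(InProgressFile file) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(file.path);
            out.writeLong(file.offset);
        }
        return bytes.toByteArray();
    }

    @Override
    public InProgressFile deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unknown serializer version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new InProgressFile(in.readUTF(), in.readLong());
        }
    }
}

// First alternative: the writer hands its state back as a return value.
final class FirstAlternativeWriter {
    private final String path;
    private long offset;

    FirstAlternativeWriter(String path, List<InProgressFile> restored) {
        this.path = path;
        // On restore, resume from the persisted offset if there is one.
        this.offset = restored.isEmpty() ? 0L : restored.get(0).offset;
    }

    void write(String value) {
        offset += value.length(); // stand-in for bytes appended to the file
    }

    // The whole state is returned, so it must fit into memory and is
    // re-persisted in full on every checkpoint (the CONS listed above).
    List<InProgressFile> snapshotState() {
        return List.of(new InProgressFile(path, offset));
    }
}

final class FirstAlternativeDemo {
    public static void main(String[] args) throws IOException {
        InProgressFileSerializer serializer = new InProgressFileSerializer();
        FirstAlternativeWriter writer = new FirstAlternativeWriter("part-0", List.of());
        writer.write("hello");
        // The runtime, not the sink, turns the returned state into bytes.
        byte[] bytes = serializer.serialize(writer.snapshotState().get(0));
        InProgressFile restored = serializer.deserialize(serializer.getVersion(), bytes);
        System.out.println(restored.path + "@" + restored.offset); // part-0@5
    }
}
```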

...

Code Block: Committer (Java)
import java.io.IOException;

/**
 * The {@link Writer} uses this interface to send committable data to the operator that knows when to commit to the external system.
 *
 * @param <CommT> The type of the committable data.
 */
public interface WriterOutput<CommT> {

	/**
	 * Sends committable data to the operator that knows when to commit it.
	 *
	 * @param committable The data that is ready to be committed.
	 * @throws IOException If the data cannot be sent.
	 */
	void sendToCommit(CommT committable) throws IOException;
}
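A hypothetical in-memory implementation of `WriterOutput`, for example to unit-test a writer without a running job. `CollectingWriterOutput` and `drainCommittables` are illustrative names; in the proposal, the real implementation would forward committables to the committing operator.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

interface WriterOutput<CommT> {
    void sendToCommit(CommT committable) throws IOException;
}

// Buffers committables until the (simulated) committing operator takes them.
final class CollectingWriterOutput<CommT> implements WriterOutput<CommT> {
    private final List<CommT> buffered = new ArrayList<>();

    @Override
    public void sendToCommit(CommT committable) {
        buffered.add(committable);
    }

    // Hands over everything sent so far and clears the buffer.
    List<CommT> drainCommittables() {
        List<CommT> result = new ArrayList<>(buffered);
        buffered.clear();
        return result;
    }
}

final class WriterOutputDemo {
    public static void main(String[] args) {
        CollectingWriterOutput<String> output = new CollectingWriterOutput<>();
        output.sendToCommit("part-0");
        output.sendToCommit("part-1");
        System.out.println(output.drainCommittables()); // [part-0, part-1]
        System.out.println(output.drainCommittables()); // []
    }
}
```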


Second Alternative

  1. PROS
    1. It is more consistent with how other UDFs are developed. Whether to use state, and how many states to use, is up to the sink developer.
    2. We can, for example, append a single value to the state instead of rewriting the whole state.
  2. CONS
    1. All sinks depend on the operator state backend, so any improvement depends on the state backend.
    2. It exposes more complex classes (see the first alternative).
    3. The contracts are less clean regarding restore operations and the number of supported states, and there is potential for dangling states when the writer's implementation is updated.

...

Code Block: Writer (Java)
import java.io.IOException;

/**
 * The Writer is responsible for writing data and handling any potential temporary area used to write not-yet-staged data, e.g. in-progress files.
 * As soon as some data is ready to commit, it (or metadata pointing to where the actual data is staged) is shipped to an operator that knows when to commit it.
 *
 * @param <T>     The type of the writer's input.
 * @param <CommT> The type of the committable data.
 */
public interface Writer<T, CommT> extends AutoCloseable {

	/**
	 * Adds an element to the writer.
	 *
	 * @param t       The input record.
	 * @param context Additional information about the input record.
	 * @param output  The output used to ship committable data to the committer.
	 * @throws Exception If the record cannot be written.
	 */
	void write(T t, Context context, WriterOutput<CommT> output) throws Exception;

	/**
	 * Snapshots the state of the writer.
	 *
	 * @param output The output used to ship committable data to the committer.
	 * @throws Exception If the snapshot fails.
	 */
	void snapshotState(WriterOutput<CommT> output) throws Exception;

	/**
	 * Flushes all un-staged data to the committer.
	 *
	 * @param output The output used to ship committable data to the committer.
	 * @throws IOException If the data cannot be flushed.
	 */
	void flush(WriterOutput<CommT> output) throws IOException;

	/** Additional information about the current input record. */
	interface Context {

		/** Returns the current processing time. */
		long currentProcessingTime();

		/** Returns the current event-time watermark. */
		long currentWatermark();

		/** Returns the timestamp of the current record, or {@code null} if it has none. */
		Long timestamp();
	}
}
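A sketch of a writer implementing the interface above, with the interfaces re-declared so the example is self-contained. `StringBufferingWriter` is hypothetical: a `StringBuilder` stands in for an in-progress file, and the accumulated content is shipped as one committable on `snapshotState` or `flush`. In the second alternative, any state the writer needed would be registered with the state backend by the developer; the sketch keeps no state.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

interface WriterOutput<CommT> {
    void sendToCommit(CommT committable) throws IOException;
}

interface Writer<T, CommT> extends AutoCloseable {
    void write(T t, Context context, WriterOutput<CommT> output) throws Exception;
    void snapshotState(WriterOutput<CommT> output) throws Exception;
    void flush(WriterOutput<CommT> output) throws IOException;

    interface Context {
        long currentProcessingTime();
        long currentWatermark();
        Long timestamp();
    }
}

// Hypothetical writer: a StringBuilder stands in for an in-progress file.
final class StringBufferingWriter implements Writer<String, String> {
    private StringBuilder inProgress = new StringBuilder();

    @Override
    public void write(String value, Context context, WriterOutput<String> output) {
        Long ts = context.timestamp(); // may be null if the record has no timestamp
        inProgress.append(ts == null ? "-" : ts.toString()).append(':').append(value).append(';');
    }

    @Override
    public void snapshotState(WriterOutput<String> output) throws IOException {
        stage(output);
    }

    @Override
    public void flush(WriterOutput<String> output) throws IOException {
        stage(output);
    }

    // Ships the in-progress content as one committable, if there is any.
    private void stage(WriterOutput<String> output) throws IOException {
        if (inProgress.length() > 0) {
            output.sendToCommit(inProgress.toString());
            inProgress = new StringBuilder();
        }
    }

    @Override
    public void close() {
        inProgress = new StringBuilder(); // discard un-staged data
    }
}

final class WriterDemo {
    public static void main(String[] args) throws Exception {
        List<String> committed = new ArrayList<>();
        WriterOutput<String> output = committed::add;
        Writer.Context ctx = new Writer.Context() {
            @Override public long currentProcessingTime() { return 0L; }
            @Override public long currentWatermark() { return Long.MIN_VALUE; }
            @Override public Long timestamp() { return 42L; }
        };
        try (StringBufferingWriter writer = new StringBufferingWriter()) {
            writer.write("a", ctx, output);
            writer.snapshotState(output);
            writer.write("b", ctx, output);
            writer.flush(output);
        }
        System.out.println(committed); // [42:a;, 42:b;]
    }
}
```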

Open Questions

There are still two open questions related to the unified sink API:

  1. How does the sink API support writing to Hive?
  2. Is the sink an operator or a topology?

Compatibility, Deprecation, and Migration Plan

  • We will first migrate the StreamingFileSink to this new API.

Test Plan


Rejected Alternatives


...