Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The document includes three parts: The first part describes the semantics the unified sink API should support. According to the first part the second part proposes two versions of unified sink API. In the last part we introduce two open questions related to the API.

Semantics

This section would talk about which semantics the unified sink API would support. This section includes two parts. The first part describes how the sink API supports the exactly once semantics. The second part mainly talks about that drain and snapshot are two general semantics, which are valid for both bounded and unbounded scenarios. 

Consistency

This section tries to figure out the sink developer would be responsible for which part and Flink would be responsible for which part in terms of ensuring consistency. After this we could know what semantics the sink API should have to support the exact once semantics.

...

In summary the sink API should be responsible for producing What to commit & providing How to commit. Flink should be responsible for guaranteeing the exact semantics. Flink could “optimize” When & Where to commit according to the execution mode and these optimizations should be transparent to the sink developer.

Transactional Sink API


Code Block
languagejava
themeConfluence
firstline0
titleSink
linenumberstrue
/**
 * This interface lets the sink developer to build a simple transactional sink topology pattern, which satisfies the HDFS/S3/Iceberg sink.
 * This sink topology includes one {@link Writer} + one {@link Committer} + one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called the global committable.
 * And the parallelism of the {@link GlobalCommitter} is always 1.
 * Both the {@link Committer} and the {@link GlobalCommitter} are optional.
 * @param <IN>           The type of the sink's input
 * @param <CommT>        The type of the committable data
 * @param <GCommT>       The type of the aggregated committable
 * @param <WriterStateT> The type of the writer's state
 */
interface TransactionalSink<IN, CommT, GCommT, WriterStateT> {

	/**
	 * Create a {@link Writer}.
	 * @param context the runtime context
	 * @return A new sink writer
	 */
	Writer<IN, CommT, WriterStateT> createWriter(InitContext context);


	/**
	 * Restore a {@link Writer} from the state.
	 * @param context the runtime context
	 * @param states the previous writers' state
	 * @return A sink writer
	 */
	Writer<IN, CommT, WriterStateT> restoreWriter(InitContext context, List<WriterStateT> states);

	/**
	 * @return a {@link Committer}
	 */
	Optional<Committer<CommT>> createCommitter();

	/**
	 * @return a {@link GlobalCommitter}
	 */
	Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();

	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	interface InitContext {

		int getSubtaskId();

		int getAttemptID();

		MetricGroup metricGroup();
	}
}

...