...

In summary, the sink API should be responsible for producing What to commit and providing How to commit. Flink should be responsible for guaranteeing the exactly-once semantics. Flink could “optimize” When and Where to commit according to the execution mode, and these optimizations should be transparent to the sink developer.
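To make this split concrete, here is a minimal sketch of the "When" part, assuming a toy runtime: the sink supplies the committables (What) and the commit logic (How), while a hypothetical `CommitScheduler` decides when to call the committer based on the execution mode. `SimpleCommitter`, `CommitScheduler` and `ExecutionMode` are illustrative names only; they are not part of the proposed API or of Flink, and the sketch does not model "Where".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink-provided "How to commit" (not the proposal's Committer interface).
interface SimpleCommitter<CommT> {
    void commit(List<CommT> committables);
}

// Hypothetical execution modes; the runtime, not the sink developer, chooses one.
enum ExecutionMode { STREAMING, BATCH }

// Hypothetical runtime component that decides "When" to commit.
final class CommitScheduler<CommT> {
    private final ExecutionMode mode;
    private final SimpleCommitter<CommT> committer;
    private final List<CommT> pending = new ArrayList<>();

    CommitScheduler(ExecutionMode mode, SimpleCommitter<CommT> committer) {
        this.mode = mode;
        this.committer = committer;
    }

    // "What to commit": a committable produced by the sink's writer.
    void onCommittable(CommT committable) {
        pending.add(committable);
    }

    // Streaming: commit everything pending once a checkpoint has completed.
    void onCheckpointComplete() {
        if (mode == ExecutionMode.STREAMING) {
            commitPending();
        }
    }

    // Both modes: commit whatever is still pending at the end of the input.
    void onEndOfInput() {
        commitPending();
    }

    private void commitPending() {
        if (pending.isEmpty()) {
            return;
        }
        committer.commit(new ArrayList<>(pending));
        pending.clear();
    }
}
```

With the same committer, STREAMING commits after every completed checkpoint, while BATCH commits everything once at the end of the input; the committer code does not change between the two.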

Sink API

From the above analysis, we can see that the sink API should support four semantics:

  1. What to commit
  2. How to commit
  3. SnapshotState
  4. Drain (Flush)

In the following, we propose two versions of the sink API. The two versions do not differ in how they express the four semantics:

  1. `Sink` is responsible for creating the `Writer` and the `Committer`.
  2. The `Writer` is responsible for writing data and for handling any temporary area used to write data that is not yet staged, e.g. in-progress files. As soon as some data is ready to be committed, it (or metadata pointing to where the actual data is staged) is shipped to an operator that knows when to commit it.
  3. The `Writer` also responds to the progress notifications: Snapshot and Flush.
  4. The `Committer` knows how to commit the data to the destination storage system.
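The division of labour above, and the four semantics it covers, can be sketched with a toy file-like sink, assuming records are staged in in-memory "files": `StagingWriter` writes records, hands out ready committables, answers Snapshot with its in-progress records and Flush by closing the in-progress file, and `StagingCommitter` makes staged files visible in a fake storage map. All class and method names here are hypothetical and do not correspond to the proposed `Writer` or `Committer` signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical committable: metadata pointing at a staged file that is not yet visible.
final class StagedFile {
    final String name;
    final List<String> records;

    StagedFile(String name, List<String> records) {
        this.name = name;
        this.records = records;
    }
}

// Hypothetical writer: the in-progress list plays the role of the temporary area.
final class StagingWriter {
    private final int subtaskId;
    private final int maxRecordsPerFile;
    private final List<StagedFile> ready = new ArrayList<>();
    private List<String> inProgress = new ArrayList<>();
    private int fileCounter = 0;

    StagingWriter(int subtaskId, int maxRecordsPerFile) {
        this.subtaskId = subtaskId;
        this.maxRecordsPerFile = maxRecordsPerFile;
    }

    // Write: append to the in-progress file and roll it once it is full.
    void write(String record) {
        inProgress.add(record);
        if (inProgress.size() >= maxRecordsPerFile) {
            rollFile();
        }
    }

    // What to commit: hands the ready committables to the runtime, which ships them on.
    List<StagedFile> takeCommittables() {
        List<StagedFile> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }

    // Snapshot: the in-progress file stays open; its records become writer state.
    List<String> snapshotState() {
        return new ArrayList<>(inProgress);
    }

    // Flush (drain): close the in-progress file so all data becomes committable.
    void flush() {
        if (!inProgress.isEmpty()) {
            rollFile();
        }
    }

    private void rollFile() {
        ready.add(new StagedFile("part-" + subtaskId + "-" + fileCounter++, inProgress));
        inProgress = new ArrayList<>();
    }
}

// Hypothetical committer: "How to commit" = make staged files visible in the storage.
final class StagingCommitter {
    // Fake destination storage: committed file name -> records.
    final Map<String, List<String>> storage = new TreeMap<>();

    // Idempotent: committing the same files again leaves the storage unchanged.
    void commit(List<StagedFile> committables) {
        for (StagedFile file : committables) {
            storage.putIfAbsent(file.name, file.records);
        }
    }
}
```

Because the committer in this sketch is idempotent, replaying the same committables (for example after a failover) does not change the storage, which keeps retries by the runtime safe.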

The only difference between the two versions is how the state is exposed to the user. The pros and cons are as follows.

First Alternative

  1. PROS
    1. This alternative has the advantage of not exposing anything about internal state handling to the user. We require the sink to provide us with the state to be persisted, and it is up to the underlying runtime to handle it. For example, instead of using union state for the shared state, we could leverage a potential operator coordinator.
    2. We do not expose complex classes such as StateDescriptor, TypeSerializer, etc. (only a simplified version: SimpleVersionedSerializer).
    3. The intent of #snapshotState is clearer: its signature already tells the user that the method is expected (or allowed) to return some state.
  2. CONS
    1. It exposes more methods.
    2. Because we get the state to snapshot via the return value of a method, the state must fit into memory. Moreover, we must put the whole state into the state backend on each checkpoint instead of putting only the updated values during element processing.
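The first alternative's contract, where the writer returns its state and a versioned serializer turns it into bytes, can be sketched as follows, assuming a writer state that is just the name and records of the current in-progress file. `VersionedSerializer` is a local stand-in with the same three methods as Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize`), so the example runs without a Flink dependency; `InProgressState`, `InProgressStateSerializer` and `StateCheckpointer` are hypothetical. Note that `checkpoint` serializes the whole returned state each time, which is exactly the cost listed under CONS.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Local stand-in mirroring the shape of Flink's SimpleVersionedSerializer.
interface VersionedSerializer<T> {
    int getVersion();
    byte[] serialize(T obj) throws IOException;
    T deserialize(int version, byte[] serialized) throws IOException;
}

// Hypothetical writer state: the current in-progress file and its records.
final class InProgressState {
    final String fileName;
    final List<String> records;

    InProgressState(String fileName, List<String> records) {
        this.fileName = fileName;
        this.records = records;
    }
}

final class InProgressStateSerializer implements VersionedSerializer<InProgressState> {
    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(InProgressState state) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(state.fileName);
            out.writeInt(state.records.size());
            for (String record : state.records) {
                out.writeUTF(record);
            }
        }
        return bytes.toByteArray();
    }

    @Override
    public InProgressState deserialize(int version, byte[] serialized) throws IOException {
        if (version != 1) {
            throw new IOException("Unsupported state version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String fileName = in.readUTF();
            int count = in.readInt();
            List<String> records = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                records.add(in.readUTF());
            }
            return new InProgressState(fileName, records);
        }
    }
}

// Hypothetical runtime side: serializes every state element the writer returned.
final class StateCheckpointer {
    // Called on every checkpoint: the whole returned state is serialized again.
    static <T> List<byte[]> checkpoint(List<T> writerState, VersionedSerializer<T> serializer) {
        List<byte[]> persisted = new ArrayList<>();
        try {
            for (T element : writerState) {
                persisted.add(serializer.serialize(element));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return persisted;
    }

    // Called on recovery; version is the serializer version recorded at checkpoint time.
    static <T> List<T> restore(List<byte[]> persisted, int version, VersionedSerializer<T> serializer) {
        List<T> states = new ArrayList<>();
        try {
            for (byte[] bytes : persisted) {
                states.add(serializer.deserialize(version, bytes));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return states;
    }
}
```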


import java.util.List;
import java.util.Optional;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;

// Writer, Committer and GlobalCommitter are the other interfaces of this proposal and are not shown here.

/**
 * This interface lets the sink developer build a simple transactional sink topology pattern, which fits the HDFS/S3/Iceberg sinks.
 * This sink topology includes one {@link Writer} + one {@link Committer} + one {@link GlobalCommitter}.
 * The {@link Writer} is responsible for producing the committable.
 * The {@link Committer} is responsible for committing a single committable.
 * The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global committable.
 * The parallelism of the {@link GlobalCommitter} is always 1.
 * Both the {@link Committer} and the {@link GlobalCommitter} are optional.
 * @param <IN>           The type of the sink's input
 * @param <CommT>        The type of the committable data
 * @param <GCommT>       The type of the aggregated committable
 * @param <WriterStateT> The type of the writer's state
 */
interface TransactionalSink<IN, CommT, GCommT, WriterStateT> {

	/**
	 * Create a {@link Writer}.
	 * @param context the runtime context
	 * @return A new sink writer
	 */
	Writer<IN, CommT, WriterStateT> createWriter(InitContext context);


	/**
	 * Restore a {@link Writer} from the state.
	 * @param context the runtime context
	 * @param states the previous writers' state
	 * @return A sink writer
	 */
	Writer<IN, CommT, WriterStateT> restoreWriter(InitContext context, List<WriterStateT> states);

	/**
	 * @return a {@link Committer}
	 */
	Optional<Committer<CommT>> createCommitter();

	/**
	 * @return a {@link GlobalCommitter}
	 */
	Optional<GlobalCommitter<CommT, GCommT>> createGlobalCommitter();

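	/**
	 * @return the serializer for the committables, used to persist committables that have not been committed yet
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * @return the serializer for the global committables
	 */
	Optional<SimpleVersionedSerializer<GCommT>> getGlobalCommittableSerializer();

	/**
	 * @return the serializer for the writer's state, used to persist it on checkpoints
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();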

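	interface InitContext {

		/**
		 * @return the index of the parallel subtask that runs the writer
		 */
		int getSubtaskId();

		/**
		 * @return the attempt number of this subtask's execution
		 */
		int getAttemptID();

		/**
		 * @return the metric group the writer can register its metrics with
		 */
		MetricGroup metricGroup();
	}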
}
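The aggregation step that distinguishes the GlobalCommitter can be illustrated with a toy, Iceberg-like example, assuming that each committable is the path of a staged data file and that the global committable is a single manifest listing all files of one checkpoint. `Manifest`, `FakeTable`, `ManifestGlobalCommitter` and `GlobalCommitRound` are hypothetical; their `combine`/`commit` methods are illustrative and are not the signatures of the proposed `GlobalCommitter`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical global committable: one manifest covering all files of a checkpoint.
final class Manifest {
    final long checkpointId;
    final List<String> files;

    Manifest(long checkpointId, List<String> files) {
        this.checkpointId = checkpointId;
        this.files = files;
    }
}

// Hypothetical table whose visible snapshots change only through manifest commits.
final class FakeTable {
    final List<Manifest> snapshots = new ArrayList<>();

    boolean hasCommitted(long checkpointId) {
        for (Manifest m : snapshots) {
            if (m.checkpointId == checkpointId) {
                return true;
            }
        }
        return false;
    }
}

final class ManifestGlobalCommitter {
    private final FakeTable table;

    ManifestGlobalCommitter(FakeTable table) {
        this.table = table;
    }

    // Aggregates the committables of all writer subtasks into one global committable.
    Manifest combine(long checkpointId, List<String> committables) {
        List<String> files = new ArrayList<>(committables);
        Collections.sort(files);
        return new Manifest(checkpointId, files);
    }

    // Publishes the whole manifest as one snapshot; a replayed manifest is skipped.
    void commit(Manifest manifest) {
        if (!table.hasCommitted(manifest.checkpointId)) {
            table.snapshots.add(manifest);
        }
    }
}

final class GlobalCommitRound {
    // A single committer instance (parallelism 1) receives the committables of every subtask.
    static void run(long checkpointId, List<List<String>> committablesPerSubtask,
                    ManifestGlobalCommitter committer) {
        List<String> all = new ArrayList<>();
        for (List<String> subtaskCommittables : committablesPerSubtask) {
            all.addAll(subtaskCommittables);
        }
        committer.commit(committer.combine(checkpointId, all));
    }
}
```

In this sketch, routing every committable through one instance lets the table expose all files of a checkpoint in a single step, whereas a per-subtask committer would make each file visible independently.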

...