Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
theme: Confluence
firstline: 0
title: Sink
linenumbers: true
/**
 * This interface lets the sink developer build a simple sink topology pattern, which satisfiescould guarantee the HDFS/S3/Iceberg sink.exactly once
 * This sink topology includes one {@link Writer} + one semantics in both batch and stream execution mode if there is a {@link Committer} + oneor {@link GlobalCommitter}.
 * 1. The {@link WriterSinkWriter} is responsible for producing the committable.
 * 2. The {@link Committer} is responsible for committing a single committable.
 * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we calledcall the global committable.
 * And the parallelism ofcommittable. theThe {@link GlobalCommitter} is always executed with a parallelism of 1.
 * Both Note: Developers need to ensure the idempotence of {@link Committer} and the {@link GlobalCommitter} are optional..
 *
 * @param <InputT>        The type of the sink's input
 * @param <CommT>         The type of theinformation committableneeded data
to *commit @paramdata <GlobalCommT>  The type ofstaged by the aggregated committablesink
 * @param <WriterStateT>  The type of the sink writer's state
 * @param <GlobalCommT>   The type of the aggregated committable
 */
@Experimental
public interface Sink<InputT, CommT, GlobalCommTWriterStateT, GlobalCommT> extends WriterStateT>Serializable {

	/**
	 * Create a {@link WriterSinkWriter}.
	 *
	 * @param context the runtime context.
	 * @param states the previous writerswriter's state.
	 *
	 * @return A sink writer.
	 */
	Writer<InputT, CommT,* WriterStateT>@throws createWriter(InitContext context, List<WriterStateT> states);

	/**
	 * @return a {@link Committer}
	 */
	Optional<Committer<CommT>> createCommitter()IOException if fail to create a writer.
	 */
	SinkWriter<InputT, CommT, WriterStateT> createWriter(
			InitContext context,
			List<WriterStateT> states) throws IOException;

	/**
	 * @returnCreates a {@link GlobalCommitterCommitter}.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter();

	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	interface InitContext {

		int getSubtaskId();

		MetricGroup metricGroup();* @return A committer.
	 *
	 * @throws IOException if fail to create a committer.
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * Creates a {@link GlobalCommitter}.
	 *
	 * @return A global committer.
	 *
	 * @throws IOException if fail to create a global committer.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/**
	 * Returns the serializer of the committable type.
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * Returns the serializer of the aggregated committable type.
	 */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/**
	 * Return the serializer of the writer's state type.
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/**
	 * The interface exposes some runtime info for creating a {@link SinkWriter}.
	 */
	interface InitContext {

		/**
		 * Returns a {@link ProcessingTimeService} that can be used to
		 * get the current time and register timers.
		 */
		ProcessingTimeService getProcessingTimeService();

		/**
		 * @return The id of task where the writer is.
		 */
		int getSubtaskId();

		/**
		 * @return The metric group this writer belongs to.
		 */
		MetricGroup metricGroup();
	}

	/**
	 * A service that allows to get the current processing time and register timers that
	 * will execute the given {@link ProcessingTimeCallback} when firing.
	 */
	interface ProcessingTimeService {

		/** Returns the current processing time. */
		long getCurrentProcessingTime();

		/**
		 * Invokes the given callback at the given timestamp.
		 *
		 * @param time Time when the callback is invoked at
		 * @param processingTimerCallback The callback to be invoked.
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * A callback that can be registered via {@link #registerProcessingTimer(long,
		 * ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * This method is invoked with the time which the callback register for.
			 *
			 * @param time The time this callback was registered for.
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}



Code Block
language: java
theme: Confluence
firstline: 0
title: Writer
linenumbers: true
/**
 * Writes incoming records and manages any temporary area used for data that has not been
 * staged yet (for example, in-progress files).
 *
 * <p>Once some data is ready to be committed, it is handed off, either as the data itself or as
 * metadata pointing to where the data is staged, to an operator that decides when to commit it.
 *
 * @param <InputT>       The type of the writer's input
 * @param <CommT>        The type of the committable data
 * @param <WriterStateT> The type of the writer's state
 */
interface Writer<InputT, CommT, WriterStateT> {

	/**
	 * Extra runtime information supplied alongside every record passed to {@link #write}.
	 */
	interface Context {

		/** @return the current processing time (NOTE(review): unit not specified here — confirm). */
		long currentProcessingTime();

		/** @return the current watermark. */
		long currentWatermark();

		/**
		 * @return the timestamp of the record. The return type is boxed, so callers should be
		 *         prepared for {@code null} (NOTE(review): confirm when null is returned).
		 */
		Long timestamp();
	}

	/**
	 * Hands one record to the writer.
	 *
	 * @param element The record to write
	 * @param ctx     Additional information about the record
	 */
	void write(InputT element, Context ctx);

	/**
	 * Prepares the data written so far for committing.
	 *
	 * @param flush whether data that has not been staged yet should be flushed as well
	 * @return the committables produced for this commit
	 */
	List<CommT> prepareCommit(boolean flush);

	/**
	 * Captures the writer's state.
	 *
	 * @return the writer's state
	 */
	List<WriterStateT> snapshotState();
}

...