Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
themeConfluence
firstline0
titleSink
linenumberstrue
/**
 * This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once
 * semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}.
 * 1. The {@link SinkWriter} is responsible for producing the committable.
 * 2. The {@link Committer} is responsible for committing a single committable.
 * 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global
 *    committable. The {@link GlobalCommitter} is always executed with a parallelism of 1.
 * Note: Developers need to ensure the idempotence of {@link Committer} and {@link GlobalCommitter}.
 *
 * @param <InputT>        The type of the sink's input
 * @param <CommT>         The type of information needed to commit data staged by the sink
 * @param <WriterStateT>  The type of the sink writer's state
 * @param <GlobalCommT>   The type of the aggregated committable
 */
@Experimental
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

	/**
	 * Create a {@link SinkWriter}.
	 *
	 * @param context the runtime context.
	 * @param states the writer's state.
	 *
	 * @return A sink writer.
	 *
	 * @throws IOException if fail to create a writer.
	 */
	SinkWriter<InputT, CommT, WriterStateT> createWriter(
			InitContext context,
			List<WriterStateT> states) throws IOException;

	/**
	 * Creates a {@link Committer}.
	 *
	 * @return A committer.
	 *
	 * @throws IOException if fail to create a committer.
	 */
	Optional<Committer<CommT>> createCommitter() throws IOException;

	/**
	 * Creates a {@link GlobalCommitter}.
	 *
	 * @return A global committer.
	 *
	 * @throws IOException if fail to create a global committer.
	 */
	Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;

	/**
	 * Returns the serializer of the committable type.
	 */
	Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();

	/**
	 * Returns the serializer of the aggregated committable type.
	 */
	Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();

	/**
	 * Return the serializer of the writer's state type.
	 */
	Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

	/**
	 * The interface exposes some runtime info for creating a {@link SinkWriter}.
	 */
	interface InitContext {

		/**
		 * Returns a {@link ProcessingTimeService} that can be used to
		 * get the current time and register timers.
		 */
		ProcessingTimeService getProcessingTimeService();

		/**
		 * @return The id of task where the writer is.
		 */
		int getSubtaskId();

		/**
		 * @return The metric group this writer belongs to.
		 */
		MetricGroup metricGroup();
	}

	/**
	 * A service that allows to get the current processing time and register timers that
	 * will execute the given {@link ProcessingTimeCallback} when firing.
	 */
	interface ProcessingTimeService {

		/** Returns the current processing time. */
		long getCurrentProcessingTime();

		/**
		 * Invokes the given callback at the given timestamp.
		 *
		 * @param time Time when the callback is invoked at
		 * @param processingTimerCallback The callback to be invoked.
		 */
		void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);

		/**
		 * A callback that can be registered via {@link #registerProcessingTimer(long,
		 * ProcessingTimeCallback)}.
		 */
		interface ProcessingTimeCallback {

			/**
			 * This method is invoked with the time which the callback register for.
			 *
			 * @param time The time this callback was registered for.
			 */
			void onProcessingTime(long time) throws IOException;
		}
	}
}

...

Code Block
languagejava
themeConfluence
firstline0
titleWriter
linenumberstrue
/**
 * The interface{@code SinkWriter} is responsible for writing data and handling any potential tmp area used to write yet un-staged
 * data, e.g. in-progress files.
 * As soon as some data is ready to commit, they The data (or metadata pointing to where the actual data is staged) ready are shipped to an operator who knows when to commit themto commit is
 * returned to the system by the {@link #prepareCommit(boolean)}.
 *
 * @param <InputT>         The type of the sink writer's input
 * @param <CommT>          The type of the committable datainformation needed to commit data staged by the sink
 * @param <WriterStateT>   The type of the writer's state
 */
public interface Writer<InputTSinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

	/**
	 * Add an element to the writer.
	 *
	 * @param element The input record
	 * @param ctxcontext The additional information about the input record
	 *
	 * @throws IOException if fail to add an element.
	 */
	void write(InputT element, Context ctxcontext) throws IOException;

	/**
	 * Prepare for a commit.
	 *
	 @param flush  whether flushing the un-staged committable or not* <p>This will be called before we checkpoint the Writer's state in Streaming execution mode.
	 */
	List<CommT> prepareCommit(boolean flush);

	/**
	 * @return the writer's state. * @param flush Whether flushing the un-staged data or not
	 */
	List<WriterStateT> snapshotState();

	interface Context {

		long currentProcessingTime();

		long currentWatermark();
 @return The data is ready to commit.
	 *
	 * @throws IOException if fail to prepare for a commit.
	 */
	List<CommT> prepareCommit(boolean flush) throws IOException;

	/**
	 * @return The writer's state.
	 *
	 * @throws IOException if fail to snapshot writer's state.
	 */
	List<WriterStateT> snapshotState() throws IOException;


	/**
	 * Context that {@link #write} can use for getting additional data about an input record.
	 */
	interface Context {

		/**
		 * Returns the current event-time watermark.
		 */
		long currentWatermark();

		/**
		 * Returns the timestamp of the current input record or {@code null} if the element does not
		 * have an assigned timestamp.
		 */
		Long timestamp();
	}
}


Code Block
languagejava
themeConfluence
firstline0
titleCommitter
linenumberstrue
/**
* This interface knows how to commit the data to the external system.
* 
* @param <CommT> The type of the committable data.
*/
public interface Committer<CommT> {
	void commit(CommT committable) throws Exception;
}

...