Status
...
...
...
JIRA:
...
ASF JIRA | serverId | 5aa69414-a9e9-3523-82ec-879b028fb15b |
---|
key | FLINK-19510 |
---|
|
| Release | 1.12 |
---|
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
firstline | 0 |
---|
title | Sink |
---|
linenumbers | true |
---|
|
/**
* This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once
* semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}.
* 1. The {@link SinkWriter} is responsible for producing the committable.
* 2. The {@link Committer} is responsible for committing a single committable.
* 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global
* committable. The {@link GlobalCommitter} is always executed with a parallelism of 1.
* Note: Developers need to ensure the idempotence of {@link Committer} and {@link GlobalCommitter}.
*
* @param <InputT> The type of the sink's input
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the sink writer's state
* @param <GlobalCommT> The type of the aggregated committable
*/
public interface Sink<InputT, CommT, GlobalCommTWriterStateT, GlobalCommT> extends WriterStateT>Serializable {
/**
* Create a {@link SinkWriter}.
*
* @param context the runtime context.
* @param states the previous writer's state.
*
* @return A sink writer.
*
* @throws IOException if fail to create a writer.
*/
Writer<InputTSinkWriter<InputT, CommT, WriterStateT> createWriter(
InitContext context,
List<WriterStateT> states) throws IOException;
/**
* Creates a {@link Committer}.
*/
Optional<Committer<CommT>> createCommitter();
/** * @return A committer.
*
* @return @throws IOException if fail to create a {@link GlobalCommitter}committer.
*/
Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitterOptional<Committer<CommT>> createCommitter();
Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();
Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer() throws IOException;
Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
/**
interface InitContext* Creates a {@link GlobalCommitter}.
*
int getSubtaskId();
MetricGroup metricGroup();
}
} |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
firstline | 0 |
---|
title | Writer |
---|
linenumbers | true |
---|
|
/**
* The interface is responsible for writing data and handling any potential tmp area used to write yet un-staged data, e.g. in-progress files.
* As soon as some data is ready to commit, they (or metadata pointing to where the actual data is staged) are shipped to an operator who knows when to commit them.
*
* @param <InputT> The type of the writer's input
* @param <CommT> The type of the committable data
* @param <WriterStateT> The type of the writer's state
*/
interface Writer<InputT, CommT, WriterStateT> {
/**
* Add an element to the writer.
* @param element The input record
* @param ctx The additional information about the input record
*/
void write(InputT element, Context ctx); * @return A global committer.
*
* @throws IOException if fail to create a global committer.
*/
Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter() throws IOException;
/**
* Returns the serializer of the committable type.
*/
Optional<SimpleVersionedSerializer<CommT>> getCommittableSerializer();
/**
* Returns the serializer of the aggregated committable type.
*/
Optional<SimpleVersionedSerializer<GlobalCommT>> getGlobalCommittableSerializer();
/**
* Return the serializer of the writer's state type.
*/
Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();
/**
* The interface exposes some runtime info for creating a {@link SinkWriter}.
*/
interface InitContext {
/**
* Returns a {@link ProcessingTimeService} that can be used to
* get the current time and register timers.
*/
ProcessingTimeService getProcessingTimeService();
/**
* @return The id of task where the writer is.
*/
int getSubtaskId();
/**
* @return The metric group this writer belongs to.
*/
MetricGroup metricGroup();
}
/**
* A service that allows to get the current processing time and register timers that
* will execute the given {@link ProcessingTimeCallback} when firing.
*/
List<CommT>interface prepareCommit(boolean flush);ProcessingTimeService {
/**
* Returns the current processing time.
*/
List<WriterStateT>long snapshotStategetCurrentProcessingTime();
interface Context {
long currentProcessingTime();
long currentWatermark();
Long timestamp();
}
} |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
firstline | 0 |
---|
title | Committer |
---|
linenumbers | true |
---|
|
/**
* This interface knows how to commit the data to the external system.
*
* @param <CommT> The type of the committable data.
*/
public interface Committer<CommT> {
/**
* Commits a single committable.
*
* @param committable The committable to commit.
*
* @throws Exception declared by the signature; presumably raised when the commit fails (confirm with implementers).
*/
void commit(CommT committable) throws Exception;
} |
/**
* Invokes the given callback at the given timestamp.
*
* @param time Time when the callback is invoked at
* @param processingTimerCallback The callback to be invoked.
*/
void registerProcessingTimer(long time, ProcessingTimeCallback processingTimerCallback);
/**
* A callback that can be registered via {@link #registerProcessingTimer(long,
* ProcessingTimeCallback)}.
*/
interface ProcessingTimeCallback {
/**
* This method is invoked with the time which the callback register for.
*
* @param time The time this callback was registered for.
*/
void onProcessingTime(long time) throws IOException;
}
}
} |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
firstline | 0 |
---|
title | Writer |
---|
linenumbers | true |
---|
|
/**
* The {@code SinkWriter} is responsible for writing data and for handling any temporary area used to hold data that is
* not yet staged, e.g. in-progress files. The data (or metadata pointing to where the actual data is staged) that is
* ready to commit is returned to the system by {@link #prepareCommit(boolean)}.
*
* @param <InputT> The type of the sink writer's input
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the writer's state
*/
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {
/**
* Adds an element to the writer.
*
* @param element The input record
* @param context The additional information about the input record
*
* @throws IOException if the writer fails to add the element.
*/
void write(InputT element, Context context) throws IOException;
/**
* Prepares for a commit.
*
* <p>This will be called before we checkpoint the writer's state in streaming execution mode.
*
* @param flush Whether to flush the un-staged data or not
* @return The data that is ready to commit.
*
* @throws IOException if the writer fails to prepare for a commit.
*/
List<CommT> prepareCommit(boolean flush) throws IOException;
/**
* Snapshots the writer's state.
*
* @return The writer's state.
*
* @throws IOException if the writer fails to snapshot its state.
*/
List<WriterStateT> snapshotState() throws IOException;
/**
* Context that {@link #write} can use for getting additional data about an input record.
*/
interface Context {
/**
* Returns the current event-time watermark.
*/
long currentWatermark();
/**
* Returns the timestamp of the current input record, or {@code null} if the element does not
* have an assigned timestamp (hence the boxed {@code Long} return type).
*/
Long timestamp();
}
} |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
firstline | 0 |
---|
title | Committer |
---|
linenumbers | true |
---|
|
/**
* The {@code Committer} is responsible for committing the data staged by the sink.
*
* @param <CommT> The type of information needed to commit the staged data
*/
public interface Committer<CommT> extends AutoCloseable {
/**
* Commits the given list of {@code CommT}.
*
* @param committables A list of information needed to commit data staged by the sink.
* @return A list of {@code CommT} that need to be re-committed, which is needed in case we implement a
* "commit-with-retry" pattern.
* @throws IOException if the commit operation fails and should not be retried any more.
*/
List<CommT> commit(List<CommT> committables) throws IOException;
} |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
title | GlobalCommitter |
---|
linenumbers | true |
---|
|
/**
* The {@code GlobalCommitter} is responsible for creating and committing an aggregated committable,
* which we call global committable (see {@link #combine}).
*
* <p>The {@code GlobalCommitter} runs with parallelism equal to 1.
*
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <GlobalCommT> The type of the aggregated committable
*/
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
/**
* Find out which global committables need to be retried when recovering from the failure.
* @param globalCommittables A list of {@link GlobalCommT} for which we want to verify
* which ones were successfully committed and which ones did not.
*
* @return A list of {@link GlobalCommT} that should be committed again.
*
* @throws IOException if fail to filter the recovered committables.
*/
List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;
/**
* Compute an aggregated committable from a list of committables.
* @param committables A list of {@link CommT} to be combined into a {@link GlobalCommT}.
*
* @return an aggregated committable
*
* @throws IOException if fail to combine the given committables.
*/
GlobalCommT combine(List<CommT> committables) throws IOException;
/**
* Commit the given list of {@link GlobalCommT}.
*
* @param globalCommittables a list of {@link GlobalCommT}.
*
* @return A list of {@link GlobalCommT} needed to re-commit, which is needed in case we implement a "commit-with-retry" pattern.
*
* @throws IOException if the commit operation fail and do not want to retry any more.
*/
List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException |
Code Block |
---|
language | java |
---|
theme | Confluence |
---|
title | GlobalCommitter |
---|
|
/**
* The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we called global committables.
*
* @param <CommT> The type of the committable data
* @param <GlobalCommT> The type of the aggregated committable
*/
interface GlobalCommitter<CommT, GlobalCommT> {
/**
* This method is called when restoring from a failover.
* @param globalCommittables the global committable that are not committed in the previous session.
* @return the global committables that should be committed again in the current session.
*/
List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables);
/**
* Compute an aggregated committable from a collection of committables.
* @param committables a collection of committables that are needed to combine
* @return an aggregated committable
*/
GlobalCommT combine(List<CommT> committables);
CommitResult commit(GlobalCommT globalCommittable);
/**
* Signals that there is no committable any more.
*
* @throws IOException if fail to handle this notification.
*/
void endOfInput();
enum CommitResult {
SUCCESS, FAILURE, RETRY
}throws IOException;
}
|
Open Questions
There are still two open questions related to the unified sink API
...