...

This feature can be enabled when a batch job writes records to files or key-value databases.

...

(1) Sink to key-value databases: no additional steps are needed for the speculative execution feature.

...


(2) Sink to file by FileOutputFormat and OutputFormatSinkFunction: a globally unique ExecutionAttemptID suffix is appended to the file name. Some files are then deleted or renamed when finalizeOnMaster() is called.
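
The suffix scheme described above can be sketched as follows. This is only an illustration, not the proposed implementation: the class `AttemptFileNames`, its methods, and the `.` separator are hypothetical, and the ExecutionAttemptID is modeled as a plain string that is assumed not to contain the separator.

```java
// Hypothetical helper; none of these names come from Flink itself.
final class AttemptFileNames {
    // Assumed separator between the base file name and the attempt ID.
    private static final char SEPARATOR = '.';

    private AttemptFileNames() {}

    /** Builds the temporary name that one execution attempt writes to. */
    static String withAttemptSuffix(String baseName, String attemptId) {
        return baseName + SEPARATOR + attemptId;
    }

    /** Extracts the attempt ID suffix from a temporary file name. */
    static String attemptIdOf(String fileName) {
        int idx = fileName.lastIndexOf(SEPARATOR);
        if (idx < 0 || idx == fileName.length() - 1) {
            throw new IllegalArgumentException("No attempt suffix in: " + fileName);
        }
        return fileName.substring(idx + 1);
    }

    /** Returns the final file name, i.e. the temporary name without its suffix. */
    static String stripAttemptSuffix(String fileName) {
        int idx = fileName.lastIndexOf(SEPARATOR);
        if (idx <= 0) {
            throw new IllegalArgumentException("No attempt suffix in: " + fileName);
        }
        return fileName.substring(0, idx);
    }

    public static void main(String[] args) {
        String tmp = withAttemptSuffix("part-0", "a1b2c3");
        System.out.println(tmp);                      // part-0.a1b2c3
        System.out.println(attemptIdOf(tmp));         // a1b2c3
        System.out.println(stripAttemptSuffix(tmp));  // part-0
    }
}
```

Because every attempt writes to its own suffixed name, a speculative attempt and the original attempt of the same subtask never write to the same file; choosing which one survives is deferred to finalizeOnMaster().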


As shown below, four HashSets are created, and globally unique ExecutionAttemptIDs are added to them.

...

Once all tasks finish, each file is processed differently depending on which HashSet its suffix belongs to. Moreover, FileOutputFormat needs to implement the FinalizeOnMaster interface and add some code in open() and configure().

Code Block (language: java, title: DefaultExecutionGraph)
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

public class JobVertex implements java.io.Serializable {
	// ExecutionAttemptIDs of the fastest finished execution of every speculated ExecutionVertex in this JobVertex.
	private Set<ExecutionAttemptID> fastAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the non-fastest finished executions of every speculated ExecutionVertex in this JobVertex.
	private Set<ExecutionAttemptID> slowAttemptIdsWithSpeculative = new HashSet<>();

	// ExecutionAttemptIDs of the successfully finished executions of every non-speculated ExecutionVertex in this JobVertex.
	private Set<ExecutionAttemptID> finishedAttemptIdsWithoutSpeculative = new HashSet<>();

	// All ExecutionAttemptIDs in this JobVertex.
	private Set<ExecutionAttemptID> allAttemptIds = new HashSet<>();
}
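
To illustrate how the sets above could drive the cleanup in finalizeOnMaster(), here is a minimal sketch. The exact rules are not spelled out in this section, so the mapping below is an assumption: files written by the fastest speculated attempt or by a finished non-speculated attempt are renamed to their final name, and files of every other attempt (slow, failed, or canceled) are deleted. The class `SpeculativeFileFinalizer`, the `Action` enum, and the `.` separator are hypothetical, and attempt IDs are modeled as strings.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical planner; it only decides what to do and performs no file I/O.
final class SpeculativeFileFinalizer {
    enum Action { RENAME, DELETE }

    private SpeculativeFileFinalizer() {}

    /** Assumed rule: winning attempts keep their file, all others lose it. */
    static Action decide(String attemptId,
                         Set<String> fastAttemptIds,
                         Set<String> finishedWithoutSpeculative) {
        boolean winner = fastAttemptIds.contains(attemptId)
                || finishedWithoutSpeculative.contains(attemptId);
        return winner ? Action.RENAME : Action.DELETE;
    }

    /** Maps each suffixed file name to the action to apply on the master. */
    static Map<String, Action> plan(List<String> fileNames,
                                    Set<String> fastAttemptIds,
                                    Set<String> finishedWithoutSpeculative) {
        Map<String, Action> plan = new LinkedHashMap<>();
        for (String fileName : fileNames) {
            int idx = fileName.lastIndexOf('.');
            if (idx < 0) {
                continue; // no attempt suffix: not written by this job, leave untouched
            }
            String attemptId = fileName.substring(idx + 1);
            plan.put(fileName, decide(attemptId, fastAttemptIds, finishedWithoutSpeculative));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("part-0.fast1", "part-0.slow1", "part-1.solo1");
        Map<String, Action> plan =
                plan(files, Collections.singleton("fast1"), Collections.singleton("solo1"));
        plan.forEach((file, action) -> System.out.println(file + " -> " + action));
    }
}
```

Keeping the decision separate from the actual rename and delete calls makes the rule easy to test and lets the master apply it to whatever file system the output format targets.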


(3) Sink to file by FileSink in flink-connector-files.

The same logic applies: a globally unique ExecutionAttemptID suffix is appended to the file name, and some files are then deleted or renamed when finalizeOnMaster() is called.


Moreover, FileOutputFormat needs to implement the FinalizeOnMaster interface and add some code in open() and configure().

Metrics

For each ExecutionJobVertex, I use six metrics to measure and evaluate the efficiency of speculative execution. These metrics can be summarized in the job status and on the web page.

...