...

Pass the blacklist information to cluster ResourceManager

Only the slot pool on the JobManager side keeps a blacklist tracker; the blacklist information then needs to be passed on to the cluster ResourceManager.
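As a minimal, self-contained sketch of this flow (the class and method names below, such as BlacklistTracker and notifyBlacklistedNodes, are illustrative and not Flink's actual API): the slot pool owns the tracker and forwards every new entry to the ResourceManager.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: the JobManager-side slot pool keeps the only
// blacklist tracker and forwards its entries to the ResourceManager.
public class BlacklistSketch {

    /** Stand-in for the cluster ResourceManager interface. */
    public interface ResourceManagerGateway {
        void notifyBlacklistedNodes(Set<String> nodeIps);
    }

    /** Blacklist tracker owned by the slot pool on the JM side. */
    public static class BlacklistTracker {
        private final Set<String> blacklistedNodes = new HashSet<>();
        private final ResourceManagerGateway resourceManager;

        public BlacklistTracker(ResourceManagerGateway resourceManager) {
            this.resourceManager = resourceManager;
        }

        /** Record a slow/bad node and propagate the updated blacklist to the RM. */
        public void addNode(String nodeIp) {
            if (blacklistedNodes.add(nodeIp)) {
                resourceManager.notifyBlacklistedNodes(new HashSet<>(blacklistedNodes));
            }
        }

        public Set<String> getBlacklistedNodes() {
            return new HashSet<>(blacklistedNodes);
        }
    }
}
```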

Yarn

First, each node's attributes should include a machine-IP attribute; then we can use YARN PlacementConstraints to keep containers off certain machines.
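The real implementation would express this rule through YARN's PlacementConstraints API against the node-IP attribute; as a self-contained sketch of just the placement rule itself (the names below are illustrative, not the YARN API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch of the placement rule: a container may only be
// placed on nodes whose "ip" attribute is NOT in the blacklist.  The
// actual implementation would express this via YARN PlacementConstraints.
public class PlacementSketch {

    /** Return the node IDs eligible for container placement. */
    public static List<String> eligibleNodes(
            Map<String, String> nodeIpAttributes,  // nodeId -> ip attribute
            Set<String> blacklistedIps) {
        List<String> eligible = new ArrayList<>();
        for (Map.Entry<String, String> e : nodeIpAttributes.entrySet()) {
            if (!blacklistedIps.contains(e.getValue())) {
                eligible.add(e.getKey());
            }
        }
        return eligible;
    }
}
```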

...

Manage intermediate ResultPartitions

I think speculative execution means that multiple executions of an ExecutionVertex run at the same time, while failover means that two executions run at two different times. Downstream tasks will consume the ResultPartition

...

As shown below, for a batch job with blocking shuffle, introducing speculative execution means the downstream tasks will consume the ResultPartition of the fastest finished execution of each upstream ExecutionVertex.

[Image: downstream tasks consuming the ResultPartition of the fastest finished upstream execution]

  • Multiple executions of an upstream ExecutionVertex will produce multiple ResultPartitions. When all upstream ExecutionVertices have finished, the input channels of the downstream executions will be updated to consume the fastest finished execution of the upstream. So a member variable named fastestFinishedExecution is added to ExecutionVertex; it is used to create the PartitionInfo that updates the downstream executions' input channels.

...

  • In order to avoid problems when multiple executions in one ExecutionVertex finish at the same time, a member variable named vertexFinished was added to ExecutionVertex. It indicates whether this ExecutionVertex has already finished, and it is guarded by a double-checked locking pattern in ExecutionVertex.executionFinished().
  • In ExecutionVertex.executionFinished(), after the double-checked locking pattern, finishPartitionsAndUpdateConsumers() should be called instead of Execution.markFinished(); then the non-fastest executions in this ExecutionVertex should be canceled.
Code Block
languagejava
titleExecutionVertex
public class ExecutionVertex
        implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
	private Execution fastestFinishedExecution = null;
	private volatile boolean vertexFinished = false;
}
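The first-finisher selection with double-checked locking can be sketched as a self-contained class (Vertex and the string execution IDs below are illustrative stand-ins, not Flink's actual classes): only the first execution that reports FINISHED wins; later finishers are rejected and should be canceled.

```java
// Illustrative sketch of the double-checked locking in
// ExecutionVertex.executionFinished(): only the first execution that
// finishes becomes fastestFinishedExecution; all others are rejected.
public class VertexFinishSketch {

    public static class Vertex {
        private volatile boolean vertexFinished = false;
        private volatile String fastestFinishedExecution = null;

        /** Returns true only for the first execution that finishes. */
        public boolean executionFinished(String executionId) {
            if (!vertexFinished) {                    // first (unsynchronized) check
                synchronized (this) {
                    if (!vertexFinished) {            // second check under the lock
                        vertexFinished = true;
                        fastestFinishedExecution = executionId;
                        // here: finishPartitionsAndUpdateConsumers() instead of
                        // Execution.markFinished(); then cancel the other executions
                        return true;
                    }
                }
            }
            return false;                             // non-fastest: should be canceled
        }

        public String getFastestFinishedExecution() {
            return fastestFinishedExecution;
        }
    }
}
```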
  • When all upstream ExecutionVertices have finished, all executions in the downstream ExecutionVertex should be notified to update their input channels from unknown to local/remote, so there are some modifications in Execution.updatePartitionConsumers().

  • When a task reads from a blocking result partition and its input is not available, the producer ExecutionVertex and all consumer ExecutionVertices will be revoked. For the upstream producer ExecutionVertex, I think the SpeculativeScheduler thread should keep working. For all consumer ExecutionVertices, I think we should kill all speculative executions and only restart the original execution.

The third place to change: when all upstream ExecutionVertices have finished, Execution.updatePartitionConsumers() will be called. Here, all downstream executions need to be updated.

consumerVertex.cachePartitionInfo(partitionInfo); will be changed to consumer.cachePartitionInfo(partitionInfo);, and the cachePartitionInfo() function in ExecutionVertex will be removed. This used to work because, by default, an ExecutionVertex had only one execution; now that is no longer the case, so the call must go through each execution instead.
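The per-execution notification can be sketched as follows (ChannelUpdateSketch and DownstreamExecution are illustrative names, not Flink's classes): the PartitionInfo is cached per execution attempt rather than per vertex, so every attempt of a downstream ExecutionVertex, original and speculative, gets its unknown input channel turned into a local or remote one.

```java
import java.util.List;

// Illustrative sketch of the change in Execution.updatePartitionConsumers():
// the partition info is cached per *execution* rather than per vertex, so
// every attempt of the downstream vertex updates its UNKNOWN input channel.
public class ChannelUpdateSketch {

    public enum ChannelType { UNKNOWN, LOCAL, REMOTE }

    /** Stand-in for one execution attempt of a downstream ExecutionVertex. */
    public static class DownstreamExecution {
        public ChannelType channel = ChannelType.UNKNOWN;

        /** Was consumerVertex.cachePartitionInfo(..); now called per execution. */
        public void cachePartitionInfo(boolean producerIsLocal) {
            channel = producerIsLocal ? ChannelType.LOCAL : ChannelType.REMOTE;
        }
    }

    /** Notify every execution of the consumer vertex, not just a single one. */
    public static void updatePartitionConsumers(
            List<DownstreamExecution> consumerExecutions, boolean producerIsLocal) {
        for (DownstreamExecution consumer : consumerExecutions) {
            consumer.cachePartitionInfo(producerIsLocal);
        }
    }
}
```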

The key point that needs careful explanation here:

When a downstream execution meets a DataConsumptionException, it will restart together with the upstream execution that had already finished.

If a task reads from a blocking result partition and its input is not available, we can 'revoke' the producer task: set the task to failed and rerun the upstream task to regenerate the data.

In certain scenarios, the producer data was transferred in blocking mode or saved in a persistent store. If the partition is missing, we need to revoke/rerun the producer task to regenerate the data.
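The restart decision described above can be sketched like this (RevokeSketch and the vertex/attempt names are illustrative, not Flink's API): on a DataConsumptionException, the finished producer is revoked and rerun, and for the consumer vertex all speculative attempts are killed so only the original execution restarts.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of missing-partition handling: revoke and rerun the
// producer to regenerate the lost blocking result partition, and restart
// only the original (first) attempt of the consumer vertex.
public class RevokeSketch {

    /** Returns the executions that should be restarted. */
    public static List<String> handleDataConsumptionException(
            String producerVertex, List<String> consumerAttempts) {
        List<String> toRestart = new ArrayList<>();
        // Revoke and rerun the producer to regenerate the lost partition.
        toRestart.add(producerVertex);
        // Kill speculative attempts; restart only the original attempt (index 0).
        toRestart.add(consumerAttempts.get(0));
        return toRestart;
    }
}
```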

A figure should be added here covering the failover cases, especially the case where restarting a downstream task also drags its upstream task into the restart.

...



Manage sink files

For batch sink files: a globally unique suffix is appended to each file name, and when the whole job finishes, the master node either renames or deletes each file depending on the situation.
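A minimal sketch of this commit protocol (SinkCommitSketch, the attempt IDs, and the file layout are illustrative assumptions, not the actual implementation): each attempt writes to finalName.attemptId, and at job end the master renames the fastest attempt's file to the final name and deletes the rest.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Illustrative sketch: each execution attempt writes its sink file with a
// globally unique suffix; when the job finishes, the master commits the
// fastest attempt's file by renaming it and deletes the other attempts' files.
public class SinkCommitSketch {

    /** Per-attempt path: finalName.attemptId. */
    public static Path attemptPath(Path finalPath, String attemptId) {
        return finalPath.resolveSibling(finalPath.getFileName() + "." + attemptId);
    }

    /** Called on the master after the whole job has finished. */
    public static void commit(Path finalPath, String fastestAttemptId,
                              List<String> allAttemptIds) throws IOException {
        for (String attemptId : allAttemptIds) {
            Path p = attemptPath(finalPath, attemptId);
            if (attemptId.equals(fastestAttemptId)) {
                Files.move(p, finalPath, StandardCopyOption.REPLACE_EXISTING);
            } else {
                Files.deleteIfExists(p);
            }
        }
    }

    /** Self-contained demo in a temp dir; true if commit behaved as described. */
    public static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("sink-sketch");
            Path finalPath = dir.resolve("part-0");
            Files.writeString(attemptPath(finalPath, "attempt-0"), "slow result");
            Files.writeString(attemptPath(finalPath, "attempt-1"), "fast result");
            commit(finalPath, "attempt-1", List.of("attempt-0", "attempt-1"));
            return Files.readString(finalPath).equals("fast result")
                    && !Files.exists(attemptPath(finalPath, "attempt-0"));
        } catch (IOException e) {
            return false;
        }
    }
}
```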

...