...
I will introduce a blacklist module into Flink, which is used to filter out nodes when executions are scheduled.
Init black list
where register?
The blacklist module is a thread that maintains the blacklisted machines of this job and periodically removes expired elements.
Each element in the blacklist contains an IP and a timestamp. The timestamp is used to decide whether the element has expired.
Add element to black list
We have implemented a per-job blacklist at machine granularity. The machine IP is added to the blacklist when an execution is recognized as a long-tail execution.
Remove element in black list
The blacklist removes a machine IP once its entry has expired.
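The init, add, and remove steps above can be sketched together as follows. This is only a minimal illustration, not the actual implementation: the class name `JobBlacklist`, the TTL and cleanup-interval parameters, and the use of a `ScheduledExecutorService` as the background thread are all assumptions. Each entry maps a machine IP to the timestamp at which it was added.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical per-job blacklist: machine IP -> timestamp (ms) when it was added. */
class JobBlacklist implements AutoCloseable {
    private final Map<String, Long> entries = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final ScheduledExecutorService cleaner;

    /** ttlMillis and cleanupIntervalMillis are assumed settings, not real Flink options. */
    JobBlacklist(long ttlMillis, long cleanupIntervalMillis) {
        this.ttlMillis = ttlMillis;
        // Single daemon thread that periodically drops expired entries.
        this.cleaner = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "blacklist-cleaner");
            t.setDaemon(true);
            return t;
        });
        this.cleaner.scheduleAtFixedRate(
                () -> removeExpired(System.currentTimeMillis()),
                cleanupIntervalMillis, cleanupIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Called when an execution on this machine is recognized as long-tail. */
    void add(String ip, long nowMillis) {
        entries.put(ip, nowMillis);
    }

    /** Removes every entry whose age has reached the TTL. */
    void removeExpired(long nowMillis) {
        entries.entrySet().removeIf(e -> nowMillis - e.getValue() >= ttlMillis);
    }

    /** Copy of the currently blacklisted IPs, e.g. for the scheduler to filter nodes. */
    Set<String> snapshot() {
        return new HashSet<>(entries.keySet());
    }

    @Override
    public void close() {
        cleaner.shutdownNow();
    }
}
```

Passing the current time into `add` and `removeExpired` explicitly keeps the expiry logic easy to test without waiting for the background thread.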
Pass the blacklist information to cluster ResourceManager
Yarn
Reflection, yarn-3.0
When the executions are scheduled, we add the blacklist information to the Yarn PlacementConstraint. This ensures that Yarn containers are not allocated on the machines in the blacklist.
k8s
My code only supports the Yarn integration. As far as I know, in the K8s integration we could use node affinity or pod affinity to achieve the same goal as the Yarn PlacementConstraint. Since the Mesos integration will be deprecated in Flink 1.13, I didn't consider it.
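To make the node affinity idea concrete, the sketch below builds the `affinity` section of a pod spec as nested maps, using a required node affinity rule with the `NotIn` operator on the well-known `kubernetes.io/hostname` label. It is an illustration under assumptions, not part of the existing code: the class `NodeAffinitySketch` and the method `excludeNodes` are hypothetical, and because the blacklist stores IPs, it assumes they have already been resolved to node hostnames (or that a custom node label carrying the IP is used instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that turns a blacklist into a K8s node affinity structure. */
class NodeAffinitySketch {

    /** Returns the "affinity" section of a pod spec, or an empty map if nothing is blacklisted. */
    static Map<String, Object> excludeNodes(List<String> blacklistedHostnames) {
        // Kubernetes requires a non-empty values list for the NotIn operator.
        if (blacklistedHostnames.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, Object> expression = new LinkedHashMap<>();
        expression.put("key", "kubernetes.io/hostname");
        expression.put("operator", "NotIn");
        expression.put("values", new ArrayList<>(blacklistedHostnames));

        Map<String, Object> term = new LinkedHashMap<>();
        term.put("matchExpressions", Collections.singletonList(expression));

        Map<String, Object> required = new LinkedHashMap<>();
        required.put("nodeSelectorTerms", Collections.singletonList(term));

        Map<String, Object> nodeAffinity = new LinkedHashMap<>();
        nodeAffinity.put("requiredDuringSchedulingIgnoredDuringExecution", required);

        Map<String, Object> affinity = new LinkedHashMap<>();
        affinity.put("nodeAffinity", nodeAffinity);
        return affinity;
    }

    public static void main(String[] args) {
        // Prints the nested structure for two example hostnames.
        System.out.println(excludeNodes(Arrays.asList("node-a", "node-b")));
    }
}
```

A required rule is used here because the goal is to keep pods off blacklisted machines entirely; a preferred rule would only discourage the scheduler from using them.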
Yarn
k8s
...
Mesos
According to the referenced Jira issue, the Mesos integration will be deprecated, so we don't need to think about how to pass the blacklist information to Mesos.
Manage input and output of each ExecutionVertex
...