...

Request Example
{
    [
        {
            "id" : "node1/container1",
            "action" : "MARK_BLOCKED",
            "endTimestamp" : "1652317200000",
            "cause" : "Hot machine",
            "allowMerge" : "true"
        },
        {
            "id" : "node2/container2",
            "action" : "MARK_BLOCKED_AND_EVACUATE_TASKS",
            "timeout" : "3600000",
            "cause" : "No space left on device"
        },
        ...
    ]
}

...

  1. id: A string value that specifies the identifier of the blocked task manager or node.
  2. action: An enum value (MARK_BLOCKED or MARK_BLOCKED_AND_EVACUATE_TASKS) that specifies the action to take when a task manager or node is marked as blocked.
  3. timeout (optional): A long value that specifies the timeout in milliseconds after which the item should be removed.
  4. endTimestamp (optional): A long value that specifies the Unix timestamp (in milliseconds) at which the item should be removed. Only one of timeout and endTimestamp can be specified. If neither is specified, the blocked item is permanent and will not be removed. If both are specified, an error is returned.
  5. cause: A string value that specifies the cause for blocking this task manager or node.
  6. allowMerge (optional): A boolean value that specifies whether to merge when a conflict occurs. The default value is false.
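The rule for the two optional expiry fields can be sketched as a small validation helper. This is only an illustration: the class name BlockedItemExpiry, the PERMANENT sentinel, and the choice to measure timeout from the moment the request is processed are all assumptions, not part of the proposal.

```java
// Hypothetical helper; the names here are illustrative and not part of Flink's API.
final class BlockedItemExpiry {
    /** Sentinel meaning "never removed" (neither timeout nor endTimestamp was given). */
    static final long PERMANENT = Long.MAX_VALUE;

    /**
     * Resolves the optional timeout / endTimestamp pair to one absolute end timestamp
     * in milliseconds. A null argument means the field was absent from the request.
     * Assumption: timeout is counted from nowMs, the time the request is processed.
     */
    static long resolveEndTimestamp(Long timeoutMs, Long endTimestampMs, long nowMs) {
        if (timeoutMs != null && endTimestampMs != null) {
            // Both fields present: the request is rejected.
            throw new IllegalArgumentException(
                    "Only one of timeout and endTimestamp can be specified");
        }
        if (endTimestampMs != null) {
            return endTimestampMs;
        }
        if (timeoutMs != null) {
            return nowMs + timeoutMs;
        }
        // Neither field present: the blocked item is permanent.
        return PERMANENT;
    }
}
```

Resolving both forms to a single absolute timestamp up front would also let the merge rule for endTimestamp apply uniformly, although the proposal does not state that this is how timeout is handled.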

When trying to add a task manager or node that already exists in the blocklist, we propose two processing behaviors:

  1. If the field allowMerge is false, an error is returned.
  2. If the field allowMerge is true, the newly added item and the existing item are merged into one. The three affected fields are merged as follows:
    1. For action, merge(MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS) = MARK_BLOCKED_AND_EVACUATE_TASKS
    2. For endTimestamp, merge(endTimestampA, endTimestampB) = max(endTimestampA, endTimestampB)
    3. For cause, we will combine all causes, merge("causeA", "causeB") = "causeA,causeB"
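The three merge rules above can be sketched in a few lines of Java. The class and field names (BlocklistMergeSketch, BlockedItem) are hypothetical and chosen only for this illustration. The sketch also assumes that merging two items with the same action keeps that action, and that causes are joined in the order existing, then new.

```java
// Illustrative sketch only; these types are hypothetical, not Flink classes.
final class BlocklistMergeSketch {
    enum Action { MARK_BLOCKED, MARK_BLOCKED_AND_EVACUATE_TASKS }

    static final class BlockedItem {
        final String id;
        final Action action;
        final long endTimestamp; // Unix timestamp in milliseconds
        final String cause;

        BlockedItem(String id, Action action, long endTimestamp, String cause) {
            this.id = id;
            this.action = action;
            this.endTimestamp = endTimestamp;
            this.cause = cause;
        }
    }

    /** Merges a newly added item into the existing item with the same id. */
    static BlockedItem merge(BlockedItem existing, BlockedItem added) {
        // Action: if either side asks for evacuation, the merged item does too.
        Action action =
                (existing.action == Action.MARK_BLOCKED_AND_EVACUATE_TASKS
                                || added.action == Action.MARK_BLOCKED_AND_EVACUATE_TASKS)
                        ? Action.MARK_BLOCKED_AND_EVACUATE_TASKS
                        : Action.MARK_BLOCKED;
        // endTimestamp: the later of the two wins.
        long endTimestamp = Math.max(existing.endTimestamp, added.endTimestamp);
        // cause: all causes are kept, joined with a comma.
        String cause = existing.cause + "," + added.cause;
        return new BlockedItem(existing.id, action, endTimestamp, cause);
    }
}
```

For example, merging an existing item (MARK_BLOCKED, 1652317200000, "causeA") with a new item (MARK_BLOCKED_AND_EVACUATE_TASKS, 1652320800000, "causeB") yields (MARK_BLOCKED_AND_EVACUATE_TASKS, 1652320800000, "causeA,causeB").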

...

  1. If there is no conflict, the response code is 201 (CREATED) and the response body is empty.
  2. If a conflict occurs:
    1. If allowMerge is false, the response code is 409 (CONFLICT) and an error is returned.
    2. If allowMerge is true, the response code is 202 (ACCEPTED) and the response body is the merged result.
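The response-code rules above reduce to a small decision function. The sketch below is illustrative: AddResponseSketch and statusFor are hypothetical names, and a real handler would also build the response body (empty, error, or merged item).

```java
// Illustrative sketch only; not an actual Flink REST handler.
final class AddResponseSketch {
    /**
     * Returns the HTTP status code for an add request.
     *
     * @param alreadyBlocked whether an item with the same id is already in the blocklist
     * @param allowMerge     value of the request's allowMerge field (false when absent)
     */
    static int statusFor(boolean alreadyBlocked, boolean allowMerge) {
        if (!alreadyBlocked) {
            return 201; // CREATED: no conflict, empty response body
        }
        // Conflict: merge if allowed, otherwise reject the request.
        return allowMerge ? 202 /* ACCEPTED, body is the merged item */ : 409 /* CONFLICT */;
    }
}
```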

...

Request

Request body: {}

Response

Response code: if the item identified by id does not exist, the response code is 404 (NOT FOUND) and an error is returned. Otherwise, the response code is 200 (OK) and the response body is empty.

Proposed Changes

In this design, two granularities of blocked resources are supported: task managers and nodes. A record of blocklist information is called a blocked item; it is generally generated by the scheduler in response to task exceptions. These blocked items are recorded in a dedicated component and affect resource allocation in the Flink cluster. However, blocked items are generally not permanent: each item normally has a timeout. Once an item times out, it is removed and the resource becomes available again. The overall structure of the blocklist mechanism is shown in the figure below.

...