...

Released: <Flink Version>

This FLIP aims to improve Flink’s ID system, including refactoring the IDs and making the log more readable.

Motivation

Flink currently uses many IDs. Most of them appear throughout the log and are meant to help users understand what Flink is doing, especially when failures occur. However, we found that the readability of IDs in the current log is poor:

  • The string literals of most of Flink’s IDs, e.g. ExecutionAttemptID, are hash codes. They carry little meaningful information and are hard for users to recognize and compare.
  • The log does not always show the lineage between IDs. Users commonly need to find the relationships between entities identified by given IDs, e.g. which slot (AllocationID) was assigned to satisfy the slot request with a given SlotRequestID. Without such lineage information, it is currently impossible to track the end-to-end lifecycle of an Execution or a Task, which makes debugging difficult.

In this FLIP, we aim to improve log readability by adding more information to Flink’s IDs and by logging the lineage among them.

Background

Before proposing changes, we first sort out all the IDs related to this FLIP.

IDs to identify a Graph component


  • JobGraph
    • JobVertexID: Identify a job vertex.
    • IntermediateDataSetID: Identify an intermediate dataset.
  • ExecutionGraph
    • ExecutionVertexID: Identify an execution vertex. Composed of JobVertexID and subtaskIndex.
    • IntermediateResultPartitionID: Identify an intermediate result partition.
    • ExecutionAttemptID: Identify an Execution instance. Also used in TaskExecutor.

...

IDs to identify a distributed component


  • ResourceManager: identified by both ResourceID and ResourceManagerID.
    • ResourceID: used to communicate with TMs and the JM.
    • ResourceManagerID: used as a fencing token in RPC.
  • TaskExecutor: identified by both ResourceID and InstanceID. The string literal of the ResourceID is the container ID in Yarn mode and the pod name in Kubernetes mode.
    • ResourceID: used in communication between RM and TM; used by the RM to identify a TM.
    • InstanceID: used in communication between RM and TM; used by the SlotManager to identify a TM.
  • JobMaster/JobGraph: identified by both ResourceID and JobID.
    • ResourceID: used to communicate with TMs and the RM.
    • JobMasterID: used as a fencing token in RPC.
  • Slot: identified by SlotID, which is composed of the ResourceID of the TaskExecutor and the slot index. On the JobMaster side, it can also be identified by AllocationID.
    • SlotID: used in the TM, RM, and SlotManager.
    • AllocationID: used in the JM.
  • PendingTaskManagerSlot: identified by TaskManagerSlotId.
  • PendingRequest in SlotPool: identified by SlotRequestID.
  • SlotRequest in SlotPool, ResourceManager, and TaskExecutor: identified by AllocationID.

Motivation

Flink uses many IDs that are hard for users to decipher. To help users understand what Flink is doing, we aim to add more meaning to Flink’s IDs and logs while cleaning up redundancy and refactoring the ID system:

...

...

AllocationID

...



Public Interfaces

This proposal does not touch any public interfaces, so no compatibility issues should be introduced.

...

Add more meaning to Flink’s IDs

...

Add location information to distributed components

The IDs of distributed components should contain location-related information that helps users locate a specific component. SlotID is a good example: its string literal describes itself (the ResourceID of the TaskExecutor plus the slot number). In this FLIP, we propose adding location information to the ResourceID of the TaskExecutor:

  • For YarnWorkerNode, we propose adding host and port information to it.
  • For KubernetesWorkerNode, the string literal of its ResourceID is already set to the pod name, which gives enough information to locate a TaskExecutor. No change is needed at the moment.
  • In standalone mode, the ResourceID is generated randomly in TaskManagerRunner#main. We propose adding host and port information and/or the name of the TaskExecutor to it.
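A minimal sketch of what a location-aware ResourceID string for a standalone TaskExecutor could look like, assuming a `<host>:<port>-<random suffix>` layout. `LocatedResourceIds` and `forTaskExecutor` are hypothetical names, not Flink APIs; a real change would go through Flink's `ResourceID` class rather than plain strings.

```java
import java.util.UUID;

// Hypothetical helper (not a Flink API): builds a location-aware ResourceID
// string for a standalone TaskExecutor as "<host>:<port>-<random suffix>".
final class LocatedResourceIds {

    private LocatedResourceIds() {}

    static String forTaskExecutor(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("invalid port: " + port);
        }
        // The random suffix makes a collision unlikely when a TaskExecutor is
        // restarted on the same host and port; taking only the first 6 hex
        // digits of a random UUID keeps the ID short.
        String suffix = UUID.randomUUID().toString().substring(0, 6);
        return host + ":" + port + "-" + suffix;
    }

    public static void main(String[] args) {
        // Prints something like "10.0.0.12:35607-3f9a1c".
        System.out.println(forTaskExecutor("10.0.0.12", 35607));
    }
}
```

The host and port prefix lets a user grep the log for a machine directly, while the suffix preserves uniqueness, which is the role the purely random ResourceID plays today.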


If you think any other distributed component needs location information, comments are welcome.

Add topology information to graph components

The IDs of graph components (JobGraph, ExecutionGraph, etc.) should contain the “parent” of the component and the attempt number or parallel index. A positive example is ExecutionVertexID, which is composed of JobVertexID and subtaskIndex. In this FLIP, we propose adding topology information to the IDs of the following graph components:

  • Compose IntermediateResultPartitionID of (IntermediateDataSetID, partitionIndex).
  • Compose ExecutionAttemptID of (ExecutionVertexID, attemptNumber).
  • Add the producer information to the string literals of IntermediateDataSetID and IntermediateResultPartitionID, just as ResultPartitionID does.
  • Compose the InstanceID in TaskExecutorConnection of the ResourceID plus a monotonically increasing value.

These changes may not be compatible with the current AbstractID definition.

...

We should either refactor AbstractID or stop having these IDs extend it.
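A minimal sketch of the composed IDs, assuming the option of not extending AbstractID and using Java 16+ records. The class names mirror Flink's but these are hypothetical, simplified stand-ins; the `_` and `#` separators in the string literals are assumptions chosen for illustration.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins for Flink's graph IDs. None of them extends
// AbstractID; each composed ID is its parent ID plus an index.

record JobVertexId(UUID value) {
    JobVertexId {
        Objects.requireNonNull(value);
    }

    // Only the first 8 hex digits are printed to keep composed IDs readable;
    // equals/hashCode still use the full UUID.
    @Override
    public String toString() {
        return value.toString().substring(0, 8);
    }
}

record ExecutionVertexId(JobVertexId jobVertexId, int subtaskIndex) {
    ExecutionVertexId {
        Objects.requireNonNull(jobVertexId);
        if (subtaskIndex < 0) throw new IllegalArgumentException("subtaskIndex < 0");
    }

    @Override
    public String toString() {
        return jobVertexId + "_" + subtaskIndex;
    }
}

// ExecutionAttemptID = (ExecutionVertexID, attemptNumber).
record ExecutionAttemptId(ExecutionVertexId executionVertexId, int attemptNumber) {
    ExecutionAttemptId {
        Objects.requireNonNull(executionVertexId);
        if (attemptNumber < 0) throw new IllegalArgumentException("attemptNumber < 0");
    }

    @Override
    public String toString() {
        return executionVertexId + "_" + attemptNumber;
    }
}

record IntermediateDataSetId(UUID value) {
    IntermediateDataSetId {
        Objects.requireNonNull(value);
    }

    @Override
    public String toString() {
        return value.toString().substring(0, 8);
    }
}

// IntermediateResultPartitionID = (IntermediateDataSetID, partitionIndex).
record IntermediateResultPartitionId(IntermediateDataSetId dataSetId, int partitionIndex) {
    IntermediateResultPartitionId {
        Objects.requireNonNull(dataSetId);
        if (partitionIndex < 0) throw new IllegalArgumentException("partitionIndex < 0");
    }

    @Override
    public String toString() {
        return dataSetId + "_" + partitionIndex;
    }
}

// InstanceID = ResourceID of the TaskExecutor + a monotonically increasing value.
record InstanceId(String resourceId, long sequence) {
    @Override
    public String toString() {
        return resourceId + "#" + sequence;
    }
}

final class InstanceIdFactory {
    private final AtomicLong counter = new AtomicLong();

    InstanceId next(String resourceId) {
        return new InstanceId(resourceId, counter.incrementAndGet());
    }
}
```

Because records derive `equals`/`hashCode` from their components, two attempts of the same subtask differ only in `attemptNumber`, and the string literal shows the full path from job vertex to attempt.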

If you think any other graph component needs topology information, comments are welcome.

...

Log the ID’s lineage information

The lack of lineage information in the log makes slot-related issues difficult to debug.

To illustrate why we need lineage information, we first sort out the IDs involved in the end-to-end process of a simple WordCount job. Currently, three different IDs (ExecutionAttemptID, SlotRequestID, and AllocationID) are used to track the progress of the Flink runtime.

Currently, Flink’s log contains many IDs that identify different entities. Some of these entities and IDs are linked by lineage, e.g. the ExecutionAttemptID of an Execution and the SlotRequestID that is constructed during resource allocation for that Execution. Flink does not log enough to point out these lineages. When a job becomes large, e.g. with hundreds or thousands of tasks, it is hard to track the relationships among these distributed entities, which makes failures hard to debug.

The relationships between these entities are built dynamically and are not as static as the aforementioned relationship between IntermediateResultPartition and ResultPartition. We cannot encode the lineage between them by adding more meaning or fields to the IDs. Therefore, we need log statements that point out the lineage directly. We propose logging the lineage of:

  • ExecutionAttemptID and SlotRequestID. The Execution instance is constructed with an ExecutionAttemptID, and the log initially uses this identifier. Then, a SlotRequestID is produced in the `Execution#allocateAndAssignSlotForExecution` method. From then on, the identifier in the log changes to the SlotRequestID.
  • In the Scheduler component, if no slot is currently available, we:
    • Construct a PendingRequest instance, identified only by the SlotRequestID, in SlotPoolImpl#requestNewAllocatedSlot.
    • Produce an AllocationID, construct a SlotRequest identified only by that AllocationID, and send it to the ResourceManager.
  • No log line shows the relationship between the PendingRequest in the Scheduler and its SlotRequest. From then on, all distributed components (JobMaster, ResourceManager, and TaskExecutor) identify the SlotRequest by its AllocationID.
  • Finally, the TaskExecutor offers slots to the SlotPool, and a Task instance carrying both the ExecutionAttemptID and the AllocationID is deployed to the TaskExecutor. The log during Task execution mixes ExecutionAttemptID and AllocationID, but never prints the two IDs in the same line.

When a job becomes large, e.g. with hundreds or thousands of tasks, it is hard to track these distributed components from the log. Because no stage reports the lineage between these identifiers, slot-related issues are hard to debug.

We propose adding log statements that directly point out the relationship/lineage whenever the identifier changes.
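A minimal sketch of the idea: emit one line at each point where the identifier for the same logical request changes, always naming both the old and the new ID. `SlotLineageLogger`, its method names, and the message wording are hypothetical; the `Consumer<String>` sink stands in for the SLF4J logger Flink would actually use, so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: one log line per identifier change, naming both IDs so
// that grepping for either one reveals the other.
final class SlotLineageLogger {
    private final Consumer<String> sink;

    SlotLineageLogger(Consumer<String> sink) {
        this.sink = sink;
    }

    // ExecutionAttemptID -> SlotRequestID (slot allocation for an Execution starts).
    void slotRequested(String executionAttemptId, String slotRequestId) {
        sink.accept(String.format(
                "Execution %s requests slot with SlotRequestID %s.", executionAttemptId, slotRequestId));
    }

    // SlotRequestID -> AllocationID (PendingRequest turned into a SlotRequest for the RM).
    void allocationRequested(String slotRequestId, String allocationId) {
        sink.accept(String.format(
                "Pending request %s is served by SlotRequest with AllocationID %s.", slotRequestId, allocationId));
    }

    // ExecutionAttemptID + AllocationID in the same line (Task deployment).
    void taskDeployed(String executionAttemptId, String allocationId) {
        sink.accept(String.format(
                "Deploying task %s into slot with AllocationID %s.", executionAttemptId, allocationId));
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        SlotLineageLogger log = new SlotLineageLogger(lines::add);
        log.slotRequested("attempt-1", "req-7");
        log.allocationRequested("req-7", "alloc-42");
        log.taskDeployed("attempt-1", "alloc-42");
        lines.forEach(System.out::println);
    }
}
```

With these three lines, a user who starts from an ExecutionAttemptID can follow it to the SlotRequestID, then to the AllocationID, and back to the deployed Task.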

Refactor Flink’s ID system

The other topics under this FLIP aim to reduce redundancy, harden the ID system, and fix other identification issues.

Using ResourceID instead of InstanceID to identify a TaskExecutor on the ResourceManager side

  • . Note that although there is a one-to-one correspondence between the two IDs in the current implementation, they are not one-to-one in design. In a future implementation, this relationship could be dynamic.
  • SlotRequestID and AllocationID. In the Scheduler module, the PendingRequest is identified by SlotRequestID, while the SlotRequest sent to the RM/TM is identified by AllocationID.
  • ExecutionAttemptID and AllocationID. On the TM side, both represent a specific Task; however, they never occur in the same log line.


So far, we have sorted out the above lineages that need to be added to the log. If you have other ideas, comments are welcome.

IIRC, a TaskExecutor instance should be identified by its ResourceID, while the InstanceID should only identify a connection between the ResourceManager and the TaskManager. Functionally, the InstanceID is more like a token for the communication, just like ResourceManagerID, and it should only be used during RPC. However, we currently use InstanceID to identify a TaskExecutor resource in the ResourceManager and SlotManager. The identifier usage in the relevant code paths is also confusing, e.g. we cannot get the ResourceID of a TaskExecutor from TaskManagerSlotInformation.
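A minimal sketch of the intended split of responsibilities, assuming the ResourceManager keeps its TaskExecutor bookkeeping keyed by the stable ResourceID and checks the InstanceID only as a per-connection token on incoming RPCs, similar to a fencing token. `TaskExecutorRegistry` and its methods are hypothetical, and plain strings stand in for Flink's ID classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: TaskExecutors are identified by ResourceID; the
// InstanceID only identifies the current RM <-> TM connection.
final class TaskExecutorRegistry {
    private final Map<String, String> instanceIdByResourceId = new HashMap<>();

    /** Registers (or re-registers) a TaskExecutor and returns the token of the new connection. */
    String register(String resourceId) {
        String instanceId = UUID.randomUUID().toString();
        // Re-registration replaces the old token, so RPCs from the old connection are rejected.
        instanceIdByResourceId.put(resourceId, instanceId);
        return instanceId;
    }

    /** Accepts an RPC only if it carries the token of the current connection. */
    boolean accept(String resourceId, String instanceId) {
        return instanceId != null && instanceId.equals(instanceIdByResourceId.get(resourceId));
    }

    /** Slot bookkeeping resolves a TaskExecutor by its stable ResourceID, regardless of reconnects. */
    boolean isRegistered(String resourceId) {
        return instanceIdByResourceId.containsKey(resourceId);
    }
}
```

In this arrangement a reconnect changes only the token, while everything the SlotManager stores about slots stays attached to the same ResourceID.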

Expose the identifiers of distributed components to users

...

Flink recently upgraded from Log4j to Log4j 2. Users can now use Log4j 2 built-in appenders or custom appenders to ship logs to external storage (e.g. HDFS, S3). However, users need some identifier to construct the log name. For each distributed component, e.g. the JobMaster, the JobManager, and each TaskExecutor, we propose exposing the component name and ResourceID to users as environment variables.
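A minimal sketch of how a user-side tool could build a log file name from such environment variables. `FLINK_COMPONENT_NAME` and `FLINK_RESOURCE_ID` are assumed variable names, not existing Flink variables, and `LogNames` is hypothetical. Inside a Log4j 2 configuration, the same variables could instead be referenced directly through Log4j 2's environment lookup (`${env:NAME}`).

```java
import java.util.Map;

// Hypothetical sketch: builds a log file name from environment variables that
// Flink could export per process. The variable names are assumptions.
final class LogNames {
    static final String COMPONENT_VAR = "FLINK_COMPONENT_NAME";
    static final String RESOURCE_ID_VAR = "FLINK_RESOURCE_ID";

    private LogNames() {}

    static String logFileName(Map<String, String> env) {
        String component = env.getOrDefault(COMPONENT_VAR, "unknown-component");
        String resourceId = env.getOrDefault(RESOURCE_ID_VAR, "unknown-resource");
        // Replace characters that are awkward in file names and object-store keys
        // (e.g. ':' in "host:port") with '_'.
        return (component + "-" + resourceId).replaceAll("[^A-Za-z0-9._-]", "_") + ".log";
    }

    public static void main(String[] args) {
        System.out.println(logFileName(System.getenv()));
    }
}
```

Taking the environment as a `Map` parameter keeps the function testable without touching the real process environment.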

Other

...
