Status

Motivation

Flink uses many IDs. Most of them appear throughout the logs to help users understand what Flink is doing, especially when failures occur. However, the IDs in the current logs are hard to read:

  • The string literals of most Flink IDs, e.g. ExecutionAttemptID, are hash codes. They carry little meaningful information and are hard for users to recognize and compare.
  • The log does not always show the lineage between IDs. Users often need to find the relationship between the entities that given IDs identify, e.g., which slot (by AllocationID) was assigned to satisfy which slot request (by SlotRequestID). Without this lineage information, it is impossible to track the end-to-end lifecycle of an Execution or a Task, which makes debugging difficult.

In this FLIP, we aim to improve log readability by adding more information to Flink’s IDs and by logging the lineage between them.

Background

Before proposing changes, we first list all the IDs that are related to this FLIP.

IDs to identify a Graph component


| Graph Level | Identifier | Description/Notes |
| --- | --- | --- |
| JobGraph | JobVertexID | |
| JobGraph | IntermediateDataSetID | |
| ExecutionGraph | ExecutionVertexID | Composed of JobVertexID and subtaskIndex. |
| ExecutionGraph | IntermediateResultPartitionID | |
| ExecutionGraph | ExecutionAttemptID | Also used in TaskExecutor. |


IDs to identify a distributed component


| Entity | Identifier | Note |
| --- | --- | --- |
| ResourceManager | ResourceID | Used to communicate with TMs and JM. |
| ResourceManager | ResourceManagerID | Used as a fencing token in RPC. |
| TaskExecutor | ResourceID | Used in communication between RM and TM. Used by RM to identify a TM. |
| TaskExecutor | InstanceID | Used in communication between RM and TM. Used by SlotManager to identify a TM. |
| JobMaster | ResourceID | Used to communicate with TMs and RM. |
| JobMaster | JobMasterID | Used as a fencing token in RPC. |
| Slot | SlotID | Used in TM, RM and SlotManager. |
| Slot | AllocationID | Used in JM. |
| PendingRequest in SlotPool | SlotRequestID | |
| SlotRequest in SlotPool, ResourceManager, and TaskExecutor | AllocationID | |



Public Interfaces

This proposal does not touch the public interface. No compatibility issues should be introduced.

Proposed Changes

Add more meaning to Flink’s IDs

Add location information to distributed components

For the IDs of distributed components, it is better to include location-relevant information to help the user locate a specific component. The SlotID is an example whose string literal describes itself well (the ResourceID of the TaskExecutor plus the slot number). In this FLIP, we propose to add location information to the ResourceID of the TaskExecutor:

  • Regarding the YarnWorkerNode, we propose to add host and port info to it.
  • Regarding the KubernetesWorkerNode, the string literal of its ResourceID is currently set to the PodName. This gives us enough information to locate a TaskExecutor, so no change is needed at the moment.
  • Regarding the Standalone mode, the ResourceID is generated randomly in TaskManagerRunner#main. We propose to add host and port info and/or the name of the TaskExecutor to it.
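
To make the idea concrete, the following is a minimal sketch of how a location-aware ID string could be assembled. It is only an illustration: the class name `LocationAwareResourceIdSketch`, the `host:port-suffix` layout, and the six-character random suffix are all assumptions and not part of Flink or of this proposal.

```java
import java.util.UUID;

// Hypothetical sketch; this is not Flink's ResourceID class.
final class LocationAwareResourceIdSketch {

    /** Joins location info and a random suffix into one readable ID string (assumed layout). */
    static String withLocation(String host, int port, String randomSuffix) {
        return host + ":" + port + "-" + randomSuffix;
    }

    /** A short random suffix keeps IDs unique when a TaskExecutor restarts on the same host and port. */
    static String randomSuffix() {
        return UUID.randomUUID().toString().substring(0, 6);
    }

    public static void main(String[] args) {
        // Example host name and port are made up for illustration.
        System.out.println(withLocation("worker-7.example.com", 6122, randomSuffix()));
    }
}
```

With such a layout, a user reading the log can tell which machine a TaskExecutor runs on without looking the ID up elsewhere.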


If you think any other distributed component needs location information, comments are welcome.

Add topology information to graph components

For the IDs of graph components (JobGraph, ExecutionGraph, etc.), it is better to include the “parent” of the component and the attempt number or parallel index. A positive example is ExecutionVertexID, which is composed of JobVertexID and subtaskIndex. In this FLIP, we propose to add topology information to the IDs of the following graph components:

  • Compose the IntermediateResultPartitionID of (IntermediateDataSetID, partitionIndex).
  • Compose the ExecutionAttemptID of (ExecutionVertexID, attemptNumber).

These changes may not be compatible with the current AbstractID definition. We should either refactor AbstractID or stop these IDs from extending it.
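
As a rough illustration of a composite ID that does not extend AbstractID, the sketch below builds an attempt ID from a vertex ID plus an attempt number. The classes are simplified stand-ins, not Flink's actual ID classes; the names, the use of a plain string for the JobVertexID, and the underscore-separated string form are assumptions made for this example.

```java
import java.util.Objects;

// Hypothetical sketch: simplified stand-ins, not Flink's actual ID classes.
final class CompositeIdSketch {

    /** Stand-in for ExecutionVertexID: (JobVertexID, subtaskIndex). */
    static final class ExecutionVertexId {
        final String jobVertexId; // assumed to be the string form of a JobVertexID
        final int subtaskIndex;

        ExecutionVertexId(String jobVertexId, int subtaskIndex) {
            this.jobVertexId = Objects.requireNonNull(jobVertexId);
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ExecutionVertexId)) return false;
            ExecutionVertexId other = (ExecutionVertexId) o;
            return subtaskIndex == other.subtaskIndex && jobVertexId.equals(other.jobVertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertexId, subtaskIndex);
        }

        @Override
        public String toString() {
            return jobVertexId + "_" + subtaskIndex;
        }
    }

    /** Stand-in for the proposed ExecutionAttemptID: (ExecutionVertexID, attemptNumber). */
    static final class ExecutionAttemptId {
        final ExecutionVertexId vertexId;
        final int attemptNumber;

        ExecutionAttemptId(ExecutionVertexId vertexId, int attemptNumber) {
            this.vertexId = Objects.requireNonNull(vertexId);
            this.attemptNumber = attemptNumber;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ExecutionAttemptId)) return false;
            ExecutionAttemptId other = (ExecutionAttemptId) o;
            return attemptNumber == other.attemptNumber && vertexId.equals(other.vertexId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(vertexId, attemptNumber);
        }

        /** The string form exposes the parent vertex and the attempt number. */
        @Override
        public String toString() {
            return vertexId + "_" + attemptNumber;
        }
    }

    public static void main(String[] args) {
        ExecutionVertexId vertex = new ExecutionVertexId("bc764cd8ddf7a0cf", 3);
        System.out.println(new ExecutionAttemptId(vertex, 1)); // prints bc764cd8ddf7a0cf_3_1
    }
}
```

Because equality is value-based, two attempt IDs built from the same vertex and attempt number compare equal, and the log line alone shows which subtask and which attempt an Execution belongs to. An IntermediateResultPartitionID composed of (IntermediateDataSetID, partitionIndex) could follow the same pattern.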

If you think any other graph component needs topology information, comments are welcome.

Log the ID’s lineage information

Flink’s logs contain many IDs that identify different entities. Some of these entities and IDs are related by lineage, e.g. the ExecutionAttemptID of an Execution and the SlotRequestID that is constructed during resource allocation for that Execution. Flink does not log enough to expose those lineages. When a job becomes large, e.g. with hundreds or thousands of tasks, it is hard to track the relationships between those distributed entities, which makes failures hard to debug.

The relationships between those entities are built dynamically and are not as constant as the aforementioned relationship between IntermediateResultPartition and ResultPartition. We cannot express the lineage between them by adding more meaning or fields to those IDs. Thus, we need to log the lineage explicitly. We propose to log the lineage of:

  • ExecutionAttemptID and SlotRequestID. The Execution instance is constructed with an ExecutionAttemptID, and the log uses this identifier at first. Then, a SlotRequestID is produced in the `Execution#allocateAndAssignSlotForExecution` method. From then on, the identifier in the log changes to the SlotRequestID. Note that although the two IDs correspond one-to-one in the current implementation, they are not one-to-one by design. In a future implementation, this relationship could be dynamic.
  • SlotRequestID and AllocationID. In the Scheduler module, the PendingRequest is identified by SlotRequestID, while the SlotRequest sent to the RM/TM is identified by AllocationID.
  • ExecutionAttemptID and AllocationID. On the TM side, both represent a specific Task; however, they never occur in the same log line.
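
The three lineages above could each be covered by one log line that names both IDs. The sketch below only formats such messages; the class name, method names, and message wording are hypothetical and are not taken from Flink's code base.

```java
// Hypothetical sketch: message wording and method names are illustrative only.
final class LineageLogSketch {

    /** Links an Execution to the slot request created for it. */
    static String slotRequested(String executionAttemptId, String slotRequestId) {
        return String.format(
                "Execution %s requests a slot with SlotRequestID %s.",
                executionAttemptId, slotRequestId);
    }

    /** Links a pending request in the SlotPool to the request sent to the RM/TM. */
    static String allocationRequested(String slotRequestId, String allocationId) {
        return String.format(
                "Slot request %s is sent to the ResourceManager with AllocationID %s.",
                slotRequestId, allocationId);
    }

    /** Links a Task on the TM side to the slot allocation it runs in. */
    static String taskSubmitted(String executionAttemptId, String allocationId) {
        return String.format(
                "Task %s is submitted to the slot with AllocationID %s.",
                executionAttemptId, allocationId);
    }

    public static void main(String[] args) {
        // The ID values below are made up for illustration.
        System.out.println(slotRequested("bc764cd8_3_1", "req-01"));
        System.out.println(allocationRequested("req-01", "alloc-9f"));
        System.out.println(taskSubmitted("bc764cd8_3_1", "alloc-9f"));
    }
}
```

With one such line per transition, a user can search the log for any of the three IDs and follow the chain to the other two.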


So far, we have identified the above lineages as the ones that need to be added to the log. If you have other ideas, comments are welcome.

Expose the identifiers of distributed components to the user

Recently, Flink upgraded from log4j to log4j2. Users can now use log4j2 built-in appenders or custom appenders to collect logs to external storage (e.g. HDFS, S3, etc.). However, the user needs to construct the log name from some identifier. For the JobManager and each TaskExecutor, we propose to expose the component name and ResourceID to the user as environment variables.
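
As a sketch of how a custom appender might use these values, the code below derives a log file name from the environment. The variable names `FLINK_COMPONENT_NAME` and `FLINK_RESOURCE_ID` are placeholders invented for this example; the proposal does not fix the actual names, and the file name layout is likewise an assumption.

```java
import java.util.Map;

// Hypothetical sketch: the environment variable names are placeholders, not defined by Flink.
final class LogNameSketch {

    static final String COMPONENT_ENV = "FLINK_COMPONENT_NAME"; // assumed name
    static final String RESOURCE_ID_ENV = "FLINK_RESOURCE_ID";  // assumed name

    /** Builds "<component>-<resourceId>.log", with fallbacks when a variable is missing. */
    static String logFileName(Map<String, String> env) {
        String component = env.getOrDefault(COMPONENT_ENV, "unknown-component");
        String resourceId = env.getOrDefault(RESOURCE_ID_ENV, "unknown-id");
        return component + "-" + resourceId + ".log";
    }

    public static void main(String[] args) {
        // In a real process the values would come from the process environment.
        System.out.println(logFileName(System.getenv()));
    }
}
```

Passing the environment in as a map keeps the naming logic easy to test without changing the real process environment.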