Status

...

Page properties


Discussion thread

...

...

Jira

FLINK-15679 (ASF JIRA)

...

Release

1.12


Motivation

Flink currently uses many IDs. Most of them appear throughout the logs, where they are meant to help users understand what Flink is doing, especially when failures occur. However, we found that these IDs are hard to read in the current logs:

...

In this FLIP, we aim to improve log readability by adding more information to Flink’s IDs and by logging the lineage between them.

Background

Before proposing changes, we first list the IDs relevant to this FLIP.

IDs to identify a Graph component


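Graph Level    | Identifier                    | Description/Notes
---------------+-------------------------------+-------------------------------------------
JobGraph       | JobVertexID                   |
               | IntermediateDataSetID         |
---------------+-------------------------------+-------------------------------------------
ExecutionGraph | ExecutionVertexID             | Composed of JobVertexID and subtaskIndex.
               | IntermediateResultPartitionID |
               | ExecutionAttemptID            | Also used in TaskExecutor.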


IDs to identify a distributed component


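Entity                   | Identifier        | Note
-------------------------+-------------------+------------------------------------------
ResourceManager          | ResourceID        | Used to communicate with TMs and JM.
                         | ResourceManagerID | Used as a fencing token in RPC.
-------------------------+-------------------+------------------------------------------
TaskExecutor             | ResourceID        | Used in communication between RM and TM.
                         |                   | Used by RM to identify a TM.
                         | InstanceID        | Used in communication between RM and TM.
                         |                   | Used by SlotManager to identify a TM.
-------------------------+-------------------+------------------------------------------
JobMaster                | ResourceID        | Used to communicate with TMs and RM.
                         | JobMasterID       | Used as a fencing token in RPC.
-------------------------+-------------------+------------------------------------------
Slot                     | SlotID            | Used in TM, RM and SlotManager.
                         | AllocationID      | Used in JM.
-------------------------+-------------------+------------------------------------------
PendingRequest in        | SlotRequestID     |
SlotPool                 |                   |
-------------------------+-------------------+------------------------------------------
SlotRequest in SlotPool, | AllocationID      |
ResourceManager, and     |                   |
TaskExecutor             |                   |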
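The table notes that ResourceManagerID and JobMasterID are “used as a fencing token in RPC.” The following minimal sketch makes that concrete. It assumes a simplified endpoint that accepts only messages carrying the current leader session’s token. The class and method names (FencedEndpointSketch, grantLeadership, handle) are hypothetical and do not mirror Flink’s RPC classes, and the token is modelled as a plain UUID.

```java
import java.util.UUID;

/** Hypothetical sketch of a fencing-token check; not Flink's RPC implementation. */
public class FencedEndpointSketch {

    /** Token of the current leader session, playing the role of a JobMasterID or ResourceManagerID. */
    private volatile UUID currentToken;

    public FencedEndpointSketch(UUID initialToken) {
        this.currentToken = initialToken;
    }

    /** Starts a new leader session; messages carrying the old token become stale. */
    public void grantLeadership(UUID newToken) {
        this.currentToken = newToken;
    }

    /** Returns true if the message was accepted, false if it was fenced off. */
    public boolean handle(UUID messageToken, String payload) {
        if (!currentToken.equals(messageToken)) {
            // Stale sender: log both tokens so that the mismatch is easy to trace.
            System.out.println("Discarding message '" + payload + "': fencing token "
                    + messageToken + " does not match current token " + currentToken);
            return false;
        }
        System.out.println("Handling message '" + payload + "'");
        return true;
    }

    public static void main(String[] args) {
        UUID oldToken = UUID.randomUUID();
        FencedEndpointSketch endpoint = new FencedEndpointSketch(oldToken);
        endpoint.handle(oldToken, "requestSlot"); // accepted
        endpoint.grantLeadership(UUID.randomUUID());
        endpoint.handle(oldToken, "requestSlot"); // rejected: token of the previous session
    }
}
```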



Public Interfaces

This proposal does not touch any public interfaces, so it should not introduce any compatibility issues.

Proposed Changes

Add more meaning to Flink’s IDs

Add location information to distributed components

IDs of distributed components should carry location-related information so that users can pinpoint a specific instance. SlotID is a good example: its string form describes itself (the ResourceID of the TaskExecutor plus the slot number). In this FLIP, we propose adding location information to the ResourceID of TaskExecutor:

...

If you think any other distributed component needs location information, comments are welcome.
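The SlotID pattern described above can be sketched as follows. The sketch assumes a simplified ResourceID that only wraps a string, and it uses Java records (Java 16+). The slot ID’s string form embeds the TaskExecutor’s ResourceID, so any location information later added to that ResourceID would also appear wherever a slot is logged. The type names, the "tm-1" value, and the "_" separator are illustrative assumptions. The sketch does not show the ResourceID format proposed by this FLIP.

```java
/**
 * Hypothetical sketch of a self-describing slot ID. Type names and the "_" separator
 * are illustrative assumptions, not Flink's actual classes.
 */
public class SlotIdSketch {

    /** Simplified stand-in for a TaskExecutor's ResourceID: just an opaque string. */
    public record ResourceId(String value) {
        @Override
        public String toString() {
            return value;
        }
    }

    /** A slot is identified by its owning TaskExecutor plus the slot's index on it. */
    public record SlotId(ResourceId resourceId, int slotNumber) {
        @Override
        public String toString() {
            // The string form alone tells the reader which TaskExecutor and which slot.
            return resourceId + "_" + slotNumber;
        }
    }

    public static void main(String[] args) {
        SlotId slot = new SlotId(new ResourceId("tm-1"), 3);
        System.out.println(slot); // prints tm-1_3
    }
}
```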

Add topology information to graph components

IDs of graph components (in the JobGraph, ExecutionGraph, etc.) should contain the component’s “parent” and its attempt number or parallel index. ExecutionVertexID is a good example: it is composed of JobVertexID and subtaskIndex. In this FLIP, we propose adding topology information to the IDs of the following components:

  • Make IntermediateResultPartitionID consist of (IntermediateDataSetID, partitionIndex).
  • Make ExecutionAttemptID consist of (ExecutionVertexID, attemptNumber).
  • Add the producer information to the string literals of IntermediateDataSetID and IntermediateResultPartitionID, as ResultPartitionID already does.
  • Make the InstanceID in TaskExecutorConnection consist of the ResourceID plus a monotonically increasing value.

These changes may not be compatible with the current AbstractID definition. We should either refactor AbstractID or stop having these IDs extend it.
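A minimal sketch of the proposed compositions follows. It assumes Java records (Java 16+), and the IDs deliberately do not extend AbstractID. It covers the ExecutionAttemptID, IntermediateResultPartitionID, and InstanceID bullets. The record names, the "_", "#" and "-" separators, and the process-wide counter behind InstanceId are hypothetical choices; the FLIP does not fix a string format or say where the counter lives.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of composite IDs that carry topology information.
 * None of these types extend AbstractID; names and string formats are illustrative.
 */
public class CompositeIdsSketch {

    /** Stand-in for an opaque random ID such as JobVertexID or IntermediateDataSetID. */
    public record OpaqueId(String value) {
        @Override
        public String toString() {
            return value;
        }
    }

    /** (JobVertexID, subtaskIndex): the existing shape of ExecutionVertexID. */
    public record ExecutionVertexId(OpaqueId jobVertexId, int subtaskIndex) {
        @Override
        public String toString() {
            return jobVertexId + "_" + subtaskIndex;
        }
    }

    /** Proposed: (ExecutionVertexID, attemptNumber). */
    public record ExecutionAttemptId(ExecutionVertexId executionVertexId, int attemptNumber) {
        @Override
        public String toString() {
            return executionVertexId + "_" + attemptNumber;
        }
    }

    /** Proposed: (IntermediateDataSetID, partitionIndex). */
    public record IntermediateResultPartitionId(OpaqueId intermediateDataSetId, int partitionIndex) {
        @Override
        public String toString() {
            return intermediateDataSetId + "#" + partitionIndex;
        }
    }

    /** Proposed: the TaskExecutor's ResourceID plus a monotonically increasing value. */
    public record InstanceId(String resourceId, long sequence) {
        // Assumption: one process-wide counter; the FLIP does not say where it lives.
        private static final AtomicLong COUNTER = new AtomicLong();

        public static InstanceId next(String resourceId) {
            return new InstanceId(resourceId, COUNTER.incrementAndGet());
        }

        @Override
        public String toString() {
            return resourceId + "-" + sequence;
        }
    }

    public static void main(String[] args) {
        ExecutionVertexId vertex = new ExecutionVertexId(new OpaqueId("jv1"), 2);
        System.out.println(new ExecutionAttemptId(vertex, 1));                         // jv1_2_1
        System.out.println(new IntermediateResultPartitionId(new OpaqueId("ds1"), 2)); // ds1#2
        System.out.println(InstanceId.next("tm-1"));                                  // tm-1-1
        System.out.println(InstanceId.next("tm-1"));                                  // tm-1-2
    }
}
```

Each ID keeps its parent as a field, so the lineage is available both programmatically and in the logged string.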

If you think any other graph component needs topology information, comments are welcome.

Log the lineage between IDs

Flink’s logs contain many IDs that identify different entities. Some of these entities, and therefore their IDs, are related by lineage. One example is the ExecutionAttemptID of an Execution and the SlotRequestID that is created while allocating resources for that Execution. Currently, Flink does not log enough information to reveal these lineages. When a job becomes large, e.g. with hundreds or thousands of tasks, it is hard to track the relationships between these distributed entities, which makes failures hard to debug.

...

So far, we have identified the lineages above as the ones that should be added to the log. If you have other ideas, comments are welcome.
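One way to log a lineage is to emit a single line that names both IDs at the moment one entity is derived from the other. Searching the log for either ID then also reveals the other. Below is a minimal sketch for the ExecutionAttemptID/SlotRequestID example. It uses java.util.logging so that it has no dependencies, although Flink itself logs through SLF4J. The class, the method names, and the message wording are hypothetical.

```java
import java.util.UUID;
import java.util.logging.Logger;

/** Hypothetical sketch: log the lineage between an execution attempt and its slot request. */
public class LineageLoggingSketch {

    private static final Logger LOG = Logger.getLogger(LineageLoggingSketch.class.getName());

    /** Builds one log line that links the two IDs. */
    static String lineageMessage(String executionAttemptId, String slotRequestId) {
        return String.format(
                "Requesting slot with SlotRequestID %s for execution attempt %s.",
                slotRequestId, executionAttemptId);
    }

    /** Creates a new slot request ID and logs which execution attempt it belongs to. */
    static String requestSlotFor(String executionAttemptId) {
        String slotRequestId = UUID.randomUUID().toString();
        LOG.info(lineageMessage(executionAttemptId, slotRequestId));
        return slotRequestId;
    }

    public static void main(String[] args) {
        requestSlotFor("jv1_2_1");
    }
}
```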

Expose the identifiers of distributed components to users

Flink has recently upgraded from log4j to log4j2. Users can now use log4j2’s built-in appenders or custom appenders to ship logs to external storage (e.g. HDFS, S3). To do so, however, users need some identifier to construct the log file name. For the JobManager and each TaskExecutor, we propose exposing the component name and ResourceID to users as environment variables.
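The following sketch shows how user-side code could turn the exposed values into a log file name. The environment variable names FLINK_COMPONENT_NAME and FLINK_RESOURCE_ID are hypothetical placeholders, because the FLIP does not name them here. The lookup takes a Map so that it can be exercised without setting real environment variables. In a log4j2 configuration, the same values could be referenced through the `${env:...}` lookup instead.

```java
import java.util.Map;

/** Hypothetical sketch: derive a per-component log file name from environment variables. */
public class LogFileNameSketch {

    // Hypothetical variable names; the real names would be defined by the implementation.
    static final String COMPONENT_VAR = "FLINK_COMPONENT_NAME";
    static final String RESOURCE_ID_VAR = "FLINK_RESOURCE_ID";

    /** Returns e.g. "taskexecutor-tm-1.log", using "unknown" for missing values. */
    static String logFileName(Map<String, String> env) {
        String component = env.getOrDefault(COMPONENT_VAR, "unknown");
        String resourceId = env.getOrDefault(RESOURCE_ID_VAR, "unknown");
        // Replace characters that are awkward in file or object-store names (e.g. ':').
        String safeResourceId = resourceId.replaceAll("[^A-Za-z0-9._-]", "_");
        return component + "-" + safeResourceId + ".log";
    }

    public static void main(String[] args) {
        System.out.println(logFileName(System.getenv()));
    }
}
```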