You are viewing an old version of this page. View the current version.

Compare with Current View Page History

Version 1 Next »

Status

Current stateDrafting

Discussion threadhere (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA Unable to render Jira issues macro, execution error.

Released: <Flink Version>


This FLIP target to improve Flink’s ID system, including ID refactoring and LOG readability enhancement.

Background

In this section, we first sort out all the IDs that covered by this proposal.

IDs to identify a Graph component

JobGraph

JobVertexID

Identify a job vertex.

IntermediateDataSetID

Identify an intermediate dataset.

ExecutionGraph

ExecutionVertexID

Identify an execution vertex. Composed of JobVertexID and subtaskIndex.

IntermediateResultPartitionID

Identify an intermediate result partition.

ExecutionAttemptID

Identify an Execution instance. Also used in TaskExecutor.

IDs to identify a distributed component

ResourceManager

Identified by both ResourceID and ResourceManagerID.

TaskExecutor

Identified by both ResourceID and InstanceID. The string literal of ResourceID would be container id in Yarn mode and pod name in Kubernetes mode.

JobMaster/JobGraph

Identified by both ResourceID and JobID.

Slot

Identified by SlotID, which is composed of the ResourceID of TaskExecutor and slot index. It also could be identified by AllocationID in JobMaster side.

PendingTaskManagerSlot

Identified by TaskManagerSlotId.

PendingRequet in SlotPool

Identified by SlotRequestID.

SlotRequest in SlotPool, ResourceManager, and TaskExecutor

Identified by AllocationID.

Motivation

Flink uses a lot of IDs which are hard to decipher for the user. To help the user to understand what Flink is doing, we target to add more meaning to Flink’s IDs and LOGs while cleaning up the redundancy and refactoring the ID system:

  • Enhance the readability of IDs’ string literal. The string literal of most Flink’s IDs are meaningless hashcode. We need to add more meaning to it.
  • Make debugging more convenient by enhancing the lineage information of IDs. Currently, the LOG does not contain the lineage info of those IDs, e.g. we could not know the relationship between AllocationID and SlotRequestID from the log. As a result, it’s impossible to track the end to end lifecycle of an Execution or a Task now, which makes debugging difficult.
  • Remove the redundancy that exists in Flink’s ID system. We need to sort out the current ID system before introducing more complexity

Public Interfaces

This proposal does not touch the public interface. No capability issue should be introduced.

Proposed Changes

Add more meaning to Flink’s IDs

The SlotID is an example whose string literal well describes itself(ResourceID of TaskExecutor plus the slot number). Another positive example is ExecutionVertexID, which is composed of JobVertexID and subtaskIndex.

For the IDs of distributed components, it better to contain the location-relevant information to help the user to locate a specific one. For TaskExecutor, we need to add more information to the ResourceID:

  • Regarding the YarnWorkerNode, we propose to add host and port info to it.
  • Regarding the KubernetesWorkerNode, the string literal of its ResourceID is now set to the PodName. It could give us enough information to locate a TaskExecutor. There is no need to change at the moment.
  • Regarding the Standalone mode, the ResourceID is produced randomly in TaskManagerRunner#main. We propose to add host and port info and/or the name of TaskExecutor to it.
  • TBD...

For the IDs of the components in the Graph(JobGraph, ExecutionGraph, etc.), It’s better to contain the “parent” of the components and the attempt number or parallel index. These changes may not compatible with the current AbstractID definition. We should either refactor the AbstractID or let them not extend AbstractID.

  • Make the IntermediateResultPartitionID being composed of (IntermediateDataSetID, partitionIndex)
  • Make the ExecutionAttemptID being composed of (ExecutionVertexID, attemptNumber)
  • Add the producer info to the string literal of IntermediateDataSetID, IntermediateResultPartitionID. Just like ResultPartitianID.
  • Make the InstanceID in TaskExecutorConnection being composed of the ResourceID plus a monotonically increasing value.
  • TBD…

Log the ID’s lineage information

The lack of lineage information in Log makes debugging difficult for the slot relevant issues.

To illustrate why we need lineage information, we’d like to first sort out IDs in an end to end process of a simple WordCount job. Currently, we have three different IDs used to track the progress of Flink Runtime:

  • The Execution instance is constructed with ExecutionAttemptID. The LOG uses this identifier at the beginning.
  • A SlotRequestID will be produced during `Execution#allocateAndAssignSlotForExecution` method. Since then, the identifier in LOG change to the SlotRequestID
  • Running into the Scheduler component, if there is no available slot now, we will
    • Construct a PendingRequest, which is only identified by SlotRequestID, instance in SlotPoolImpl#requestNewAllocatedSlot
    • Produce an AllocationID add construct a SlotRequst, which is only identified by AllocationID, send it to the ResourceManager
  • There is no LOG that emphasizes the relationship of PendingRequest in the Scheduler and its SlotRequest. Since then, all the distributed components, JobMaster, ResourceManager, and TaskExecutor, identify the SlotRequest by AllocationID.
  • Finally, the TaskExecutor offers slots to the SlotPool, and a Task instance with both ExecutionAttemptID and AllocationID in it will be deployed to the TaskExecutor. The LOG in the process of Task execution is mixed with ExecutionAttemptID and AllocationID but never prints the two IDs in the same LOG line.

When the job becomes large, e.g. with hundreds or thousands of tasks, it is hard to track those distributed components from the log. No lineage information of those identifiers is given in each stage. Result in it is hard to debug the slot relevant issues.

We proposed to add some logs to directly point out the relationship/lineage when the identifier changed.

Refactor Flink’s ID system

Other topics under this FLIP, target to reduce redundancy, harden the ID system, and fix other identification issues.

Using ResourceID instead of InstanceID to identify a TaskExecutor at ResourceManager side

IIRC, a TaskExecutor instance should be verified by ResourceID; While the InstanceID should only identify a connection between ResourceManager and TaskManager. The InstanceID seems functionally more like a credit for the communication, just like ResourceManagerID. It should be only used during RPC. However, we use InstanceID instead to identify a TaskExecutor resource in ResourceManager and SlotManager. Also, the identifier usage of the relevant code path is confused, e.g. we could not get the ResourceID of this TaskExecutor from the TaskManagerSlotInformation. 

Expose the identifier of distributing component to user

Recently, Flink has bumped the log4j to log4j2. Now some users could use log4j2 build-in appenders or custom appenders to collect logs to external storage(e.g. HDFS, S3, etc.). However, user needs to construct the log name with some identifier. For each distributing component, e.g. JobMaster, TaskExecutor, we could expose the component name and ResourceID as an environment variable to user.

Other

  • Introduce a specific ColocationGroupID for ColocationGroup instead of directly using AbstractID.
  • Extend a RandomAbstractID class for the scenario when we need a random AbstractID.
  • Add a ResourceID field in RegisteredMesosWorkerNode.
  • Since Slot is also a distributed component, we propose to make the SlotID being derived from ResourceID.
  • TBD...
  • No labels