Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In the Flink project, context-like APIs provide diverse metadata and functionality to different modules and components at runtime. Each context-like API is generally an interface annotated with @Public. Take RuntimeContext as an example: it is an interface that consists of 27 methods. Some of these methods are used to get metadata, such as getJobId(), getTaskName(), and getIndexOfThisSubtask(); others are used to access functionality provided by the framework, such as getState(), getAccumulator(), and getCounter().

When new metadata needs to be provided by the RuntimeContext, the corresponding getter methods must be added to the RuntimeContext interface. However, there are 12 implementation classes for the RuntimeContext interface. This means that every time a method is added to the RuntimeContext interface, the 12 implementation classes must also add the corresponding overriding methods. This design results in extra code modification costs whenever more metadata is provided in RuntimeContext. Similar issues exist in other context-like APIs as well.

To address this problem, we propose providing the metadata in a categorized manner within these context-like APIs, which means each category of metadata will only be offered through a single dedicated method. For example, we will encapsulate the metadata about the job information in a class called JobInfoImpl. For context-like APIs that provide job information, we will add the getJobInfo() method. When more job information needs to be provided by context-like APIs, we only need to update the JobInfoImpl class without modifying any methods of the context-like APIs and their implementations. For each category of metadata, we will follow the same approach and add the corresponding methods in the context-like APIs.

Public Interfaces

1. We introduce the JobInfo and TaskInfo interfaces. We also introduce their default implementation classes, JobInfoImpl and TaskInfoImpl.

2. We introduce the getJobInfo() and getTaskInfo() methods to both org.apache.flink.api.common.functions.RuntimeContext and org.apache.flink.api.connector.sink2.Sink.InitContext.

3. We introduce the getJobInfo() method to org.apache.flink.core.failure.FailureEnricher.Context.

4. We introduce the getTaskInfo() method to org.apache.flink.api.connector.source.SourceReaderContext.

More details can be found in Proposed Changes.

Proposed Changes

Introduce the categorization of metadata

We categorize the metadata in the context-like APIs into two categories. The first category is JobInfo, which includes metadata related to a single Flink job.

/** The {@link JobInfo} represents the meta information of the job. */
public interface JobInfo {

    /**
     * Get the ID of current job.
     *
     * @return the ID of current job
     */
    JobID getJobId();

    /**
     * Get the name of current job.
     *
     * @return the name of current job
     */
    String getJobName();
}
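
The default implementation mentioned in Public Interfaces could be a plain immutable value class. The following is a minimal sketch, not the actual Flink code: the constructor signature and the null checks are assumptions, and JobID is a stand-in for org.apache.flink.api.common.JobID so that the snippet compiles on its own.

```java
// Stand-in for org.apache.flink.api.common.JobID (assumption, for self-containment only).
final class JobID {
    private final String hex;

    JobID(String hex) {
        this.hex = java.util.Objects.requireNonNull(hex, "hex");
    }

    @Override
    public String toString() {
        return hex;
    }
}

// Mirrors the JobInfo interface defined above.
interface JobInfo {
    JobID getJobId();

    String getJobName();
}

// Hypothetical default implementation: an immutable holder for job metadata.
final class JobInfoImpl implements JobInfo {
    private final JobID jobId;
    private final String jobName;

    JobInfoImpl(JobID jobId, String jobName) {
        this.jobId = java.util.Objects.requireNonNull(jobId, "jobId");
        this.jobName = java.util.Objects.requireNonNull(jobName, "jobName");
    }

    @Override
    public JobID getJobId() {
        return jobId;
    }

    @Override
    public String getJobName() {
        return jobName;
    }
}
```

With this shape, exposing a further piece of job metadata means adding one getter to JobInfo and one field to JobInfoImpl, while the context-like APIs that return JobInfo stay unchanged.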

The second category is TaskInfo, which includes metadata related to a single task. Here is the definition:

/** The {@link TaskInfo} represents the meta information of the task. */
public interface TaskInfo {

    /**
     * Get the name of current task.
     *
     * @return the name of current task.
     */
    String getTaskName();

    /**
     * Get the index of current task.
     *
     * @return the index of current task.
     */
    int getTaskIndex();

    /**
     * Get the attempt number of current task. First attempt is numbered 0.
     *
     * @return the attempt number of current task.
     */
    int getAttemptNumber();

    /**
     * Get the name of the task, appended with the task index and attempt number,
     * such as "MyTask (3/6)#1", where 3 would be the task index plus one, 6 would be
     * the parallelism, and 1 would be the attempt number.
     *
     * @return the task name with index and attempt number.
     */
    String getTaskNameWithIndexAndAttemptNumber();
  
    /**
     * Get the parallelism of current task.
     *
     * @return the parallelism.
     */
    int getParallelism();


    /**
     * Get the max parallelism of current task.
     *
     * @return the max parallelism.
     */
    int getMaxParallelism();

}
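
A possible TaskInfoImpl is sketched below to show how the combined name could be derived from the other fields. This is an illustration under assumptions, not the actual Flink implementation: the constructor signature and the range check are hypothetical, and the format (one-based position, then parallelism, then attempt number) is inferred from the "MyTask (3/6)#1" example in the Javadoc above.

```java
// Mirrors the TaskInfo interface defined above.
interface TaskInfo {
    String getTaskName();

    int getTaskIndex();

    int getAttemptNumber();

    String getTaskNameWithIndexAndAttemptNumber();

    int getParallelism();

    int getMaxParallelism();
}

// Hypothetical default implementation: an immutable holder for task metadata.
final class TaskInfoImpl implements TaskInfo {
    private final String taskName;
    private final int taskIndex;
    private final int attemptNumber;
    private final int parallelism;
    private final int maxParallelism;
    private final String taskNameWithIndexAndAttemptNumber;

    TaskInfoImpl(
            String taskName,
            int taskIndex,
            int attemptNumber,
            int parallelism,
            int maxParallelism) {
        // Assumed invariant: the zero-based index must be below the parallelism.
        if (taskIndex < 0 || taskIndex >= parallelism) {
            throw new IllegalArgumentException("Task index must be in [0, parallelism).");
        }
        this.taskName = java.util.Objects.requireNonNull(taskName, "taskName");
        this.taskIndex = taskIndex;
        this.attemptNumber = attemptNumber;
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
        // Computed once; the displayed position is one-based, the stored index is zero-based.
        this.taskNameWithIndexAndAttemptNumber =
                taskName + " (" + (taskIndex + 1) + "/" + parallelism + ")#" + attemptNumber;
    }

    @Override
    public String getTaskName() {
        return taskName;
    }

    @Override
    public int getTaskIndex() {
        return taskIndex;
    }

    @Override
    public int getAttemptNumber() {
        return attemptNumber;
    }

    @Override
    public String getTaskNameWithIndexAndAttemptNumber() {
        return taskNameWithIndexAndAttemptNumber;
    }

    @Override
    public int getParallelism() {
        return parallelism;
    }

    @Override
    public int getMaxParallelism() {
        return maxParallelism;
    }
}
```

For example, a task named "MyTask" with index 2, attempt number 1 and parallelism 6 would report "MyTask (3/6)#1".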

Unify the provision of metadata in context-like APIs

To provide the required categories of metadata, we will add corresponding getter methods that return JobInfo or TaskInfo to each context-like API. Methods that were previously used to get metadata directly will be annotated with @Deprecated. The following table shows the deprecated methods in each context-like API.

Context-like API | Deprecated Methods
org.apache.flink.api.common.functions.RuntimeContext | getJobId(), getTaskName(), getNumberOfParallelSubtasks(), getMaxNumberOfParallelSubtasks(), getIndexOfThisSubtask(), getAttemptNumber(), getTaskNameWithSubtasks()
org.apache.flink.api.connector.source.SourceReaderContext | getIndexOfSubtask(), currentParallelism()
org.apache.flink.api.connector.sink2.Sink.InitContext | getSubtaskId(), getNumberOfParallelSubtasks(), getAttemptNumber(), getJobId()
org.apache.flink.core.failure.FailureEnricher.Context | getJobId(), getJobName()

In the following, we will introduce the methods added in each context-like API.

1. org.apache.flink.api.common.functions.RuntimeContext

The RuntimeContext provides metadata about the current job and task, such as job id, task name, task parallelism, etc. We will add two new methods: getJobInfo() and getTaskInfo().

public interface RuntimeContext {
  
    ...
  
    /**
     * Get the meta information about the current job.
     * 
     * @return the meta information.
     */
    JobInfo getJobInfo();

    /**
     * Get the meta information about the current task.
     * 
     * @return the meta information.
     */
    TaskInfo getTaskInfo();
}
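
From the caller's side, code that used the deprecated getters would go through the two new accessors instead. The sketch below illustrates this with reduced stand-in interfaces that declare only the members it calls; PartFileNamer and partFileName are hypothetical names, and the pairing of old and new methods in the comment is inferred from the method names rather than stated in this proposal.

```java
// Reduced stand-ins (assumption): only the members used below are declared.
interface JobInfo {
    String getJobName();
}

interface TaskInfo {
    int getTaskIndex();

    int getParallelism();
}

interface RuntimeContext {
    JobInfo getJobInfo();

    TaskInfo getTaskInfo();
}

// Hypothetical user code that derives a per-subtask output file name.
final class PartFileNamer {
    private PartFileNamer() {}

    static String partFileName(RuntimeContext ctx) {
        // Previously: ctx.getIndexOfThisSubtask() and ctx.getNumberOfParallelSubtasks().
        TaskInfo task = ctx.getTaskInfo();
        JobInfo job = ctx.getJobInfo();
        return job.getJobName()
                + "-part-" + task.getTaskIndex()
                + "-of-" + task.getParallelism();
    }
}
```

Because the caller only depends on getJobInfo() and getTaskInfo(), new fields added later to JobInfo or TaskInfo become available without any change to the RuntimeContext interface.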

2. org.apache.flink.api.connector.source.SourceReaderContext

The SourceReaderContext provides metadata about the current task, such as task index. We will add a new method called getTaskInfo().

public interface SourceReaderContext {
  
    ...
  
    /**
     * Get the meta information about the current task.
     * 
     * @return the meta information.
     */
    TaskInfo getTaskInfo();
}

3. org.apache.flink.api.connector.sink2.Sink.InitContext

The Sink.InitContext provides metadata about the current job and task, such as job id, task index, task attempt number, etc. We will add two new methods, including getJobInfo() and getTaskInfo().

interface InitContext {
  
    ...
  
    /**
     * Get the meta information about the current job.
     * 
     * @return the meta information.
     */
    JobInfo getJobInfo();

    /**
     * Get the meta information about the current task.
     * 
     * @return the meta information.
     */
    TaskInfo getTaskInfo();
}

4. org.apache.flink.core.failure.FailureEnricher.Context

The FailureEnricher.Context provides metadata about the current job, such as job id, job name, etc. We will add a new method getJobInfo().

interface Context {
  
    ...
  
    /**
     * Get the meta information about the current job.
     * 
     * @return the meta information.
     */
    JobInfo getJobInfo();
}


Rejected Alternatives

None.

Compatibility, Deprecation, and Migration Plan

1. The JobInfo and TaskInfo interfaces will be annotated with @PublicEvolving, as they do not guarantee compatibility between major and minor versions.

2. The newly proposed methods in the context-like APIs are incompatible with the old APIs. However, we will provide a reasonable migration period. Specifically, we will deprecate the old methods at the same time as we introduce the new methods (expected in 1.19), and remove the old methods after the migration period (expected in 2.0). Once the old APIs are removed, users may need to modify their code in order to upgrade to the latest Flink version.
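
One way to keep the old getters working during the migration period is to turn them into default methods that delegate to the new accessor. This proposal does not prescribe that mechanism, so the following is only a sketch of the idea, using a reduced stand-in for FailureEnricher.Context and a JobInfo that declares just the job name.

```java
// Reduced stand-in (assumption): only the job name is modeled here.
interface JobInfo {
    String getJobName();
}

// Reduced stand-in for FailureEnricher.Context.
interface Context {

    /** New accessor: the single entry point for job metadata. */
    JobInfo getJobInfo();

    /**
     * Old getter kept for the migration period.
     *
     * @deprecated use {@code getJobInfo().getJobName()} instead.
     */
    @Deprecated
    default String getJobName() {
        // Delegating keeps old callers working while implementations
        // only need to provide getJobInfo().
        return getJobInfo().getJobName();
    }
}
```

Under this approach, existing callers of getJobName() keep compiling with a deprecation warning until the method is removed, and implementations no longer have to override the old getter.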

Test Plan

We will provide unit and integration tests to validate the proposed changes.