[This FLIP proposal is joint work between Rui Fan and Samrat Deb]

Motivation

The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to let users turn on the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly integrated with Kubernetes and resides within the flink-kubernetes-operator repository.

There are compelling reasons to extend the usage of the flink-autoscaler to Flink jobs running on YARN as well:

  1. With the recent merge of the Externalized Declarative Resource Management (FLIP-291, FLINK-31316), in-place scaling is now supported across all types of Flink jobs. This development has made scaling Flink on YARN a straightforward process.

  2. Several discussions within the Flink user community, as observed on the mailing list, have emphasized the need for flink-autoscaler to support Flink on YARN.

Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.

Core Idea


  1. JobAutoScaler and JobAutoScalerImpl: These components will define the general autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP outlines the necessity of defining two interfaces, AutoScalerHandler and AutoScalerStateStore.
    The AutoScalerHandler interface handles event-based operations, while the AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should not rely on any Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes. Instead, it can rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.

  4. Optional Goal: As a nice-to-have feature, the FLIP proposes moving the flink-autoscaler module to the Apache Flink repository, thereby making it an integral part of the Flink project.
    Please note that the autoscaler module will initially remain in the flink-kubernetes-operator repository during this FLIP, and we can move it to Apache Flink in the last step of this FLIP.
    It's easy to implement flink-yarn-autoscaler after decoupling, and adding flink-yarn-autoscaler to the flink-kubernetes-operator repo would be weird.

  • [Must-have] Implement the AutoScalerHandler and AutoScalerStateStore
    • AutoScalerHandler calls Kubernetes to do the actual scaling or to record events.
    • AutoScalerStateStore calls Kubernetes to query and update the ConfigMap.


Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving these classes to flink-kubernetes-operator will reduce complexity. We can discuss it on the mailing list.

Why isn't it necessary?

  • In the POC version, flink-kubernetes-operator-autoscaler defines just two classes: KubernetesAutoScalerHandler and KubernetesAutoScalerStateStore.
  • If flink-kubernetes-operator-autoscaler is an independent module, it must depend on the flink-kubernetes-operator module.
  • flink-kubernetes-operator cannot in turn depend on flink-kubernetes-operator-autoscaler (that would be a circular dependency), so loading these classes is more difficult than simply removing the flink-kubernetes-operator-autoscaler module.

Public Interfaces

1. JobAutoScaler

The proposed interface stays close to the existing JobAutoScaler, but with modified method parameters. Instead of Kubernetes-related classes, the methods take JobAutoScalerContext<KEY, INFO> and KEY jobKey. For clarity, it is suggested to rename the generic parameter KEY to JOB_KEY. The autoscaler treats Flink jobs with the same jobKey as the same job.


The generic INFO will be introduced later.

Code Block
/** The general Autoscaler instance. */
public interface JobAutoScaler<KEY, INFO> {

    /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
    boolean scale(JobAutoScalerContext<KEY, INFO> context);

    /** Called when the custom resource is deleted. */
    void cleanup(KEY jobKey);
}
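
To illustrate how the jobKey ties calls together, here is a minimal sketch of a caller driving the interface. Everything in it is hypothetical and not part of the FLIP: DemoContext stands in for JobAutoScalerContext (only the job key and extra info are modelled), DemoJobAutoScaler mirrors the shape of the proposed interface, and CountingAutoScaler only counts calls instead of making scaling decisions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for JobAutoScalerContext; only jobKey and extraJobInfo are modelled. */
class DemoContext<KEY, INFO> {
    private final KEY jobKey;
    private final INFO extraJobInfo;

    DemoContext(KEY jobKey, INFO extraJobInfo) {
        this.jobKey = jobKey;
        this.extraJobInfo = extraJobInfo;
    }

    KEY getJobKey() {
        return jobKey;
    }

    INFO getExtraJobInfo() {
        return extraJobInfo;
    }
}

/** Same shape as the proposed JobAutoScaler, but using the stand-in context. */
interface DemoJobAutoScaler<KEY, INFO> {
    boolean scale(DemoContext<KEY, INFO> context);

    void cleanup(KEY jobKey);
}

/** Hypothetical autoscaler that keeps per-job state keyed by jobKey and never rescales. */
class CountingAutoScaler<KEY, INFO> implements DemoJobAutoScaler<KEY, INFO> {
    private final Map<KEY, Integer> callsPerJob = new HashMap<>();

    @Override
    public boolean scale(DemoContext<KEY, INFO> context) {
        // Contexts carrying the same jobKey are treated as the same Flink job.
        callsPerJob.merge(context.getJobKey(), 1, Integer::sum);
        return false; // this sketch never triggers a scaling
    }

    @Override
    public void cleanup(KEY jobKey) {
        // Drop everything remembered for this job.
        callsPerJob.remove(jobKey);
    }

    int calls(KEY jobKey) {
        return callsPerJob.getOrDefault(jobKey, 0);
    }
}
```

A Kubernetes caller would presumably use ResourceID as KEY and the custom resource as INFO, while a YARN caller could pick its own key type; the sketch uses plain String keys only for brevity.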

2. JobAutoScalerContext

The JobAutoScalerContext encapsulates the essential information required for scaling Flink jobs, including jobKey, jobId, stateStore, and INFO extraJobInfo.
Currently, in the existing code and for Kubernetes jobs, the jobKey is defined as io.javaoperatorsdk.operator.processing.event.ResourceID.
However, a different jobKey can be defined for Flink jobs running on YARN in the future.

Regarding INFO extraJobInfo, it is worth noting that the flink-autoscaler itself does not use this information. Instead, it is used by certain implementations of the AutoScalerHandler.
The entire JobAutoScalerContext, comprising all relevant details, will be passed to these implementations when the autoscaler invokes their respective callbacks.


Code Block
/**
 * The job autoscaler context.
 *
 * @param <KEY> The type of the job key.
 * @param <INFO> The type of the extra job information.
 */
@AllArgsConstructor
public class JobAutoScalerContext<KEY, INFO> {

    // The identifier of each flink job.
    @Getter private final KEY jobKey;

    @Getter private final JobID jobID;

    @Getter private final long jobVersion;

    // Whether the job is really running; STARTING and CANCELING do not count as running.
    @Getter private final boolean isRunning;

    @Getter private final Configuration conf;

    @Getter private final MetricGroup metricGroup;

    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    @Getter private final Duration flinkClientTimeout;

    @Getter private final AutoScalerStateStore stateStore;

    /**
     * The flink-autoscaler doesn't use the extraJobInfo; it is only used by some implementations
     * of AutoScalerHandler. The whole context is passed to these implementations when the
     * autoscaler invokes their callbacks.
     */
    @Getter @Nullable private final INFO extraJobInfo;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}


3. AutoScalerHandler

AutoScalerHandler is called by the autoscaler when certain events need to be handled, such as a scaling error, reporting the scaling result, or updating the Flink job based on the recommended parallelism.

For the current code and Kubernetes jobs, most handlers record an event. All handlers need the `AbstractFlinkResource<?, ?>`, which is stored in `INFO extraJobInfo` of the JobAutoScalerContext.
The `AutoScalerHandler` object is shared by all Flink jobs, so it holds no job information itself. However, it needs the `AbstractFlinkResource<?, ?>` of every job, which is why `INFO extraJobInfo` is added to the JobAutoScalerContext.

...

Code Block
/**
 * Handles all events during scaling.
 *
 * @param <KEY> The type of the job key.
 * @param <INFO> The type of the extra job information.
 */
public interface AutoScalerHandler<KEY, INFO> {

    void handlerScalingError(JobAutoScalerContext<KEY, INFO> context, String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerIneffectiveScaling(JobAutoScalerContext<KEY, INFO> context, String message);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context,
            HashMap<String, String> recommendedParallelism);
}


4. AutoScalerStateStore

AutoScalerStateStore is responsible for persisting and accessing state during scaling.

For the current code and Kubernetes jobs, the state is persisted to a ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
For other jobs (YARN or standalone), I implemented a `HeapedAutoScalerStateStore`, which means the state is lost when the autoscaler restarts. Of course, we can implement a MySQLAutoScalerStateStore to persist the state in the future.

...

Code Block
public interface AutoScalerStateStore {

    Optional<String> get(String key);

    // Puts the state into the state store. Call flush() afterwards to prevent the state from being lost.
    void put(String key, String value);

    void remove(String key);

    void flush();
}
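
As an illustration of the non-persistent variant described above, a heap-backed store could look roughly like the following. This is a minimal sketch, not the POC code: the interface is copied here only so that the example compiles on its own, and the choice of ConcurrentHashMap is an assumption.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Copy of the proposed interface so that the sketch is self-contained. */
interface AutoScalerStateStore {

    Optional<String> get(String key);

    void put(String key, String value);

    void remove(String key);

    void flush();
}

/**
 * Sketch of a heap-backed state store. State lives only in this JVM, so it is lost when the
 * autoscaler restarts. ConcurrentHashMap is an assumption; it rejects null keys and values.
 */
class HeapedAutoScalerStateStore implements AutoScalerStateStore {

    private final Map<String, String> state = new ConcurrentHashMap<>();

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(state.get(key));
    }

    @Override
    public void put(String key, String value) {
        state.put(key, value);
    }

    @Override
    public void remove(String key) {
        state.remove(key);
    }

    @Override
    public void flush() {
        // Nothing to do: there is no external storage behind this store.
    }
}
```

Callers can still invoke flush() after put() as they would with a persistent store, so swapping in a durable implementation later would not change the calling code.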



Proposed Changes

  1. Ensure the new autoscaler module keeps the general auto scaling strategy.
    It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
    Kubernetes-related dependencies should be removed from these classes, which should use JobAutoScalerContext, AutoScalerHandler and AutoScalerStateStore instead.

  2. Use the `RestClusterClient<String>` instead of org.apache.flink.kubernetes.operator.service.FlinkService.

    The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is the general Flink client and supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


  3. Implement the default and Kubernetes classes for AutoScalerHandler.
    The default AutoScalerHandler could be the LoggedAutoScalerHandler. It just logs the event when any method is called.
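
A logging-only handler of this kind might be sketched as follows. The names HandlerDemoContext and DemoAutoScalerHandler are hypothetical stand-ins so that the example compiles without Flink on the classpath, and the use of java.util.logging plus an injectable message sink are assumptions made for the sketch, not what the POC does.

```java
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.logging.Logger;

/** Hypothetical stand-in for JobAutoScalerContext; only the job key is modelled. */
class HandlerDemoContext<KEY, INFO> {
    private final KEY jobKey;

    HandlerDemoContext(KEY jobKey) {
        this.jobKey = jobKey;
    }

    KEY getJobKey() {
        return jobKey;
    }
}

/** Same methods as the proposed AutoScalerHandler, using the stand-in context. */
interface DemoAutoScalerHandler<KEY, INFO> {
    void handlerScalingError(HandlerDemoContext<KEY, INFO> context, String errorMessage);

    void handlerScalingReport(HandlerDemoContext<KEY, INFO> context, String scalingReportMessage);

    void handlerIneffectiveScaling(HandlerDemoContext<KEY, INFO> context, String message);

    void handlerRecommendedParallelism(
            HandlerDemoContext<KEY, INFO> context, HashMap<String, String> recommendedParallelism);
}

/** Sketch of a handler that only logs each event and never touches the job. */
class LoggedAutoScalerHandler<KEY, INFO> implements DemoAutoScalerHandler<KEY, INFO> {
    private static final Logger LOG = Logger.getLogger(LoggedAutoScalerHandler.class.getName());

    // Where formatted messages go; defaults to the logger, replaceable for testing.
    private final Consumer<String> sink;

    LoggedAutoScalerHandler() {
        this(LOG::info);
    }

    LoggedAutoScalerHandler(Consumer<String> sink) {
        this.sink = sink;
    }

    @Override
    public void handlerScalingError(HandlerDemoContext<KEY, INFO> context, String errorMessage) {
        sink.accept("Scaling error for job " + context.getJobKey() + ": " + errorMessage);
    }

    @Override
    public void handlerScalingReport(HandlerDemoContext<KEY, INFO> context, String scalingReportMessage) {
        sink.accept("Scaling report for job " + context.getJobKey() + ": " + scalingReportMessage);
    }

    @Override
    public void handlerIneffectiveScaling(HandlerDemoContext<KEY, INFO> context, String message) {
        sink.accept("Ineffective scaling for job " + context.getJobKey() + ": " + message);
    }

    @Override
    public void handlerRecommendedParallelism(
            HandlerDemoContext<KEY, INFO> context, HashMap<String, String> recommendedParallelism) {
        sink.accept("Recommended parallelism for job " + context.getJobKey() + ": " + recommendedParallelism);
    }
}
```

Because the handler is shared by all jobs, everything job-specific in the message comes from the context argument rather than from fields of the handler.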



For the current code and Kubernetes jobs, most handlers record an event. All handlers need the `AbstractFlinkResource<?, ?>`, which is stored in `INFO extraJobInfo` of the JobAutoScalerContext.
The AutoScalerHandler object is shared by all Flink jobs, so it holds no job information itself. However, it needs the `AbstractFlinkResource<?, ?>` of every job, which is why INFO extraJobInfo is added to the JobAutoScalerContext.

...

  • Adding the AutoScalerHandler into the JobAutoScalerContext
  • And adding the JobAutoScalerContext into the AutoScalerHandler
  • The handler methods then no longer need a JobAutoScalerContext parameter, because the handler already holds the JobAutoScalerContext

  4. Implement the default and Kubernetes classes for AutoScalerStateStore.

    The default AutoScalerStateStore is the HeapedAutoScalerStateStore, which means the state is lost when the autoscaler restarts. Of course, we can implement a MySQLAutoScalerStateStore to persist the state in the future.

    It's pluggable, so any database or state store can be supported. Flink users can provide their own AutoScalerStateStore implementations for their scenarios.

...

Code Block
/** The kubernetes auto scaler state store. */
public class KubernetesAutoScalerStateStore implements AutoScalerStateStore {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";

    private final KubernetesClient kubernetesClient;

    private final ConfigMap configMap;

    public KubernetesAutoScalerStateStore(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = getConfigMap(cr, kubernetesClient);
    }

    public static ConfigMap getConfigMap(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {

        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
        objectMeta.setNamespace(cr.getMetadata().getNamespace());

        return getScalingInfoConfigMapFromKube(objectMeta, kubeClient)
                .orElseGet(
                        () -> {
                            LOG.info("Creating scaling info config map");

                            objectMeta.setLabels(
                                    Map.of(
                                            Constants.LABEL_COMPONENT_KEY,
                                            LABEL_COMPONENT_AUTOSCALER,
                                            Constants.LABEL_APP_KEY,
                                            cr.getMetadata().getName()));
                            var cm = new ConfigMap();
                            cm.setMetadata(objectMeta);
                            cm.addOwnerReference(cr);
                            cm.setData(new HashMap<>());
                            return kubeClient.resource(cm).create();
                        });
    }

    private static Optional<ConfigMap> getScalingInfoConfigMapFromKube(
            ObjectMeta objectMeta, KubernetesClient kubeClient) {
        return Optional.ofNullable(
                kubeClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    @Override
    public void put(String key, String value) {
        configMap.getData().put(key, value);
    }

    @Override
    public void remove(String key) {
        configMap.getData().remove(key);
    }

    @Override
    public void flush() {
        kubernetesClient.resource(configMap).update();
    }
}



POC

I have finished the POC for FLIP-334; here is the POC branch. This branch has 3 commits:

...

The flink-autoscaler module doesn't depend on any Kubernetes dependencies in this POC. I have tested it, and it works well with Kubernetes.
You can run the flink-kubernetes-operator locally by following the flink-kubernetes-operator-docs and then run the autoscaling example.

Note that this POC does not support YARN. Samrat Deb would like to add that support after flink-autoscaler is decoupled from Kubernetes.

Compatibility, Deprecation, and Migration Plan

It must be compatible with the current Kubernetes operator.


Test Plan

Unit tests, integration tests, and manual verification that the autoscaler works as expected.

Rejected Alternatives