...

  1. JobAutoScaler and JobAutoScalerImpl: These components will define the general autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP calls for defining three interfaces: AutoScalerEventHandler, AutoScalerStateStore and AutoScalerStateStoreFactory.
    The AutoScalerEventHandler interface handles event-based operations, while the AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should not rely on any Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes.
    Instead, it can rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.

  4. Optional Goal: As a nice-to-have feature, the FLIP proposes moving the flink-autoscaler module to the Apache Flink repository, thereby making it an integral part of the Flink project.
    Note that the autoscaler module will initially remain part of the flink-kubernetes-operator repository during this FLIP. Moving it to the Apache Flink repository could be the last step of this FLIP, but we prefer to keep it in flink-kubernetes-operator first, for the following reasons:

        a. The autoscaler has to support more than just the latest Flink version, so moving it to the Flink repo may not be a good choice. Because it needs to work across Flink versions, it would probably have to be a separate repo/subproject.

        b. Removing the autoscaler from the operator repo would also bring a lot of operational overhead (release process etc.), which would be a negative from the operator’s perspective.

        c. So, in the short term we prefer to keep it, along with flink-yarn-autoscaler, as a submodule of the flink-kubernetes-operator. Once it is stable, we can discuss moving them. After it is stable, the release frequency will drop, so the move will be only a small negative for the operator.


Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving its classes to flink-kubernetes-operator will reduce complexity. We can discuss this on the mailing list.

...

Code Block
import java.util.Optional;

/** Stores the autoscaler's state during scaling. */
public interface AutoScalerStateStore {

    Optional<String> get(String key);

    // Puts the state into the state store. Call flush() afterwards to prevent the state from being lost.
    void put(String key, String value);

    void remove(String key);

    void flush();

    /**
     * The state store cannot be used if it isn't valid. In that case, create a new state
     * store via {@link AutoScalerStateStoreFactory}.
     */
    boolean isValid();
}
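
To make the put/flush contract concrete, here is a minimal in-memory sketch of the interface. `InMemoryAutoScalerStateStore`, its `invalidate()` method and the buffering behaviour are assumptions made for illustration; they are not part of the FLIP. A plain map stands in for the physical store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Copy of the proposed interface, repeated so the sketch compiles on its own. */
interface AutoScalerStateStore {
    Optional<String> get(String key);
    void put(String key, String value);
    void remove(String key);
    void flush();
    boolean isValid();
}

/** Hypothetical implementation: changes are buffered until flush() is called. */
class InMemoryAutoScalerStateStore implements AutoScalerStateStore {
    // Stand-in for the physical store (for example a Kubernetes ConfigMap).
    private final Map<String, String> persisted;
    // Working copy that get/put/remove operate on.
    private final Map<String, String> working;
    private boolean valid = true;

    InMemoryAutoScalerStateStore(Map<String, String> persisted) {
        this.persisted = persisted;
        this.working = new HashMap<>(persisted);
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(working.get(key));
    }

    @Override
    public void put(String key, String value) {
        working.put(key, value);
    }

    @Override
    public void remove(String key) {
        working.remove(key);
    }

    @Override
    public void flush() {
        // Replace the persisted contents with the working copy.
        persisted.clear();
        persisted.putAll(working);
    }

    @Override
    public boolean isValid() {
        return valid;
    }

    /** Assumed hook for marking the store stale; not part of the interface. */
    void invalidate() {
        valid = false;
    }
}
```

In this sketch a `put` that is never followed by `flush` is lost when the store is discarded, which is the situation the comment on `put` warns about.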


5. AutoScalerStateStoreFactory

AutoScalerStateStoreFactory is responsible for getting or creating the AutoScalerStateStore for each job.

Currently, the AutoScalerStateStore is maintained by AutoscalerInfoManager to reduce accesses to Kubernetes. If the state store isn't valid, AutoscalerInfoManager will get or create a new AutoScalerStateStore.


AutoScalerStateStoreFactory retrieves the state from the physical state store (such as a Kubernetes ConfigMap) and creates the AutoScalerStateStore.

Code Block
import java.util.Optional;

/** The factory for the autoscaler state store. */
public interface AutoScalerStateStoreFactory {

    Optional<AutoScalerStateStore> get();

    AutoScalerStateStore getOrCreate();
}
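
The caching behaviour described for AutoscalerInfoManager (reuse the store while it is valid, otherwise ask the factory for a new one) could look roughly like the sketch below. `CachingStateStoreManager`, `FakeStateStore` and the `jobKey` parameter are hypothetical names introduced here, and both interfaces are reduced to the single method the sketch needs.

```java
import java.util.HashMap;
import java.util.Map;

/** Reduced to the one method this sketch relies on. */
interface AutoScalerStateStore {
    boolean isValid();
}

/** Reduced to the one method this sketch relies on. */
interface AutoScalerStateStoreFactory {
    AutoScalerStateStore getOrCreate();
}

/** Hypothetical test double whose validity can be toggled. */
class FakeStateStore implements AutoScalerStateStore {
    boolean valid = true;

    @Override
    public boolean isValid() {
        return valid;
    }
}

/** Hypothetical per-job cache in the spirit of AutoscalerInfoManager. */
class CachingStateStoreManager {
    private final Map<String, AutoScalerStateStore> cache = new HashMap<>();

    AutoScalerStateStore getStateStore(String jobKey, AutoScalerStateStoreFactory factory) {
        AutoScalerStateStore cached = cache.get(jobKey);
        if (cached != null && cached.isValid()) {
            // Valid store: reuse it and avoid touching the physical store.
            return cached;
        }
        // Missing or invalid store: obtain a new one from the factory and cache it.
        AutoScalerStateStore fresh = factory.getOrCreate();
        cache.put(jobKey, fresh);
        return fresh;
    }
}
```

The factory is only consulted on a cache miss or after invalidation, which is how the cache would reduce accesses to the physical store.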


6. The general autoscaler

As discussed with Gyula Fora and Samrat Deb, we don't implement the YARN autoscaler in this FLIP; we just implement a general autoscaler based on the rescale API (FLIP-291). The general autoscaler doesn't know about any specific job; users pass a JobAutoScalerContext to use it. It can communicate with the Flink job through RestClusterClient.


Proposed Changes

  1. Ensure the new autoscaler module keeps the general autoscaling strategy.
    It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
    Kubernetes-related dependencies should be removed from these classes, which should use JobAutoScalerContext, AutoScalerHandler and AutoScalerStateStore instead.

  2. Use `RestClusterClient<String>` instead of org.apache.flink.kubernetes.operator.service.FlinkService

    The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is the general Flink client and supports all Flink deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


  3. Implement the default and Kubernetes classes for AutoScalerHandler
    The default AutoScalerHandler could be the LoggedAutoScalerHandler. It just logs the event when any method is called.
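
A logging-only handler might look like the sketch below. The `handleEvent` method and its parameters are assumptions, since the handler's actual methods are not shown in this section. `java.util.logging` is used only to keep the example dependency-free.

```java
import java.util.logging.Logger;

/** Hypothetical shape of the handler; the real method set is defined elsewhere in the FLIP. */
interface AutoScalerHandler {
    void handleEvent(String jobKey, String reason, String message);
}

/** Default handler that only logs each event it receives. */
class LoggedAutoScalerHandler implements AutoScalerHandler {
    private static final Logger LOG =
            Logger.getLogger(LoggedAutoScalerHandler.class.getName());

    @Override
    public void handleEvent(String jobKey, String reason, String message) {
        // No side effects besides logging, so it works in any deployment mode.
        LOG.info(String.format("Job %s: [%s] %s", jobKey, reason, message));
    }
}
```

A Kubernetes-specific implementation would replace the log call with whatever event reporting the operator already uses.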

...

BTW, I didn't support YARN in this POC. Samrat Deb would like to support it after this FLIP decouples flink-autoscaler from Kubernetes.

Compatibility, Deprecation, and Migration Plan

...