

Page properties


Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/kmm03gls1vw4x6vk1ypr9ny9q9522495
Vote thread:
JIRA: FLINK-32723
Release



[This FLIP proposal is a collaborative effort between Rui Fan and Samrat Deb, with valuable input provided by Gyula Fora and Maximilian Michels.]


Table of Contents

1. Motivation

The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly coupled with Kubernetes and resides within the flink-kubernetes-operator repository.

...

  1. JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP outlines the necessity of defining a few interfaces - ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
    1. The ScalingRealizer interface handles scaling actions.
    2. The AutoScalerEventHandler interface handles loggable events.
    3. The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should be agnostic to any specific deployment framework, such as Kubernetes or YARN. Currently, the core autoscaling framework is closely coupled with Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, and flink-kubernetes.
    Ideally, it should rely only on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.

  4. Optional Goal: For the flink-autoscaler module, the preference is to keep it within flink-kubernetes-operator for now (based on an advance discussion with Gyula Fora).

        a. Since the autoscaler only supports the latest Flink version, moving it to the Flink repository may not be the most suitable option.
            It would need to function across various Flink versions, potentially necessitating a separate repository/subproject.

        b. Removing the autoscaler from the flink-kubernetes-operator repository would introduce significant operational overhead and complicate the release process,
            which would be viewed negatively from the operator's perspective.

    Therefore, considering the above two points, the preference is to retain the core autoscaler framework/logic and flink-yarn-autoscaler as submodules of flink-kubernetes-operator in the short term. Should it prove to be stable in the future,
    we can revisit the idea of relocating it. As it stabilizes, the release frequency will decrease, and this transition will have a minor impact on the operator.


Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary; moving its classes into flink-kubernetes-operator will reduce complexity.

Why isn't a separate flink-kubernetes-operator-autoscaler module necessary?

...

     If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module.

...

flink-kubernetes-operator cannot depend on
     flink-kubernetes-operator-autoscaler, so loading these classes would be more difficult than simply removing the flink-kubernetes-operator-autoscaler module.

...

flink-kubernetes-operator-autoscaler 

...


     just defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.

...

     There are not many of these classes, so moving them to flink-kubernetes-operator  will reduce complexity. 

3. Public Interfaces

3.1 JobAutoScaler

The proposed interface maintains a resemblance to the existing JobAutoScaler, albeit with adjustments to its method parameters. In lieu of employing Kubernetes-specific classes, the parameters will be substituted with JobAutoScalerContext<KEY>, rendering it framework-agnostic and suitable for various deployment frameworks beyond Kubernetes.

The 'KEY' in this context corresponds to the 'jobKey', and when two instances share the same 'KEY,' they are regarded as representing the same Flink job.

Code Block
/** The generic Autoscaler instance. */
public interface JobAutoScaler<KEY> {

    boolean scale(JobAutoScalerContext<KEY> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY> context);

}

3.2 JobAutoScalerContext

The JobAutoScalerContext plays a pivotal role in consolidating crucial information necessary for scaling Flink jobs. It encompasses essential data such as the jobKey, jobId, configuration, MetricGroup, and more.

As of now, the jobKey is defined using io.javaoperatorsdk.operator.processing.event.ResourceID, as seen in the existing code and for Kubernetes jobs. However, it's important to note that the jobKey for Flink jobs operating on YARN could differ in the future.

The entire JobAutoScalerContext, encompassing all pertinent details, will be furnished to these implementations when the autoscaler invokes their respective callbacks.

...

Code Block
/**
 * The job autoscaler context.
 * It encompasses all pertinent details and is provided when the autoscaler invokes the respective callbacks.
 * @param <KEY> The job key.
 */
 public interface JobAutoScalerContext<KEY> {
 
    // The identifier of each flink job.
    KEY getJobKey();
 
    JobID getJobID();
  
    Configuration getConfiguration();
 
    MetricGroup getMetricGroup();
 
    RestClusterClient<String> getRestClusterClient() throws Exception;

 }

...

The ScalingRealizer interface is responsible for managing scaling actions (upscale or downscale). Specifically, the ScalingRealizer#realize method will be invoked from the JobAutoscaler#scale function.

Code Block
/**
* The Scaling Realizer. 
* Responsible for managing scaling actions.
* @param <KEY> the job key.
* @param Context Instance of JobAutoScalerContext.
*/
public interface ScalingRealizer<KEY,
       Context extends JobAutoScalerContext<KEY>> {

   void realize(Context context,
                Map<String, String> parallelismOverrides);
}
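To illustrate the contract, the sketch below shows a hypothetical ScalingRealizer for a standalone deployment that records the desired per-vertex parallelism in the job's configuration. SimpleContext and ConfigScalingRealizer are simplified stand-ins invented for this example (the real interface is generic over JobAutoScalerContext); the config key mirrors Flink's pipeline.jobvertex-parallelism-overrides option.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for JobAutoScalerContext<KEY>; the real context
// also exposes the JobID, MetricGroup and a RestClusterClient.
class SimpleContext<KEY> {
    private final KEY jobKey;
    private final Map<String, String> configuration = new HashMap<>();

    SimpleContext(KEY jobKey) { this.jobKey = jobKey; }

    KEY getJobKey() { return jobKey; }
    Map<String, String> getConfiguration() { return configuration; }
}

// Sketch of a ScalingRealizer that "realizes" a scaling decision by
// writing the per-vertex parallelism overrides into the configuration.
class ConfigScalingRealizer<KEY> {
    void realize(SimpleContext<KEY> context, Map<String, String> parallelismOverrides) {
        StringBuilder value = new StringBuilder();
        for (Map.Entry<String, String> e : parallelismOverrides.entrySet()) {
            if (value.length() > 0) value.append(',');
            value.append(e.getKey()).append(':').append(e.getValue());
        }
        context.getConfiguration().put("pipeline.jobvertex-parallelism-overrides", value.toString());
    }
}
```

The Kubernetes implementation would instead patch the deployment spec; the point of the interface is that JobAutoscaler#scale does not need to know which of these happens.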

...

3.4 AutoScalerEventHandler

The AutoScalerEventHandler interface is responsible for managing loggable events. Specifically, the AutoScalerEventHandler#handleEvent method will be invoked by the autoscaler when there's a need to handle such events. These events might include scaling errors, reporting scaling results, and more.

It's important to note that the AutoScalerEventHandler object is shared across all Flink jobs and doesn't possess specific job information. That's precisely why the JobAutoScalerContext is passed as a parameter to the handleEvent  method, allowing it to access the necessary job-related details when handling events.


Code Block
/**
 * Handles all loggable events during scaling.
 *
 * @param <KEY> The job key.
 */
public interface AutoScalerEventHandler<KEY,
       Context extends JobAutoScalerContext<KEY>> {

   void handleEvent(Context context,
                    Type type,
                    String reason,
                    String message,
                    @Nullable String messageKey);

   enum Type {
       Normal,
       Warning
   }
}
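As a minimal sketch of a non-Kubernetes handler, the class below simply records events as formatted strings. EventType and LoggingEventHandler are illustrative stand-ins for the interface above; a real standalone implementation might log via SLF4J, while the Kubernetes one emits Kubernetes events. Note that the job key is passed in on every call, since the handler itself is shared across all jobs and holds no per-job state.

```java
// Stand-in for AutoScalerEventHandler.Type.
enum EventType { Normal, Warning }

// Sketch of an AutoScalerEventHandler that records events in memory.
class LoggingEventHandler<KEY> {
    private final java.util.List<String> log = new java.util.ArrayList<>();

    void handleEvent(KEY jobKey, EventType type, String reason, String message) {
        // The job key comes from the context because the handler is
        // shared across all jobs and has no job information of its own.
        log.add(String.format("[%s] %s %s: %s", jobKey, type, reason, message));
    }

    java.util.List<String> getLog() { return log; }
}
```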


3.5 AutoScalerStateStore

The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.

In the existing code and for Kubernetes jobs, this state is stored in a ConfigMap . Consequently, the KubernetesAutoScalerStateStore  is responsible for retrieving the ConfigMap before the scaling operation and preserving it after the scaling event.

However, for other types of jobs, such as those running on YARN or in standalone mode, the default behavior involves persisting scaling information in memory via a new implementation, HeapedAutoScalerStateStore. This means that the state information is stored in memory and can be lost if the autoscaler restarts. It's important to note that, in the future, the state could be persisted to an RDBMS or other persistent storage via a new implementation such as JdbcAutoScalerStateStore.


Code Block
/** Stores and provides access to state during scaling. */
public interface AutoScalerStateStore<KEY,
       Context extends JobAutoScalerContext<KEY>> {

   void storeScalingHistory(Context jobContext, String decision);

   String getScalingHistory(Context jobContext);

   void removeScalingHistory(Context jobContext);

   void storeEvaluatedMetrics(Context jobContext, String metrics);

   String getEvaluatedMetrics(Context jobContext);

   void removeEvaluatedMetrics(Context jobContext);

   void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

   String getParallelismOverrides(Context jobContext);

   void removeParallelismOverrides(Context jobContext);


   void removeInfoFromCache(Context jobContext);

   // All store methods only update an in-memory cache for performance; flush writes the cached data to the physical storage.
   void flush(Context jobContext);
}
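The heap-based store described above can be sketched as a nested map keyed by the job key. HeapedStateStoreSketch is an illustrative simplification (it is keyed directly by the job key rather than by the full context, and only the scaling-history methods are shown); it makes the stated trade-off concrete: all state lives in memory and is lost when the autoscaler process restarts.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the in-memory state store: one map of named state entries
// per job key. All state is lost if the autoscaler restarts.
class HeapedStateStoreSketch<KEY> {
    private final Map<KEY, Map<String, String>> states = new HashMap<>();

    void storeScalingHistory(KEY jobKey, String decision) {
        states.computeIfAbsent(jobKey, k -> new HashMap<>()).put("scalingHistory", decision);
    }

    String getScalingHistory(KEY jobKey) {
        Map<String, String> state = states.get(jobKey);
        return state == null ? null : state.get("scalingHistory");
    }

    void removeScalingHistory(KEY jobKey) {
        Map<String, String> state = states.get(jobKey);
        if (state != null) state.remove("scalingHistory");
    }

    // flush is a no-op here because the heap IS the backing store; a
    // JDBC-backed implementation would write the cached entries out here.
    void flush(KEY jobKey) {}
}
```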

...

As discussed with Gyula Fora and Samrat Deb, the YARN autoscaler implementation will be out of scope for this FLIP. Instead, we will develop a generic autoscaler based on the rescale API (FLIP-291). This generic autoscaler will not have knowledge of specific jobs, and users will have the flexibility to pass the JobAutoScalerContext when utilizing it. Communication with Flink jobs can be achieved through the RestClusterClient.


4. Proposed Changes

4.1 Ensure the new autoscaler module keeps the generic auto scaling strategy.

...