...

Page properties


Discussion thread: https://lists.apache.org/thread/kmm03gls1vw4x6vk1ypr9ny9q9522495
Vote thread: https://lists.apache.org/thread/3wmhhqgkkg1l7ncxnzwqnjqyhqz545gl
JIRA: FLINK-32723

Release

...

flink-kubernetes-operator 1.7.0




1. Motivation

The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest because it can greatly improve the usability of Flink. The primary objective is to let users turn on the autoscaler for their Flink jobs without intricate parallelism configuration. However, the current implementation of the flink-autoscaler is tightly coupled with Kubernetes and resides within the flink-kubernetes-operator repository.

...

Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.

2. Core Idea

  1. JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP outlines the necessity of defining a few interfaces - ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
    1. The ScalingRealizer interface handles scaling actions.
    2. The AutoScalerEventHandler interface handles loggable events.
    3. The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should be agnostic to any specific deployment framework such as Kubernetes or YARN. Currently, the core autoscaling framework is closely coupled with Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes.
    Ideally, it should rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.

  4. Optional Goal: For the flink-autoscaler module, we prefer to keep it in flink-kubernetes-operator at first (discussed in advance with Gyula Fora).

        a. Since the autoscaler must be compatible with more than just the latest Flink version, moving it to the Flink repository may not be the most suitable option.
            It would need to function across various Flink versions, potentially necessitating a separate repository/subproject.

        b. Removing the autoscaler from the flink-kubernetes-operator repository would introduce significant operational overhead and complicate the release process,
            which would be viewed negatively from the operator's perspective.

    Therefore, considering the two points above, the preference is to retain the core autoscaler framework/logic within flink-kubernetes-operator as a submodule in the short term. Should it prove to be stable in the future,
    we can revisit the idea of relocating it. As it stabilizes, the release frequency will decrease, and the transition will then have only a minor impact on the operator.


...

Why isn't flink-kubernetes-operator-autoscaler necessary as a separate module?
     If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module. flink-kubernetes-operator cannot in turn depend on
     flink-kubernetes-operator-autoscaler, so loading these classes is more difficult than simply removing the flink-kubernetes-operator-autoscaler module. flink-kubernetes-operator-autoscaler
     only defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.
     These are only a few classes, so moving them to flink-kubernetes-operator reduces complexity.

3. Public Interfaces

3.1 JobAutoScaler

The proposed interface maintains a resemblance to the existing JobAutoScaler, albeit with adjustments to its method parameters. In lieu of employing Kubernetes-specific classes, the parameters will be substituted with JobAutoScalerContext<KEY> , rendering it framework-agnostic, suitable for various deployment frameworks beyond Kubernetes.

The 'KEY' in this context corresponds to the 'jobKey', and when two instances share the same 'KEY,' they are regarded as representing the same Flink job.

Code Block
/**
 * The generic Autoscaler instance.
 *
 * @param <KEY> The job key.
 */
@Internal
public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Called as part of the reconciliation loop. */
    void scale(Context context) throws Exception;

    /** Called when the job is deleted. */
    void cleanup(KEY key);
}
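
To illustrate the contract described above, here is a minimal sketch rather than the operator's actual implementation. SketchContext, SketchJobAutoScaler and CountingAutoScaler are hypothetical stand-ins introduced only for this example; they mirror the shape of the proposed interface without depending on Flink, and model nothing beyond the job key.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for JobAutoScalerContext; only the job key is modelled. */
class SketchContext<KEY> {
    private final KEY jobKey;

    SketchContext(KEY jobKey) {
        this.jobKey = jobKey;
    }

    KEY getJobKey() {
        return jobKey;
    }
}

/** Mirrors the shape of the proposed JobAutoScaler interface. */
interface SketchJobAutoScaler<KEY, Context extends SketchContext<KEY>> {

    /** Called as part of the reconciliation loop. */
    void scale(Context context) throws Exception;

    /** Called when the job is deleted. */
    void cleanup(KEY key);
}

/** Toy implementation that only counts how often each job key was evaluated. */
class CountingAutoScaler implements SketchJobAutoScaler<String, SketchContext<String>> {

    final Map<String, Integer> evaluations = new HashMap<>();

    @Override
    public void scale(SketchContext<String> context) {
        // Two contexts carrying the same key are treated as the same Flink job.
        evaluations.merge(context.getJobKey(), 1, Integer::sum);
    }

    @Override
    public void cleanup(String key) {
        // Drop everything remembered about the deleted job.
        evaluations.remove(key);
    }
}
```

Because cleanup only receives the KEY, an implementation has to index any per-job bookkeeping by that key rather than by the context object.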

3.2 JobAutoScalerContext

The JobAutoScalerContext  plays a pivotal role in consolidating crucial information necessary for scaling Flink jobs. It encompasses essential data such as the jobKey, jobId, configuration, MetricGroup, and more.

...

Code Block
/**
 * The job autoscaler context, it includes all details related to the current job.
 *
 * @param <KEY> The job key.
 */
@Experimental
@AllArgsConstructor
@ToString
public class JobAutoScalerContext<KEY> {

    /** The identifier of each flink job. */
    @Getter private final KEY jobKey;

    /** The jobId and jobStatus can be null when the job isn't started. */
    @Nullable @Getter private final JobID jobID;

    @Nullable @Getter private final JobStatus jobStatus;

    @Getter private final Configuration configuration;

    @Getter private final MetricGroup metricGroup;

    @ToString.Exclude
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}

3.3 ScalingRealizer

The ScalingRealizer interface is responsible for managing scaling actions (upscale or downscale); specifically, the ScalingRealizer#realize method is invoked from the JobAutoScaler#scale function.

Code Block
/**
 * The Scaling Realizer is responsible for managing scaling actions.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Update job's parallelism to parallelismOverrides. */
    void realize(Context context, Map<String, String> parallelismOverrides);
}


3.4 AutoScalerEventHandler

The AutoScalerEventHandler interface is responsible for managing loggable events, and specifically, the AutoScalerEventHandler#handleEvent  method will be invoked by the auto scaler when there's a need to handle such events. These events might include scaling errors, reporting scaling results, and more.

It's important to note that the AutoScalerEventHandler object is shared across all Flink jobs and doesn't possess specific job information. That's precisely why the JobAutoScalerContext is passed as a parameter to the handleEvent  method, allowing it to access the necessary job-related details when handling events.


Code Block
/**
 * Handles all loggable events during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /**
     * Handle the event.
     *
     * @param interval When interval is greater than 0, events that repeat within the interval will
     *     be ignored.
     */
    void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval);

    /** The type of the events. */
    enum Type {
        Normal,
        Warning
    }
}


3.5 AutoScalerStateStore

The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.

In the existing code and for Kubernetes jobs, this state is stored in a ConfigMap . Consequently, the KubernetesAutoScalerStateStore  is responsible for retrieving the ConfigMap before the scaling operation and preserving it after the scaling event.

However, for other types of jobs, such as those running on YARN or in standalone mode, the default behavior is to keep scaling information in memory, through a new implementation, InMemoryAutoScalerStateStore. In the future, the state could also be persisted in an RDBMS or any other persistent storage, for example through a new implementation such as JdbcAutoScalerStateStore.

The method parameters of AutoScalerStateStore use specific classes instead of String, for example Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory. Each state store implementation is therefore responsible for serialization and deserialization as well as for storing the state.


Code Block
/**
 * The state store is responsible for storing all state during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(
            Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
            throws Exception;

    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
            Context jobContext) throws Exception;

    void removeScalingHistory(Context jobContext) throws Exception;

    void storeEvaluatedMetrics(
            Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics)
            throws Exception;

    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext)
            throws Exception;

    void removeEvaluatedMetrics(Context jobContext) throws Exception;

    void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides)
            throws Exception;

    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception;

    void removeParallelismOverrides(Context jobContext) throws Exception;

    /**
     * Flushing is needed because we just save data in cache for all store methods. For less write
     * operations, we flush the cached data to the physical storage only after all operations have
     * been performed.
     */
    void flush(Context jobContext) throws Exception;

    /** Clean up all information related to the current job. */
    void removeInfoFromCache(KEY jobKey);
}
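
As an illustration of the in-memory default mentioned above, the following sketch shows what a heap-backed store could look like for the parallelism overrides alone. It is an assumption-laden simplification, not the proposed InMemoryAutoScalerStateStore: the class name InMemoryOverridesStore is hypothetical, jobs are keyed by a plain String instead of a generic KEY and Context, and the scaling history and evaluated metrics are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical heap-backed sketch covering only the parallelism overrides. The FLIP's interface
 * is generic in KEY and Context; a plain String job key is used here to stay self-contained.
 */
class InMemoryOverridesStore {

    private final Map<String, Map<String, String>> overridesByJob = new ConcurrentHashMap<>();

    void storeParallelismOverrides(String jobKey, Map<String, String> overrides) {
        // Copy defensively so later changes by the caller do not leak into the store.
        overridesByJob.put(jobKey, new HashMap<>(overrides));
    }

    Optional<Map<String, String>> getParallelismOverrides(String jobKey) {
        return Optional.ofNullable(overridesByJob.get(jobKey));
    }

    void removeParallelismOverrides(String jobKey) {
        overridesByJob.remove(jobKey);
    }

    /** Nothing to flush: the heap is the only storage, so the state is lost on restart. */
    void flush(String jobKey) {}

    /** Forgets everything known about the given job. */
    void removeInfoFromCache(String jobKey) {
        overridesByJob.remove(jobKey);
    }
}
```

Returning Optional from the getter matches the typed interface above: a job that has never been scaled simply has no stored state.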


4. Proposed Changes

4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.

This covers JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes, which should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.

Using the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService

  • The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is the generic Flink client and supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


4.2 KubernetesJobAutoScalerContext

Note: some code can be extracted into an AbstractJobAutoScalerContext, such as jobKey, jobId, configuration, metric group and restClientSupplier.

This logic should be generic for Kubernetes, YARN and standalone.

Code Block
/** An implementation of JobAutoscalerContext for Kubernetes. */
public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {

    private final AbstractFlinkResource<?, ?> resource;

    private final KubernetesClient kubernetesClient;

    public KubernetesJobAutoScalerContext(
            @Nullable JobID jobID,
            @Nullable JobStatus jobStatus,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource,
            KubernetesClient kubernetesClient) {
        super(
                ResourceID.fromResource(resource),
                jobID,
                jobStatus,
                configuration,
                metricGroup,
                restClientSupplier);
        this.resource = resource;
        this.kubernetesClient = kubernetesClient;
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }

    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}


4.3 KubernetesScalingRealizer

Code Block
/** The Kubernetes implementation for applying parallelism overrides. */
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
    }
}


4.4 KubernetesAutoScalerEventHandler

Code Block
/** An event handler which posts events to the Kubernetes events API. */
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        if (interval == null) {
            eventRecorder.triggerEvent(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    message,
                    EventRecorder.Component.Operator,
                    messageKey,
                    context.getKubernetesClient());
        } else {
            eventRecorder.triggerEventByInterval(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    EventRecorder.Component.Operator,
                    message,
                    messageKey,
                    context.getKubernetesClient(),
                    interval);
        }
    }
}


4.5 KubernetesAutoScalerStateStore

For the current code and for Kubernetes jobs, the state is persisted to a ConfigMap. The KubernetesAutoScalerStateStore therefore needs to fetch the ConfigMap before scaling and persist it after scaling.

ConfigMapStore is responsible for putting, getting and removing the serialized state.

KubernetesAutoScalerStateStore is responsible for serializing state from its raw type to String and deserializing it from String back to the raw type; it accesses the state through ConfigMapStore.
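
The split of responsibilities described above can be sketched as follows. This is only an illustration under stated assumptions: StringStore is a hypothetical stand-in for ConfigMapStore backed by a plain map instead of a Kubernetes ConfigMap, SerializingOverridesStore is a hypothetical name, and the key:value,key:value encoding is chosen purely for the example (it assumes keys and values contain neither ',' nor ':') and is not necessarily the format the operator uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for ConfigMapStore: it only puts, gets and removes serialized strings. */
class StringStore {

    private final Map<String, String> data = new HashMap<>();

    void put(String key, String serialized) {
        data.put(key, serialized);
    }

    Optional<String> get(String key) {
        return Optional.ofNullable(data.get(key));
    }

    void remove(String key) {
        data.remove(key);
    }
}

/** Converts typed state to and from strings before delegating to the string store. */
class SerializingOverridesStore {

    private static final String STATE_KEY = "parallelismOverrides";

    private final StringStore store;

    SerializingOverridesStore(StringStore store) {
        this.store = store;
    }

    void storeParallelismOverrides(Map<String, String> overrides) {
        // Sort the entries so that the serialized form is deterministic.
        String serialized =
                new TreeMap<String, String>(overrides)
                        .entrySet().stream()
                                .map(e -> e.getKey() + ":" + e.getValue())
                                .collect(Collectors.joining(","));
        store.put(STATE_KEY, serialized);
    }

    Optional<Map<String, String>> getParallelismOverrides() {
        return store.get(STATE_KEY).map(SerializingOverridesStore::deserialize);
    }

    void removeParallelismOverrides() {
        store.remove(STATE_KEY);
    }

    private static Map<String, String> deserialize(String serialized) {
        Map<String, String> result = new HashMap<>();
        if (serialized.isEmpty()) {
            return result;
        }
        for (String entry : serialized.split(",")) {
            // Illustrative format only: split each entry at the first ':'.
            String[] parts = entry.split(":", 2);
            result.put(parts[0], parts[1]);
        }
        return result;
    }
}
```

Keeping the string store unaware of the typed state means the same serialization layer could sit on top of a different backend without changes.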

5. Standalone AutoScaler

We will implement the StandaloneAutoScaler in this FLIP; it is the generic autoscaler.

5.1 Separate process, JobListFetcher and control loop

The StandaloneAutoScaler can run as a separate process; StandaloneAutoscalerEntrypoint is its main class.

The StandaloneAutoScaler is not responsible for job management, so it has no job list of its own. In addition to the interfaces mentioned above, we introduce the JobListFetcher interface for the StandaloneAutoScaler. The JobListFetcher provides the job list and the jobContext of every job.

Code Block
/** The JobListFetcher fetches the list of all jobs together with the jobContext of each job. */
public interface JobListFetcher<KEY> {

    List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup);

}

Control loop

The StandaloneAutoscalerEntrypoint calls the JobListFetcher to fetch the job list and calls JobAutoscaler#scale for each job periodically.

We can define a control-loop-interval option for the StandaloneAutoscalerEntrypoint.


Note: The kubernetes-autoscaler doesn't need the StandaloneAutoScaler or the JobListFetcher, because it already has its own job management and control loop.
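
One iteration of such a control loop could look like the sketch below. It is a simplified illustration, not the StandaloneAutoscalerEntrypoint itself: LoopContext, SimpleJobListFetcher, SimpleAutoScaler and ControlLoop are hypothetical names, the context carries only a job key, and the MetricGroup parameter of JobListFetcher#fetch is omitted. The per-job error isolation is a design assumption of this sketch, not something the FLIP prescribes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified job context: only the job key is modelled. */
class LoopContext {

    final String jobKey;

    LoopContext(String jobKey) {
        this.jobKey = jobKey;
    }
}

/** Mirrors JobListFetcher, minus the MetricGroup parameter. */
interface SimpleJobListFetcher {
    List<LoopContext> fetch() throws Exception;
}

/** Mirrors the scale method of JobAutoScaler. */
interface SimpleAutoScaler {
    void scale(LoopContext context) throws Exception;
}

/** One iteration of the control loop: fetch the jobs, then scale each one in isolation. */
class ControlLoop {

    private final SimpleJobListFetcher fetcher;
    private final SimpleAutoScaler autoScaler;

    ControlLoop(SimpleJobListFetcher fetcher, SimpleAutoScaler autoScaler) {
        this.fetcher = fetcher;
        this.autoScaler = autoScaler;
    }

    /** Returns the keys of the jobs whose scaling attempt failed in this iteration. */
    List<String> runOnce() {
        List<String> failed = new ArrayList<>();
        List<LoopContext> jobs;
        try {
            jobs = fetcher.fetch();
        } catch (Exception e) {
            // Without a job list there is nothing to scale; try again on the next iteration.
            return failed;
        }
        for (LoopContext job : jobs) {
            try {
                autoScaler.scale(job);
            } catch (Exception e) {
                // A failure for one job must not stop the loop for the others.
                failed.add(job.jobKey);
            }
        }
        return failed;
    }
}
```

An entrypoint could then run `runOnce` repeatedly, for example through `ScheduledExecutorService#scheduleWithFixedDelay`, using the value of the control-loop-interval option as the delay.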

5.2 The implementation of JobListFetcher

For a Flink standalone cluster, we can implement the StandaloneJobListFetcher. The user provides the address of the Flink standalone cluster, and the StandaloneJobListFetcher fetches the job list via the Flink REST API.

For a YARN cluster, we can implement the YarnJobListFetcher. The user provides the YARN address, and the YarnJobListFetcher fetches the job list via the YARN REST API and the Flink REST API.

5.3 The implementation of ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore

  • We will implement the ScalingApiRealizer, which is based on the rescale API of FLIP-291.
  • The generic EventHandler is based on the logger.
  • The generic StateStore is based on the heap. This means that the state information is stored in memory and can be lost if the autoscaler restarts.
    • We will implement the JdbcAutoscalerStateStore as well.
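
A logger-based event handler with the interval semantics of AutoScalerEventHandler#handleEvent might be sketched as below. This is an assumption-heavy illustration: LoggingEventHandler is a hypothetical name, the job is identified by a plain String instead of a context, java.util.logging stands in for whichever logger the real implementation uses, the current time is passed in explicitly to keep the example deterministic, and the deduplication key is a choice made only for this sketch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical logger-based handler that suppresses events repeated within an interval. */
class LoggingEventHandler {

    private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName());

    // Not thread-safe; a concurrent map would be needed if jobs are handled in parallel.
    private final Map<String, Instant> lastEmitted = new HashMap<>();

    /** Returns true if the event was logged and false if it was suppressed. */
    boolean handleEvent(
            String jobKey,
            String reason,
            String message,
            String messageKey,
            Duration interval,
            Instant now) {
        // Assumption of this sketch: fall back to the message when no messageKey is given.
        String dedupKey = jobKey + "/" + reason + "/" + (messageKey != null ? messageKey : message);
        Instant previous = lastEmitted.get(dedupKey);
        boolean withinInterval =
                interval != null
                        && previous != null
                        && Duration.between(previous, now).compareTo(interval) < 0;
        if (withinInterval) {
            return false;
        }
        lastEmitted.put(dedupKey, now);
        LOG.info("[" + jobKey + "] " + reason + ": " + message);
        return true;
    }
}
```

With a null interval every event is logged, which corresponds to the @Nullable interval parameter of the interface.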

6. Compatibility, Deprecation, and Migration Plan

The change must remain compatible with the current Kubernetes operator.


7. Test Plan

Unit tests, integration tests, and manual verification that the autoscaler works as expected.

Ensure 100% functionality and test coverage of the Kubernetes implementation.

8. Rejected Alternatives

Rejected a couple of interface designs; the whole solution is fine.

yarn-autoscaler

For the first version of the yarn-autoscaler, we only support rescaling a job via the rescale API and don't support re-deploying a new YARN application, for the following reasons:

  • If we want to support the entire YARN deployment, we need to implement YARN job management in the yarn-autoscaler.

  • That means not only rescaling, but also start, stop, keepalive, etc.

  • We need to maintain the job jar, job information, etc.

These parts are too heavy, so we only support the rescale API in the first version.

Note: Each company typically has a Flink platform to manage its Flink jobs running on YARN. Such a platform has the jobName, job address, job jar, etc., and it can start, stop and keep jobs alive, so these platforms can easily implement the ScalingRealizer for their own cases after this FLIP.