Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/kmm03gls1vw4x6vk1ypr9ny9q9522495
Vote thread: https://lists.apache.org/thread/3wmhhqgkkg1l7ncxnzwqnjqyhqz545gl
JIRA: FLINK-32723
Release: flink-kubernetes-operator 1.7.0

[This FLIP proposal is a joint work between Rui Fan and Samrat Deb, with some consulting from Gyula Fora]

Table of Contents

1. Motivation

The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly coupled with Kubernetes and resides within the flink-kubernetes-operator repository.

...

Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.

2. Core Idea

  1. JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP outlines the necessity of defining a few interfaces: ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
    1. The ScalingRealizer interface handles scaling actions.
    2. The AutoScalerEventHandler interface handles loggable events.
    3. The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should be agnostic to any specific deployment framework like Kubernetes, YARN etc. Currently, the core autoscaling framework is closely coupled with Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes.
    Ideally, it should rely only on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.

  4. Optional Goal: For the flink-autoscaler module, we prefer to keep it inside flink-kubernetes-operator for now (based on an advance discussion with Gyula Fora).

        a. Since the autoscaler is not only compatible with the latest Flink version, moving it to the Flink repository may not be the most suitable option.
            It would need to function across various Flink versions, potentially necessitating a separate repository/subproject.

        b. Removing the autoscaler from the flink-kubernetes-operator repository would introduce significant operational overhead and complicate the release process,
            which would be viewed negatively from the operator's perspective.

        c. So, we prefer to keep it (and flink-yarn-autoscaler) as a submodule of the flink-kubernetes-operator in the short term. If it's stable in the future,
            we can revisit the idea of relocating it. As it stabilizes, the release frequency will decrease, and this transition will have a minor impact on the operator.


Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving these classes to flink-kubernetes-operator will reduce complexity. We can discuss it on the mailing list.

Why isn't flink-kubernetes-operator-autoscaler necessary as a separate module?

  • If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module. flink-kubernetes-operator cannot depend on
    flink-kubernetes-operator-autoscaler, so it's more difficult to load these classes than to remove the flink-kubernetes-operator-autoscaler module.

  • flink-kubernetes-operator-autoscaler just defines the KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.
    There are not many of these classes, so moving them to flink-kubernetes-operator will reduce complexity.

3. Public Interfaces

3.1 JobAutoScaler

The proposed interface retains similarity to the existing JobAutoScaler, although with modified method parameters. Instead of using Kubernetes-related classes, the parameters will be substituted with JobAutoScalerContext<KEY>, rendering it framework-agnostic and suitable for various deployment frameworks beyond Kubernetes.

The 'KEY' in this context corresponds to the 'jobKey'; when two instances share the same 'KEY', they are regarded as representing the same Flink job.

Code Block
/**
 * The generic Autoscaler instance.
 *
 * @param <KEY> The job key.
 */
@Internal
public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Called as part of the reconciliation loop. */
    void scale(Context context) throws Exception;

    /** Called when the job is deleted. */
    void cleanup(KEY key);
}

3.2 JobAutoScalerContext

The JobAutoScalerContext plays a pivotal role in consolidating crucial information necessary for scaling Flink jobs. It encompasses essential data such as the jobKey, jobId, configuration, MetricGroup, and more.

As of now, in the existing code and for Kubernetes jobs, the jobKey is defined using io.javaoperatorsdk.operator.processing.event.ResourceID. However, it's important to note that the jobKey for Flink jobs operating on YARN could differ in the future.

The JobAutoScalerContext, encompassing all pertinent details, will be furnished to the pluggable implementations (ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore) when the autoscaler invokes their respective callbacks.

...

Code Block
/**
 * The job autoscaler context, it includes all details related to the current job.
 *
 * @param <KEY> The job key.
 */
@Experimental
@AllArgsConstructor
@ToString
public class JobAutoScalerContext<KEY> {

    /** The identifier of each flink job. */
    @Getter private final KEY jobKey;

    /** The jobId and jobStatus can be null when the job isn't started. */
    @Nullable @Getter private final JobID jobID;

    @Nullable @Getter private final JobStatus jobStatus;

    @Getter private final Configuration configuration;

    @Getter private final MetricGroup metricGroup;

    @ToString.Exclude
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}
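
For illustration, the following is a hedged sketch of how a deployment framework might construct this context for a job on a standalone session cluster. The jobKey type (String), the cluster id and the placeholder metric group are assumptions for this sketch, not part of the proposal.

Code Block
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

/** Hypothetical example: building a context for a job on a standalone cluster. */
public class ContextExample {

    public static JobAutoScalerContext<String> createContext() {
        Configuration conf = new Configuration(); // would carry rest.address / rest.port
        return new JobAutoScalerContext<>(
                "standalone-job-1",              // hypothetical jobKey for a standalone job
                new JobID(),                     // the running job's id
                JobStatus.RUNNING,               // may be null if the job isn't started
                conf,
                new UnregisteredMetricsGroup(),  // placeholder; use a real MetricGroup in production
                // The rest client is created lazily and may throw, hence SupplierWithException.
                () -> new RestClusterClient<>(conf, "standalone-cluster"));
    }
}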

3.3 ScalingRealizer

The ScalingRealizer interface is responsible for managing scaling actions (upscale or downscale); specifically, the ScalingRealizer#realize method will be invoked from the JobAutoscaler#scale function.

Code Block
/**
 * The Scaling Realizer is responsible for managing scaling actions.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Update job's parallelism to parallelismOverrides. */
    void realize(Context context, Map<String, String> parallelismOverrides);
}


3.4 AutoScalerEventHandler

The AutoScalerEventHandler interface is responsible for managing loggable events; specifically, the AutoScalerEventHandler#handleEvent method will be invoked by the autoscaler when there's a need to handle such events. These events might include scaling errors, reporting scaling results, and more.

It's important to note that the AutoScalerEventHandler object is shared across all Flink jobs and doesn't possess specific job information. That's precisely why the JobAutoScalerContext is passed as a parameter to the handleEvent method, allowing it to access the necessary job-related details when handling events.

Code Block
/**
 * Handler all loggable events during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /**
     * Handle the event.
     *
     * @param interval When interval is greater than 0, events that repeat within the interval
     *     will be ignored.
     */
    void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval);

    /** The type of the events. */
    enum Type {
        Normal,
        Warning
    }
}

We can implement a CompositeEventHandler to support multiple event handlers; it can record these events to multiple systems.
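
A minimal sketch of such a composite handler is shown below; the class name and constructor shape are assumptions, since this FLIP only proposes the AutoScalerEventHandler interface itself.

Code Block
import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;

/** Hypothetical sketch: fans each event out to several underlying handlers. */
public class CompositeEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerEventHandler<KEY, Context> {

    private final List<AutoScalerEventHandler<KEY, Context>> handlers;

    public CompositeEventHandler(List<AutoScalerEventHandler<KEY, Context>> handlers) {
        this.handlers = handlers;
    }

    @Override
    public void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        // Record the same event to every configured system (e.g. logs + Kubernetes events).
        for (AutoScalerEventHandler<KEY, Context> handler : handlers) {
            handler.handleEvent(context, type, reason, message, messageKey, interval);
        }
    }
}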



3.5 AutoScalerStateStore

The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.

In the existing code and for Kubernetes jobs, this state is stored in a ConfigMap. Consequently, the KubernetesAutoScalerStateStore is responsible for retrieving the ConfigMap before the scaling operation and preserving it after the scaling event.

However, for other types of jobs, such as those running on YARN or in standalone mode, the default behavior involves persisting scaling information in memory, via the new implementation InMemoryAutoScalerStateStore. It's important to note that, in the future, there is the possibility to persist state in an RDBMS or any other persistent storage; a new implementation such as JdbcAutoScalerStateStore could ensure durable storage of the state.

The method parameters of AutoScalerStateStore use specific classes instead of String, such as Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory. So all state stores are responsible for serialization, deserialization and storing the state.


Code Block
/**
 * The state store is responsible for storing all state during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(
            Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
            throws Exception;

    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
            Context jobContext) throws Exception;

    void removeScalingHistory(Context jobContext) throws Exception;

    void storeEvaluatedMetrics(
            Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics)
            throws Exception;

    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext)
            throws Exception;

    void removeEvaluatedMetrics(Context jobContext) throws Exception;

    void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides)
            throws Exception;

    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception;

    void removeParallelismOverrides(Context jobContext) throws Exception;

    /**
     * Flushing is needed because we just save data in cache for all store methods. For less write
     * operations, we flush the cached data to the physical storage only after all operations have
     * been performed.
     */
    void flush(Context jobContext) throws Exception;

    /** Clean up all information related to the current job. */
    void removeInfoFromCache(KEY jobKey);
}
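
To make the contract concrete, here is a minimal sketch of what the proposed InMemoryAutoScalerStateStore could look like; the per-job state holder and its field names are assumptions for illustration, and JobVertexID, ScalingSummary and CollectedMetrics come from the autoscaler module.

Code Block
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch: keeps all scaling state on the heap, so it is lost on restart. */
public class InMemoryAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerStateStore<KEY, Context> {

    /** Per-job state holder; the field layout is an assumption for this sketch. */
    private static class JobState {
        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory;
        SortedMap<Instant, CollectedMetrics> evaluatedMetrics;
        Map<String, String> parallelismOverrides;
    }

    private final Map<KEY, JobState> states = new ConcurrentHashMap<>();

    private JobState state(Context ctx) {
        return states.computeIfAbsent(ctx.getJobKey(), k -> new JobState());
    }

    @Override
    public void storeScalingHistory(
            Context ctx, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
        state(ctx).scalingHistory = scalingHistory;
    }

    @Override
    public Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
            Context ctx) {
        return Optional.ofNullable(state(ctx).scalingHistory);
    }

    @Override
    public void removeScalingHistory(Context ctx) {
        state(ctx).scalingHistory = null;
    }

    @Override
    public void storeEvaluatedMetrics(
            Context ctx, SortedMap<Instant, CollectedMetrics> evaluatedMetrics) {
        state(ctx).evaluatedMetrics = evaluatedMetrics;
    }

    @Override
    public Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context ctx) {
        return Optional.ofNullable(state(ctx).evaluatedMetrics);
    }

    @Override
    public void removeEvaluatedMetrics(Context ctx) {
        state(ctx).evaluatedMetrics = null;
    }

    @Override
    public void storeParallelismOverrides(Context ctx, Map<String, String> parallelismOverrides) {
        state(ctx).parallelismOverrides = parallelismOverrides;
    }

    @Override
    public Optional<Map<String, String>> getParallelismOverrides(Context ctx) {
        return Optional.ofNullable(state(ctx).parallelismOverrides);
    }

    @Override
    public void removeParallelismOverrides(Context ctx) {
        state(ctx).parallelismOverrides = null;
    }

    @Override
    public void flush(Context ctx) {
        // Nothing to do: the heap is the physical storage in this sketch.
    }

    @Override
    public void removeInfoFromCache(KEY jobKey) {
        states.remove(jobKey);
    }
}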


4. Proposed Changes

4.1 Ensure the new autoscaler module keeps the generic autoscaling strategy

It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.

Use the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService:

  • The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


4.2 KubernetesJobAutoScalerContext

Note: some code can be extracted into an AbstractJobAutoScalerContext, such as: jobKey, jobId, configuration, metric group and restClientSupplier.

This logic should be generic for Kubernetes, YARN and standalone.

Code Block
/** An implementation of JobAutoscalerContext for Kubernetes. */
public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {

    private final AbstractFlinkResource<?, ?> resource;

    private final KubernetesClient kubernetesClient;

    public KubernetesJobAutoScalerContext(
            @Nullable JobID jobID,
            @Nullable JobStatus jobStatus,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource,
            KubernetesClient kubernetesClient) {
        super(
                ResourceID.fromResource(resource),
                jobID,
                jobStatus,
                configuration,
                metricGroup,
                restClientSupplier);
        this.resource = resource;
        this.kubernetesClient = kubernetesClient;
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }

    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}


4.3 KubernetesScalingRealizer

Code Block
/** The Kubernetes implementation for applying parallelism overrides. */
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
    }
}
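
For context, the following hypothetical snippet illustrates the shape of the override map and the configuration entry that realize() writes; the vertex id is made up, and the rendered string form is an illustrative assumption.

Code Block
import java.util.HashMap;
import java.util.Map;

// Hypothetical input to realize(): one entry per job vertex.
Map<String, String> parallelismOverrides = new HashMap<>();
parallelismOverrides.put("bc764cd8ddf7a0cff126f51c16239658", "4"); // JobVertexID -> parallelism

// After realize(), the resource's flinkConfiguration contains (illustrative rendering):
// pipeline.jobvertex-parallelism-overrides: bc764cd8ddf7a0cff126f51c16239658:4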


4.4 KubernetesAutoScalerEventHandler

Code Block
/** An event handler which posts events to the Kubernetes events API. */
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        if (interval == null) {
            eventRecorder.triggerEvent(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    message,
                    EventRecorder.Component.Operator,
                    messageKey,
                    context.getKubernetesClient());
        } else {
            eventRecorder.triggerEventByInterval(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    EventRecorder.Component.Operator,
                    message,
                    messageKey,
                    context.getKubernetesClient(),
                    interval);
        }
    }
}

Rejected alternative implementation (it can reduce method parameters): create an AutoScalerEventHandler for the current job each time JobAutoScaler#scale is called, i.e. change the AutoScalerEventHandler to a finer granularity. If so, we could:

  • Add the AutoScalerEventHandler to the JobAutoScalerContext
  • Add the JobAutoScalerContext to the AutoScalerEventHandler
  • All handler methods then wouldn't need the JobAutoScalerContext parameter, because the handler already includes it


4.5 KubernetesAutoScalerStateStore

For the current code and Kubernetes jobs, the state is persisted in a ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist the ConfigMap after scaling.

The ConfigMapStore is responsible for putting/getting/removing the serialized state.

The KubernetesAutoScalerStateStore is responsible for serializing state from raw types to String and deserializing it from String back to raw types; it accesses the underlying state through the ConfigMapStore.
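
Below is a minimal sketch of the ConfigMapStore described above, assuming fabric8's KubernetesClient and a cached ConfigMap; the exact method names are illustrative.

Code Block
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.util.Optional;

/** Illustrative sketch: raw string access to the autoscaler state ConfigMap. */
public class ConfigMapStore {

    private final KubernetesClient kubernetesClient;

    // Cached ConfigMap, fetched before scaling and written back on flush.
    private ConfigMap configMap;

    public ConfigMapStore(KubernetesClient kubernetesClient, ConfigMap configMap) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = configMap;
    }

    public Optional<String> getSerializedState(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    public void putSerializedState(String key, String value) {
        configMap.getData().put(key, value);
    }

    public void removeSerializedState(String key) {
        configMap.getData().remove(key);
    }

    public void flush() {
        // Write the cached data back to the cluster in one call.
        configMap = kubernetesClient.resource(configMap).update();
    }
}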

5. Standalone AutoScaler

We will implement the StandaloneAutoScaler in this FLIP; it's the generic autoscaler.

5.1 Separate process, JobListFetcher and control loop

The StandaloneAutoScaler can run as a separate process; the StandaloneAutoscalerEntrypoint is its main class.

The StandaloneAutoScaler is not responsible for job management, so there is no job list. In addition to the interfaces mentioned above, we introduce the JobListFetcher interface for the StandaloneAutoScaler. The JobListFetcher provides the job list and the jobContext of all jobs.

Code Block
/** The JobListFetcher will fetch the job list and the jobContext of all jobs. */
public interface JobListFetcher<KEY> {

    List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup);

}

Control loop

The StandaloneAutoscalerEntrypoint calls the JobListFetcher to fetch the job list, and calls JobAutoscaler#scale for each job periodically, as the sketch below shows.

We can define the control-loop-interval option for the StandaloneAutoscalerEntrypoint.
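
The following is a hedged sketch of that control loop; the entrypoint's constructor shape, the single-threaded scheduler and the error handling are assumptions for illustration.

Code Block
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

/** Illustrative sketch of the standalone control loop. */
public class StandaloneAutoscalerEntrypoint<KEY> {

    private final JobListFetcher<KEY> jobListFetcher;
    private final JobAutoScaler<KEY, JobAutoScalerContext<KEY>> autoScaler;
    private final Duration controlLoopInterval; // from the control-loop-interval option

    public StandaloneAutoscalerEntrypoint(
            JobListFetcher<KEY> jobListFetcher,
            JobAutoScaler<KEY, JobAutoScalerContext<KEY>> autoScaler,
            Duration controlLoopInterval) {
        this.jobListFetcher = jobListFetcher;
        this.autoScaler = autoScaler;
        this.controlLoopInterval = controlLoopInterval;
    }

    public void start() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                this::scaleAllJobs, 0, controlLoopInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void scaleAllJobs() {
        List<JobAutoScalerContext<KEY>> jobs =
                jobListFetcher.fetch(new UnregisteredMetricsGroup()); // placeholder MetricGroup
        for (JobAutoScalerContext<KEY> context : jobs) {
            try {
                autoScaler.scale(context);
            } catch (Exception e) {
                // A failure for one job must not break the loop for the other jobs.
            }
        }
    }
}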


Note: The kubernetes-autoscaler doesn't need the StandaloneAutoScaler or the JobListFetcher, because the operator already has job management and a control loop.

5.2 The implementation of JobListFetcher

For a Flink standalone cluster, we can implement the StandaloneJobListFetcher. The user provides the Flink standalone cluster address, and the StandaloneJobListFetcher will fetch the job list via the Flink REST API. A sketch of the standalone variant follows this section.

For a YARN cluster, we can implement the YarnJobListFetcher. The user provides the YARN address, and the YarnJobListFetcher will fetch the job list via the YARN REST API and the Flink REST API.
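
As an illustration, a StandaloneJobListFetcher might look like the sketch below, assuming RestClusterClient#listJobs and using the JobID itself as the job key; the cluster id string is hypothetical.

Code Block
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.client.JobStatusMessage;

/** Illustrative sketch: lists jobs of one standalone session cluster via the REST API. */
public class StandaloneJobListFetcher implements JobListFetcher<JobID> {

    private final Configuration clusterConf; // contains rest.address / rest.port

    public StandaloneJobListFetcher(Configuration clusterConf) {
        this.clusterConf = clusterConf;
    }

    @Override
    public List<JobAutoScalerContext<JobID>> fetch(MetricGroup metricGroup) {
        try (RestClusterClient<String> client =
                new RestClusterClient<>(clusterConf, "standalone-cluster")) {
            List<JobAutoScalerContext<JobID>> contexts = new ArrayList<>();
            for (JobStatusMessage job : client.listJobs().get()) {
                contexts.add(
                        new JobAutoScalerContext<>(
                                job.getJobId(), // use the JobID itself as the jobKey
                                job.getJobId(),
                                job.getJobState(),
                                clusterConf,
                                metricGroup,
                                () -> new RestClusterClient<>(clusterConf, "standalone-cluster")));
            }
            return contexts;
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch job list", e);
        }
    }
}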

5.3 The implementation of ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore

  • We will implement the ScalingApiRealizer, based on the rescale API of FLIP-291 (see the sketch after this list).
  • The generic EventHandler is based on the logger.
  • The generic StateStore is based on the heap. This means that the state information is stored in memory and can be lost if the autoscaler restarts.
    • We will implement the JdbcAutoscalerStateStore as well.
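
A hedged sketch of the ScalingApiRealizer follows, assuming the FLIP-291 RestClusterClient#updateJobResourceRequirements endpoint; pinning both parallelism bounds to the recommended value is an assumption for illustration.

Code Block
import java.util.Map;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;

/** Illustrative sketch: rescale a running job through the FLIP-291 rescale API. */
public class ScalingApiRealizer<KEY, Context extends JobAutoScalerContext<KEY>>
        implements ScalingRealizer<KEY, Context> {

    @Override
    public void realize(Context context, Map<String, String> parallelismOverrides) {
        try {
            JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
            for (Map.Entry<String, String> entry : parallelismOverrides.entrySet()) {
                int parallelism = Integer.parseInt(entry.getValue());
                // Assumption: pin lower and upper bound to the recommended parallelism.
                builder.setParallelismForJobVertex(
                        JobVertexID.fromHexString(entry.getKey()), parallelism, parallelism);
            }
            context.getRestClusterClient()
                    .updateJobResourceRequirements(context.getJobID(), builder.build())
                    .get();
        } catch (Exception e) {
            throw new RuntimeException("Failed to rescale job " + context.getJobKey(), e);
        }
    }
}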

POC

I have finished the POC for FLIP-334, here is the POC branch. This branch has 4 commits:

  • The first commit: Create the flink-autoscaler module, and move non-kubernetes related autoscaler classes to the flink-autoscaler module.
  • The second commit: Add the general interface for the autoscaler.
  • The third commit: Remove some test classes of the autoscaler because they depend on k8s; I didn't support the unit tests in this POC. (Don't worry, the final PR will support all tests.)
  • The last commit: Decouple the autoscaler and kubernetes.

The flink-autoscaler module doesn't depend on any kubernetes dependencies in this POC. I have tested that it works well with kubernetes.
You can run the flink-kubernetes-operator locally based on the flink-kubernetes-operator docs and run the autoscaling example.

BTW, I didn't support yarn in this POC; Samrat Deb would like to support it after this FLIP.

6. Compatibility, Deprecation, and Migration Plan

It must be compatible with the current kubernetes operator.


7. Test Plan

UT & IT & Manually verify that the autoscaler is working as expected.

Ensure 100% functionality and test coverage of the Kubernetes implementation.

8. Rejected Alternatives

We rejected a couple of interface designs; the overall solution is fine.

yarn-autoscaler

For the first version of the yarn-autoscaler, we just support rescaling jobs via the rescale API, and don't support re-deploying a new yarn application, for the following reasons:

  • If we wanted to support the entire yarn deployment, we would need to implement yarn job management in the yarn-autoscaler.

  • Not only rescale, but also start, stop, keepalive, etc.

  • We would need to maintain the job jar, job information, etc.

These parts are too heavy, so we just support the rescale API in the first version.

Note: Many companies have a flink platform to manage their flink jobs running on yarn. The flink platform has the jobName, job address, job jar, etc., and it has the ability to start, stop and keep jobs alive, so these platforms can easily implement the ScalingRealizer in their cases after this FLIP.

...