[This FLIP proposal is a joint work between Rui Fan and Samrat Deb ]
Motivation
The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly integrated with Kubernetes and resides within the flink-kubernetes-operator repository.
There are compelling reasons to extend the usage of the flink-autoscaler to Flink jobs running on YARN as well:
With the recent merge of the Externalized Declarative Resource Management (FLIP-291, FLINK-31316), in-place scaling is now supported across all types of Flink jobs. This development has made scaling Flink on YARN a straightforward process.
Several discussions within the Flink user community, as observed on the mailing list, have emphasized the necessity of the flink-autoscaler supporting Flink on YARN.
Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.
Core Idea
- JobAutoScaler and JobAutoScalerImpl: These components will define the general autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.
- Interfaces: The FLIP outlines the necessity of defining a few interfaces: AutoScalerEventHandler, AutoScalerStateStore and AutoScalerStateStoreFactory. The AutoScalerEventHandler interface handles event-based operations, while the AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.
- Dependencies: The autoscaler module should not rely on any Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes. Instead, it can rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.
- Optional Goal: As a nice-to-have feature, the FLIP proposes moving the flink-autoscaler module to the Apache Flink repository, thereby making it an integral part of the Flink project.
Please note that the autoscaler module will initially remain part of the flink-kubernetes-operator repository during this FLIP; moving it to the Apache Flink repository can happen as the last step of this FLIP.
Note: An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving these classes into flink-kubernetes-operator will reduce complexity. We can discuss it on the mailing list.
Why isn't it necessary?
- In the POC version, flink-kubernetes-operator-autoscaler defines KubernetesAutoScalerEventHandler, KubernetesAutoScalerStateStore and KubernetesAutoScalerStateStoreFactory.
- If flink-kubernetes-operator-autoscaler remains an independent module, it must depend on the flink-kubernetes-operator module. Since flink-kubernetes-operator cannot depend on flink-kubernetes-operator-autoscaler in turn, loading these classes is more difficult than simply removing the flink-kubernetes-operator-autoscaler module.
Public Interfaces
1. JobAutoScaler
The proposed interface retains similarity to the existing JobAutoScaler, although with modified method parameters. Instead of using Kubernetes-related classes, the parameters will be replaced with JobAutoScalerContext<KEY, INFO>. To enhance clarity, it is suggested to rename the generic parameter KEY to JOB_KEY. The autoscaler will treat Flink jobs with the same jobKey as identical.
The generic INFO will be introduced later.
/** The general Autoscaler instance. */
public interface JobAutoScaler<KEY, INFO> {

    /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
    boolean scale(JobAutoScalerContext<KEY, INFO> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY, INFO> context);

    /** Get the current parallelism overrides for the job. */
    Map<String, String> getParallelismOverrides(JobAutoScalerContext<KEY, INFO> context);
}
2. JobAutoScalerContext
The JobAutoScalerContext encapsulates essential information required for scaling Flink jobs, including the jobKey, jobId, stateStore, and INFO extraJobInfo.
Currently, in the existing code for Kubernetes jobs, the jobKey is defined as io.javaoperatorsdk.operator.processing.event.ResourceID.
However, there is a possibility to define the jobKey for Flink jobs running on YARN in the future.
Regarding the INFO extraJobInfo, it is worth noting that the flink-autoscaler itself does not utilize this information. Instead, it is employed by certain implementations of the AutoScalerEventHandler.
The entire JobAutoScalerContext, comprising all relevant details, will be passed to these implementations when the autoscaler invokes their respective callbacks.
/**
 * The job autoscaler context.
 *
 * @param <KEY>
 * @param <INFO>
 */
@AllArgsConstructor
public class JobAutoScalerContext<KEY, INFO> {

    // The identifier of each flink job.
    @Getter private final KEY jobKey;

    @Getter private final JobID jobID;

    // Whether the job is really running; STARTING or CANCELING aren't running.
    @Getter private final boolean isRunning;

    @Getter private final Configuration conf;

    @Getter private final MetricGroup metricGroup;

    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    @Getter private final Duration flinkClientTimeout;

    private final AutoScalerStateStoreFactory stateStoreFactory;

    /**
     * The flink-autoscaler doesn't use the extraJobInfo; it is only used by some implementations.
     * This whole context will be passed to those implementations when the autoscaler calls them
     * back.
     */
    @Getter @Nullable private final INFO extraJobInfo;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }

    public Optional<AutoScalerStateStore> getStateStore() {
        return stateStoreFactory.get();
    }

    public AutoScalerStateStore getOrCreateStateStore() {
        return stateStoreFactory.getOrCreate();
    }
}
3. AutoScalerEventHandler
The AutoScalerEventHandler will be called by the autoscaler when certain events need to be handled, such as scaling errors, reporting scaling results, and updating the Flink job based on the recommended parallelism.
In the current code for Kubernetes jobs, most handlers record events, and all handlers need the AbstractFlinkResource, which is stored in the INFO extraJobInfo of the JobAutoScalerContext.
The AutoScalerEventHandler object is shared across all Flink jobs and does not hold job-specific information. However, it needs the AbstractFlinkResource of every job, which is why the INFO extraJobInfo is added to the JobAutoScalerContext.
/**
 * Handle all events during scaling.
 *
 * @param <KEY>
 * @param <INFO>
 */
public interface AutoScalerEventHandler<KEY, INFO> {

    void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context,
            FailureReason failureReason,
            String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism);

    /** The reason of autoscaler failure. */
    enum FailureReason {
        ScalingException(true),
        IneffectiveScaling(false);

        // True indicates that the current reason is an unexpected error; false indicates that the
        // scaling strategy caused this scaling to fail.
        private final boolean isError;

        FailureReason(boolean isError) {
            this.isError = isError;
        }

        public boolean isError() {
            return isError;
        }
    }
}
4. AutoScalerStateStore
The AutoScalerStateStore is responsible for persisting and accessing state during scaling.
In the current code for Kubernetes jobs, the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
For other jobs (YARN or standalone), I implement a HeapedAutoScalerStateStore, which means the state will be lost after an autoscaler restart. Of course, a MySQLAutoScalerStateStore could be implemented to persist the state in the future.
/** It will be used to store state during scaling. */
public interface AutoScalerStateStore {

    Optional<String> get(String key);

    // Put the state into the state store; please flush the state store to prevent state loss.
    void put(String key, String value);

    void remove(String key);

    void flush();

    /**
     * The state store cannot be used if it isn't valid. Please create a new state store via
     * {@link AutoScalerStateStoreFactory}.
     */
    boolean isValid();
}
Currently, the AutoScalerStateStore is maintained by the AutoscalerInfoManager to reduce accesses to Kubernetes. If the state store isn't valid, the AutoscalerInfoManager will get or create a new AutoScalerStateStore.
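To make the heap-backed variant concrete, here is a minimal, hypothetical sketch of what a HeapedAutoScalerStateStore could look like: a plain HashMap behind the AutoScalerStateStore interface from this section (the interface is repeated here so the sketch is self-contained). This is an illustration under the assumptions of this FLIP, not the final implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Copied from section 4 of this FLIP so the sketch compiles on its own.
interface AutoScalerStateStore {
    Optional<String> get(String key);

    void put(String key, String value);

    void remove(String key);

    void flush();

    boolean isValid();
}

/**
 * Hypothetical heap-based state store for YARN or standalone jobs: all state
 * lives in process memory and is lost when the autoscaler restarts.
 */
class HeapedAutoScalerStateStore implements AutoScalerStateStore {

    private final Map<String, String> state = new HashMap<>();

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(state.get(key));
    }

    @Override
    public void put(String key, String value) {
        state.put(key, value);
    }

    @Override
    public void remove(String key) {
        state.remove(key);
    }

    @Override
    public void flush() {
        // No-op: the in-memory map is already the source of truth.
    }

    @Override
    public boolean isValid() {
        // A heap-backed store never becomes invalid during the process lifetime.
        return true;
    }
}
```

A durable store (e.g. the MySQLAutoScalerStateStore mentioned above) would implement the same interface, with flush() writing the map to the backing database.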
Proposed Changes
- Ensure the new autoscaler module keeps the general autoscaling strategy. It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc. Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, AutoScalerHandler and AutoScalerStateStore instead.
- Use the RestClusterClient<String> instead of org.apache.flink.kubernetes.operator.service.FlinkService. The FlinkService is related to Kubernetes, so we shouldn't use it. The RestClusterClient is a general Flink client that supports all Flink deployment types, including Kubernetes, YARN and standalone. The RestClusterClient<String> is included in the JobAutoScalerContext.
- Implement the default and Kubernetes classes for the AutoScalerHandler. The default AutoScalerHandler could be the LoggedAutoScalerHandler, which just logs the event when any method is called.
As noted in the AutoScalerEventHandler section above, the handler object is shared across all Flink jobs, and the Kubernetes handlers rely on the AbstractFlinkResource stored in the INFO extraJobInfo of the JobAutoScalerContext.
/** The kubernetes auto scaler event handler. */
public class KubernetesAutoScalerEventHandler<CR extends AbstractFlinkResource<?, ?>>
        implements AutoScalerEventHandler<ResourceID, CR> {

    private static final Logger LOG =
            LoggerFactory.getLogger(KubernetesAutoScalerEventHandler.class);

    private final KubernetesClient kubernetesClient;

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(
            KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handlerScalingFailure(
            JobAutoScalerContext<ResourceID, CR> context,
            FailureReason failureReason,
            String errorMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                failureReason.isError() ? EventRecorder.Type.Warning : EventRecorder.Type.Normal,
                failureReason.toString(),
                errorMessage,
                EventRecorder.Component.Operator);
    }

    @Override
    public void handlerScalingReport(
            JobAutoScalerContext<ResourceID, CR> context, String scalingReportMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                EventRecorder.Type.Normal,
                EventRecorder.Reason.ScalingReport,
                EventRecorder.Component.Operator,
                scalingReportMessage,
                "ScalingExecutor");
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<ResourceID, CR> context,
            Map<String, String> recommendedParallelism) {}
}
Alternative implementation: Create an AutoScalerEventHandler for the current job each time JobAutoScaler#scale is called, i.e. change the AutoScalerEventHandler to a finer granularity. If so, we can:
- Add the AutoScalerEventHandler into the JobAutoScalerContext.
- Add the JobAutoScalerContext into the AutoScalerEventHandler.
- The handler methods then no longer need a JobAutoScalerContext parameter, because the handler already includes the JobAutoScalerContext.
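The deployment-agnostic default handler described above (the LoggedAutoScalerHandler) could be sketched roughly as follows. This is a hypothetical illustration: the context class and interface below are simplified stand-ins for the full definitions earlier in this FLIP, and the FailureReason enum is reduced to a plain String for brevity.

```java
import java.util.Map;

// Simplified stand-in for the JobAutoScalerContext defined earlier in this FLIP;
// the real context also carries the JobID, Configuration, MetricGroup, etc.
class JobAutoScalerContext<KEY, INFO> {
    private final KEY jobKey;

    JobAutoScalerContext(KEY jobKey) {
        this.jobKey = jobKey;
    }

    KEY getJobKey() {
        return jobKey;
    }
}

// Simplified stand-in for the AutoScalerEventHandler interface; the failure
// reason is a String here instead of the FailureReason enum.
interface AutoScalerEventHandler<KEY, INFO> {
    void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, String failureReason, String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism);
}

/** Hypothetical default handler usable on any deployment target: it only logs each event. */
class LoggedAutoScalerEventHandler<KEY, INFO> implements AutoScalerEventHandler<KEY, INFO> {

    @Override
    public void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, String failureReason, String errorMessage) {
        System.out.printf(
                "Job %s scaling failed (%s): %s%n",
                context.getJobKey(), failureReason, errorMessage);
    }

    @Override
    public void handlerScalingReport(
            JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage) {
        System.out.printf("Job %s scaling report: %s%n", context.getJobKey(), scalingReportMessage);
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism) {
        System.out.printf(
                "Job %s recommended parallelism: %s%n",
                context.getJobKey(), recommendedParallelism);
    }
}
```

Because the handler holds no job state, a single shared instance works for all jobs, which matches the coarse-grained design; the finer-grained alternative above would instead construct one such handler per scale() call.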
Implement the default and Kubernetes classes for the AutoScalerStateStore.
The default AutoScalerStateStore is the HeapedAutoScalerStateStore, which means the state will be lost after an autoscaler restart. Of course, a MySQLAutoScalerStateStore could be implemented to persist the state in the future.
It's pluggable, so any database or state store can be supported; Flink users can implement other AutoScalerStateStores for their own scenarios.
In the current code for Kubernetes jobs, the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
/** The kubernetes auto scaler state store, it's based on the config map. */
public class KubernetesAutoScalerStateStore implements AutoScalerStateStore {

    private static final Logger LOG =
            LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private final KubernetesClient kubernetesClient;

    private ConfigMap configMap;

    public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient, ConfigMap configMap) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = configMap;
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    @Override
    public void put(String key, String value) {
        configMap.getData().put(key, value);
    }

    @Override
    public void remove(String key) {
        configMap.getData().remove(key);
    }

    @Override
    public void flush() {
        try {
            configMap = kubernetesClient.resource(configMap).update();
        } catch (Exception e) {
            LOG.error(
                    "Error while updating autoscaler info configmap, invalidating to clear the cache",
                    e);
            configMap = null;
            throw e;
        }
    }

    @Override
    public boolean isValid() {
        return configMap != null;
    }
}
POC
I have finished the POC for FLIP-334; here is the POC branch. This branch has 4 commits:
- The first commit: Create the flink-autoscaler module, and move the non-Kubernetes-related autoscaler classes into it.
- The second commit: Add the general interfaces for the autoscaler.
- The third commit: Remove some autoscaler test classes because they depend on Kubernetes; unit tests are not covered in this POC. (They will be added for the final PR.)
- The last commit: Decouple the autoscaler and Kubernetes.
The flink-autoscaler module doesn't depend on any Kubernetes dependencies in this POC. I have tested it, and it works well with Kubernetes.
You can run the flink-kubernetes-operator locally based on the flink-kubernetes-operator-docs and run the autoscaling example.
By the way, I didn't add YARN support in this POC; Samrat Deb would like to add it after the flink-autoscaler and Kubernetes are decoupled.
Compatibility, Deprecation, and Migration Plan
It must be compatible with the current Kubernetes operator.
Test Plan
Unit tests, integration tests, and manual verification that the autoscaler works as expected.