[This FLIP proposal is a joint work between Rui Fan and Samrat Deb, with consulting from Gyula Fora and Maximilian Michels.]
1. Motivation
The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly integrated with Kubernetes and resides within the flink-kubernetes-operator repository.
There are compelling reasons to extend the use of flink-autoscaler to more types of Flink jobs:
With the recent merge of the Externalized Declarative Resource Management (FLIP-291, FLINK-31316), in-place scaling is now supported across all types of Flink jobs. This development has made scaling Flink on YARN a straightforward process.
Several discussions within the Flink user community, as observed on the mailing list, have emphasized the necessity of flink-autoscaler supporting Flink on YARN.
Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.
2. Core Idea
- JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.
- Interfaces: The FLIP defines three new interfaces: ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
  - The ScalingRealizer interface applies scaling actions.
  - The AutoScalerEventHandler interface handles loggable events.
  - The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.
- Dependencies: The autoscaler module should not rely on any Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes. Instead, it can rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.
- Optional Goal: For the flink-autoscaler module, we prefer to keep it in flink-kubernetes-operator for now (based on an advance discussion with Gyula Fora).
a. Since the autoscaler supports more than just the latest Flink version, moving it to the Flink repository may not be a good choice: because it needs to work across Flink versions, it would probably have to be a separate repository/subproject.
b. Removing the autoscaler from the operator repository would also bring a lot of operational overhead (release process, etc.), which would be a negative from the operator's perspective.
c. So we prefer to keep it, together with flink-yarn-autoscaler, as submodules of flink-kubernetes-operator in the short term. Once they are stable, we can discuss moving them; by then the release frequency will be reduced, so the impact on the operator will be small.
Note: An independent flink-kubernetes-operator-autoscaler module is not necessary; moving its classes into flink-kubernetes-operator will reduce complexity.
Why isn't it necessary?
- If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module. Since flink-kubernetes-operator cannot depend on flink-kubernetes-operator-autoscaler in turn, loading these classes would be more difficult than simply removing the flink-kubernetes-operator-autoscaler module.
- flink-kubernetes-operator-autoscaler only defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.
- There are not many of these classes, so moving them to flink-kubernetes-operator will reduce complexity.
3. Public Interfaces
3.1 JobAutoScaler
The proposed interface is similar to the existing JobAutoScaler, but with modified method parameters: instead of Kubernetes-related classes, it takes a JobAutoScalerContext<KEY>. The KEY is the job key; two contexts with the same KEY are considered to refer to the same Flink job.
/** The generic Autoscaler instance. */
public interface JobAutoScaler<KEY> {

    boolean scale(JobAutoScalerContext<KEY> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY> context);
}
3.2 JobAutoScalerContext
The JobAutoScalerContext encapsulates the essential information required to scale a Flink job, including the jobKey, jobId, configuration, MetricGroup, etc.
In the existing code (Kubernetes jobs), the jobKey is defined as io.javaoperatorsdk.operator.processing.event.ResourceID.
A different jobKey may be defined for Flink jobs running on YARN in the future.
The entire JobAutoScalerContext, comprising all relevant details, will be passed to these implementations when the autoscaler invokes their respective callbacks.
/**
 * The job autoscaler context.
 *
 * @param <KEY> The job key.
 */
public interface JobAutoScalerContext<KEY> {

    // The identifier of each flink job.
    KEY getJobKey();

    JobID getJobID();

    Configuration getConfiguration();

    MetricGroup getMetricGroup();

    RestClusterClient<String> getRestClusterClient() throws Exception;
}
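Since two contexts with the same KEY are treated as the same Flink job, any YARN-side key only needs value-based equality. A minimal sketch of what such a key could look like, where the type and field names are hypothetical and not part of this FLIP:

```java
// Hypothetical job key for Flink on YARN (not part of the FLIP): a job is
// identified by its YARN application id plus the Flink JobID inside that
// application. Being a record, it gets value-based equals/hashCode for free,
// which is exactly the contract the KEY needs.
record YarnJobKey(String applicationId, String jobId) {}
```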
3.3 ScalingRealizer
The ScalingRealizer interface applies scaling actions; ScalingRealizer#realize is called inside JobAutoScaler#scale.
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {

    void realize(Context context, Map<String, String> parallelismOverrides);
}
3.4 AutoScalerEventHandler
The AutoScalerEventHandler interface handles loggable events; AutoScalerEventHandler#handleEvent is called by the autoscaler whenever a loggable event needs to be handled, such as a scaling error or a scaling result report.
The AutoScalerEventHandler object is shared across all Flink jobs and carries no job information itself, which is why the JobAutoScalerContext is a parameter of handleEvent.
/**
 * Handles all loggable events during scaling.
 *
 * @param <KEY> The job key.
 */
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {

    void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey);

    enum Type {
        Normal,
        Warning
    }
}
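For deployments without a Kubernetes event recorder (for example YARN or standalone), the simplest implementation of this interface would just log the events. The sketch below illustrates the idea with simplified types; the class name and the String job key (instead of a JobAutoScalerContext) are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of a deployment-agnostic, logging-only event handler that
// mirrors the AutoScalerEventHandler shape. A real implementation would write
// to an SLF4J logger; here the lines are collected in a list so the behavior
// is easy to inspect.
class LoggingEventHandler {
    enum Type { Normal, Warning }

    // Collected log lines, newest last.
    final List<String> lines = new ArrayList<>();

    void handleEvent(String jobKey, Type type, String reason, String message) {
        lines.add(String.format("[%s] job=%s reason=%s message=%s", type, jobKey, reason, message));
    }
}
```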
3.5 AutoScalerStateStore
AutoScalerStateStore is responsible for persisting and accessing state during scaling.
In the existing code (Kubernetes jobs), the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
For other jobs (YARN or standalone), we implement a HeapedAutoScalerStateStore, which means the state is lost after the autoscaler restarts. Of course, a JdbcAutoScalerStateStore can be implemented in the future to persist the state.
/** It will be used to store state during scaling. */
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(Context jobContext, String decision);

    String getScalingHistory(Context jobContext);

    void removeScalingHistory(Context jobContext);

    void storeEvaluatedMetrics(Context jobContext, String metrics);

    String getEvaluatedMetrics(Context jobContext);

    void removeEvaluatedMetrics(Context jobContext);

    void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

    String getParallelismOverrides(Context jobContext);

    void removeParallelismOverrides(Context jobContext);

    void removeInfoFromCache(Context jobContext);

    // The flush is needed because all store methods only write to a cache; the cached
    // data is written to the physical storage when flush is called, to improve performance.
    void flush(Context jobContext);
}
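The heap-backed store mentioned above can be sketched as a nested in-process map keyed by job key, so all state disappears when the autoscaler restarts. The simplified signatures below (String job key instead of a JobAutoScalerContext, only the scaling-history methods) are assumptions for illustration, not the FLIP's actual HeapedAutoScalerStateStore:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of a heap-only state store: state lives in an in-process map
// keyed by job key and is lost on restart. The evaluated-metrics and
// parallelism-overrides methods would follow the same pattern.
class InMemoryStateStore {
    private static final String SCALING_HISTORY_KEY = "scalingHistory";

    // jobKey -> (stateKey -> value)
    private final Map<String, Map<String, String>> states = new ConcurrentHashMap<>();

    void storeScalingHistory(String jobKey, String decision) {
        states.computeIfAbsent(jobKey, k -> new HashMap<>()).put(SCALING_HISTORY_KEY, decision);
    }

    String getScalingHistory(String jobKey) {
        return states.getOrDefault(jobKey, Map.of()).get(SCALING_HISTORY_KEY);
    }

    void removeScalingHistory(String jobKey) {
        Map<String, String> state = states.get(jobKey);
        if (state != null) {
            state.remove(SCALING_HISTORY_KEY);
        }
    }

    void removeInfoFromCache(String jobKey) {
        states.remove(jobKey);
    }

    // Heap-only store: flush is a no-op, there is no physical storage behind the cache.
    void flush(String jobKey) {}
}
```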
3.6 The generic autoscaler
As discussed with Gyula Fora and Samrat Deb, we won't implement the YARN autoscaler in this FLIP; instead, we will implement a generic autoscaler based on the rescale API (FLIP-291). The generic autoscaler doesn't know anything about the specific job; users pass a JobAutoScalerContext to use it, and it communicates with the Flink job through the RestClusterClient.
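The autoscaler ultimately hands the ScalingRealizer a Map of vertexId to new parallelism. As a hedged illustration, such a map can be flattened into the value of Flink's pipeline.jobvertex-parallelism-overrides option, which takes comma-separated vertexId:parallelism pairs. The helper class and method name below are hypothetical, not FLIP API:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper: flatten a parallelism-overrides map into the string
// format of pipeline.jobvertex-parallelism-overrides, e.g. "vertexA:2,vertexB:4".
class OverridesFormatter {
    static String toConfigValue(Map<String, String> overrides) {
        // Sort by vertex id so the output is deterministic.
        return new TreeMap<>(overrides).entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }
}
```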
4. Proposed Changes
4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.
It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.
Use RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService: the FlinkService is Kubernetes-specific, so we shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
The RestClusterClient<String> is included in the JobAutoScalerContext.
4.2 KubernetesJobAutoScalerContext
Note: some code can be extracted into an AbstractJobAutoScalerContext, such as the jobKey, jobId, configuration, metric group and restClientSupplier.
This logic should be generic across Kubernetes, YARN and standalone.
public class KubernetesJobAutoScalerContext implements JobAutoScalerContext<ResourceID> {

    private final ResourceID resourceID;
    private final JobID jobID;
    private final Configuration configuration;
    private final MetricGroup metricGroup;
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;
    private final AbstractFlinkResource<?, ?> resource;

    public KubernetesJobAutoScalerContext(
            ResourceID resourceID,
            JobID jobID,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource) {
        this.resourceID = resourceID;
        this.jobID = jobID;
        this.configuration = configuration;
        this.metricGroup = metricGroup;
        this.restClientSupplier = restClientSupplier;
        this.resource = resource;
    }

    @Override
    public ResourceID getJobKey() {
        return resourceID;
    }

    @Override
    public JobID getJobID() {
        return jobID;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    @Override
    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }
}
4.3 KubernetesScalingRealizer
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
        context.getResource().getStatus().setImmediateReconciliationNeeded(true);
    }
}
4.4 KubernetesAutoScalerEventHandler
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey) {
        eventRecorder.triggerEvent(
                context.getResource(),
                EventRecorder.Type.valueOf(type.name()),
                reason,
                EventRecorder.Component.Operator,
                message,
                messageKey);
    }
}
4.5 KubernetesAutoScalerStateStore
In the existing code (Kubernetes jobs), the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
/** The kubernetes auto scaler state store, it's based on the config map. */
public class KubernetesAutoScalerStateStore
        implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    private final KubernetesClient kubernetesClient;

    private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>();

    public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }

    // The methods for COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY are similar
    // to those for SCALING_HISTORY_KEY.
    @Override
    public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) {
        getState(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        return getState(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        getState(jobContext).remove(SCALING_HISTORY_KEY);
    }

    private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) {
        return cache.computeIfAbsent(
                        jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext))
                .getData();
    }

    @Override
    public void flush(KubernetesJobAutoScalerContext jobContext) {
        ConfigMap configMap = cache.get(jobContext.getJobKey());
        Preconditions.checkState(configMap != null, "The configMap shouldn't be null.");
        try {
            cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update());
        } catch (Exception e) {
            LOG.error(
                    "Error while updating autoscaler info configmap, invalidating to clear the cache",
                    e);
            removeInfoFromCache(jobContext);
            throw e;
        }
    }

    @Override
    public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) {
        cache.remove(jobContext.getJobKey());
    }

    private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) {
        AbstractFlinkResource<?, ?> cr = jobContext.getResource();
        var meta = createCmObjectMeta(ResourceID.fromResource(cr));
        return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta));
    }

    private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
        LOG.info("Creating scaling info config map");
        var cm = new ConfigMap();
        cm.setMetadata(meta);
        cm.addOwnerReference(cr);
        cm.setData(new HashMap<>());
        return kubernetesClient.resource(cm).create();
    }

    private ObjectMeta createCmObjectMeta(ResourceID uid) {
        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + uid.getName());
        uid.getNamespace().ifPresent(objectMeta::setNamespace);
        objectMeta.setLabels(
                Map.of(
                        Constants.LABEL_COMPONENT_KEY,
                        LABEL_COMPONENT_AUTOSCALER,
                        Constants.LABEL_APP_KEY,
                        uid.getName()));
        return objectMeta;
    }

    private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) {
        return Optional.ofNullable(
                kubernetesClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }
}
5. Compatibility, Deprecation, and Migration Plan
This change must be compatible with the current Kubernetes operator.
6. Test Plan
Unit tests, integration tests, and manual verification that the autoscaler works as expected.
7. Rejected Alternatives
A couple of interface designs were rejected during the design discussions, but the overall solution is fine.