[This FLIP proposal is a joint work between Rui Fan and Samrat Deb]
Table of Contents
1. Motivation
The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly coupled with Kubernetes and resides within the flink-kubernetes-operator repository.
There are compelling reasons to extend the use of flink-autoscaler to more types of Flink jobs:
With the recent merge of the Externalized Declarative Resource Management (FLIP-291, FLINK-31316), in-place scaling is now supported across all types of Flink jobs. This development has made scaling Flink on YARN a straightforward process.
Several discussions within the Flink user community, as observed on the mailing list, have emphasized the need for flink-autoscaler to support Flink on YARN.
Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.
2. Core Idea
- JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.
- Interfaces: The FLIP outlines the necessity of defining a few interfaces: ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
  - The ScalingRealizer interface handles scaling actions.
  - The AutoScalerEventHandler interface handles loggable events.
  - The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.
- Dependencies: The autoscaler module should be agnostic to any specific deployment framework like Kubernetes, YARN, etc. Currently, the core autoscaling framework is closely coupled with Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes. Ideally, it should rely only on Apache Flink project dependencies to gather metrics and make scaling decisions, based on JobVertexID, Configuration, MetricGroup, and other relevant classes.
- Optional Goal: For the flink-autoscaler module, we prefer to keep it in flink-kubernetes-operator first (advance discussion with Gyula Fora).
  a. Since the autoscaler is not only compatible with the latest Flink version, moving it to the Flink repository may not be the most suitable option. It would need to function across various Flink versions, potentially necessitating a separate repository/subproject.
  b. Removing the autoscaler from the flink-kubernetes-operator repository would introduce significant operational overhead and complicate the release process, which would be viewed negatively from the operator's perspective.
  Considering the above two points, the preference is to retain the core autoscaler framework/logic within flink-kubernetes-operator as a submodule in the short term. Should it prove stable in the future, we can revisit the idea of relocating it. As it stabilizes, the release frequency will decrease, and the transition will have a minor impact on the operator.
Note: an independent flink-kubernetes-operator-autoscaler module is not necessary; moving these classes to flink-kubernetes-operator will reduce complexity.
Why isn't flink-kubernetes-operator-autoscaler as a separate module necessary?
- If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module, while flink-kubernetes-operator cannot depend on flink-kubernetes-operator-autoscaler. So it's more difficult to load these classes than to remove the flink-kubernetes-operator-autoscaler module.
- flink-kubernetes-operator-autoscaler just defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore. There are not many of these classes, so moving them to flink-kubernetes-operator will reduce complexity.
3. Public Interfaces
3.1 JobAutoScaler
The proposed interface maintains a resemblance to the existing JobAutoScaler, albeit with adjustments to its method parameters. In lieu of employing Kubernetes-specific classes, the parameters will be substituted with JobAutoScalerContext<KEY>
, rendering it framework-agnostic, suitable for various deployment frameworks beyond Kubernetes.
The 'KEY' in this context corresponds to the 'jobKey', and when two instances share the same 'KEY,' they are regarded as representing the same Flink job.
Code Block |
---|
/**
* The generic Autoscaler.
*
* @param <KEY> The job key.
*/
@Internal
public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
/** Called as part of the reconciliation loop. */
void scale(Context context) throws Exception;
/** Called when the job is deleted. */
void cleanup(KEY key);
} |
3.2 JobAutoScalerContext
The JobAutoScalerContext
plays a pivotal role in consolidating crucial information necessary for scaling Flink jobs. It encompasses essential data such as the jobKey, jobId, configuration, MetricGroup, and more.
As of now, the jobKey is defined using io.javaoperatorsdk.operator.processing.event.ResourceID, as seen in the existing code for Kubernetes jobs. However, the jobKey for Flink jobs running on YARN may differ in the future.
The JobAutoScalerContext, encompassing all pertinent details, will be furnished to the plugin implementations (ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore) when the autoscaler invokes their respective callbacks.
Code Block |
---|
/**
 * The job autoscaler context. It includes all details related to the current job.
*
* @param <KEY> The job key.
*/
@Experimental
@AllArgsConstructor
@ToString
public class JobAutoScalerContext<KEY> {
/** The identifier of each flink job. */
@Getter private final KEY jobKey;
/** The jobId and jobStatus can be null when the job isn't started. */
@Nullable @Getter private final JobID jobID;
@Nullable @Getter private final JobStatus jobStatus;
@Getter private final Configuration configuration;
@Getter private final MetricGroup metricGroup;
@ToString.Exclude
private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;
public RestClusterClient<String> getRestClusterClient() throws Exception {
return restClientSupplier.get();
}
} |
3.3 ScalingRealizer
The ScalingRealizer interface is responsible for managing scaling actions (upscale or downscale). Specifically, the ScalingRealizer#realize method will be invoked from the JobAutoScaler#scale function.
Code Block |
---|
/**
 * The Scaling Realizer is responsible for managing scaling actions.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {
    /** Update job's parallelism to parallelismOverrides. */
    void realize(Context context, Map<String, String> parallelismOverrides);
} |
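To make the contract concrete, here is a minimal, self-contained sketch of a realizer that merely records the latest overrides per job key (useful in tests; the class name and the simplified signature, keyed directly by the job key instead of a context object, are hypothetical illustrations, not part of the FLIP's API):

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: records the last applied parallelism overrides per job key
 *  instead of touching any deployment framework. */
class RecordingRealizer<KEY> {
    private final Map<KEY, Map<String, String>> applied = new HashMap<>();

    /** Mirrors ScalingRealizer#realize, but keyed directly by job key for brevity. */
    void realize(KEY jobKey, Map<String, String> parallelismOverrides) {
        // copy defensively so later mutations by the caller don't change history
        applied.put(jobKey, new HashMap<>(parallelismOverrides));
    }

    Map<String, String> lastApplied(KEY jobKey) {
        return applied.get(jobKey);
    }
}
```

Each realize call simply replaces the previously recorded overrides for that job key, mirroring the idempotent "apply the latest desired parallelism" semantics of the interface.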
3.4 AutoScalerEventHandler
The AutoScalerEventHandler interface is responsible for managing loggable events, and specifically, the AutoScalerEventHandler#handleEvent
method will be invoked by the auto scaler when there's a need to handle such events. These events might include scaling errors, reporting scaling results, and more.
It's important to note that the AutoScalerEventHandler object is shared across all Flink jobs and doesn't possess specific job information. That's precisely why the JobAutoScalerContext is passed as a parameter to the handleEvent
method, allowing it to access the necessary job-related details when handling events.
Code Block |
---|
/**
* Handler all loggable events during scaling.
*
* @param <KEY> The job key.
* @param <Context> Instance of JobAutoScalerContext.
*/
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
/**
* Handle the event.
*
* @param interval When interval is great than 0, events that repeat within the interval will be
* ignored.
*/
void handleEvent(
Context context,
Type type,
String reason,
String message,
@Nullable String messageKey,
@Nullable Duration interval);
/** The type of the events. */
enum Type {
Normal,
Warning
}
} |
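The interval semantics described above (events that repeat within the interval are ignored) can be sketched as a small, self-contained deduplicator; this is an illustration of the idea only, not the operator's actual implementation, and the class and method names are hypothetical:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Sketch: suppress events that repeat (same message key) within the given interval. */
class EventDeduplicator {
    private final Map<String, Instant> lastTriggerTime = new HashMap<>();

    /** Returns true if the event should be emitted, false if it is suppressed. */
    boolean shouldTrigger(String messageKey, Instant now, Duration interval) {
        if (interval == null) {
            return true; // no deduplication requested
        }
        Instant last = lastTriggerTime.get(messageKey);
        if (last != null && Duration.between(last, now).compareTo(interval) < 0) {
            return false; // repeated within the interval, ignore it
        }
        lastTriggerTime.put(messageKey, now);
        return true;
    }
}
```

Only emitted events update the timestamp, so a burst of identical events collapses into one event per interval.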
3.5 AutoScalerStateStore
The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.
In the existing code and for Kubernetes jobs, this state is stored in a ConfigMap
. Consequently, the KubernetesAutoScalerStateStore
is responsible for retrieving the ConfigMap before the scaling operation and preserving it after the scaling event.
However, for other types of jobs, such as those running on YARN or in standalone mode, the default behavior is to persist scaling information in memory via a new implementation, InMemoryAutoScalerStateStore. In the future, the state could also be persisted in an RDBMS or any other persistent storage through new implementations such as a JdbcAutoScalerStateStore.
The method parameters of AutoScalerStateStore use specific classes instead of String, such as Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory. So each state store is responsible for serialization, deserialization and persistence of the state.
Code Block |
---|
/**
* The state store is responsible for storing all state during scaling.
*
* @param <KEY> The job key.
* @param <Context> Instance of JobAutoScalerContext.
*/
@Experimental
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {
    void storeScalingHistory(
            Context jobContext,
            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
            throws Exception;
    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
            Context jobContext) throws Exception;
    void removeScalingHistory(Context jobContext) throws Exception;
    void storeEvaluatedMetrics(
            Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics)
            throws Exception;
    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext)
            throws Exception;
    void removeEvaluatedMetrics(Context jobContext) throws Exception;
    void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides)
            throws Exception;
    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception;
    void removeParallelismOverrides(Context jobContext) throws Exception;
    /**
     * Flushing is needed because we just save data in cache for all store methods. For less write
     * operations, we flush the cached data to the physical storage only after all operations have
     * been performed.
     */
    void flush(Context jobContext) throws Exception;
    /** Clean up all information related to the current job. */
    void removeInfoFromCache(KEY jobKey);
} |
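As a sketch of what the in-memory default could look like, the following self-contained class keeps one slice of state (the parallelism overrides) on the heap per job key. The names are illustrative stand-ins, not the actual InMemoryAutoScalerStateStore:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch: heap-backed state store; all state is lost when the process restarts. */
class InMemoryStateStoreSketch<KEY> {
    private final Map<KEY, Map<String, String>> parallelismOverrides = new HashMap<>();

    void storeParallelismOverrides(KEY jobKey, Map<String, String> overrides) {
        parallelismOverrides.put(jobKey, new HashMap<>(overrides));
    }

    Optional<Map<String, String>> getParallelismOverrides(KEY jobKey) {
        return Optional.ofNullable(parallelismOverrides.get(jobKey));
    }

    /** Clean up all information related to the current job. */
    void removeInfoFromCache(KEY jobKey) {
        parallelismOverrides.remove(jobKey);
    }
}
```

For a heap store, flush() would naturally be a no-op; it only matters for stores with a remote backend such as the ConfigMap-based implementation.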
4. Proposed Changes
4.1 Ensure the new autoscaler module keeps the generic auto scaling strategy
It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.
Use the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService:
- The FlinkService is related to Kubernetes, so we shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
- The RestClusterClient<String> is included in JobAutoScalerContext.
4.2 KubernetesJobAutoScalerContext
Note: some code can be extracted into an AbstractJobAutoScalerContext, such as: jobKey, jobId, configuration, metric group and restClientSupplier.
This logic should be generic for Kubernetes, YARN and standalone.
Code Block |
---|
/** An implementation of JobAutoscalerContext for Kubernetes. */
public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {
    private final AbstractFlinkResource<?, ?> resource;
    private final KubernetesClient kubernetesClient;
    public KubernetesJobAutoScalerContext(
            @Nullable JobID jobID,
            @Nullable JobStatus jobStatus,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource,
            KubernetesClient kubernetesClient) {
        super(
                ResourceID.fromResource(resource),
                jobID,
                jobStatus,
                configuration,
                metricGroup,
                restClientSupplier);
        this.resource = resource;
        this.kubernetesClient = kubernetesClient;
    }
    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
} |
4.3 KubernetesScalingRealizer
Code Block |
---|
/** The Kubernetes implementation for applying parallelism overrides. */
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {
    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
    }
} |
4.4 KubernetesAutoScalerEventHandler
Code Block |
---|
/** An event handler which posts events to the Kubernetes events API. */
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {
    private final EventRecorder eventRecorder;
    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }
    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        if (interval == null) {
            eventRecorder.triggerEvent(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    message,
                    EventRecorder.Component.Operator,
                    messageKey,
                    context.getKubernetesClient());
        } else {
            eventRecorder.triggerEventByInterval(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    EventRecorder.Component.Operator,
                    message,
                    messageKey,
                    context.getKubernetesClient(),
                    interval);
        }
    }
} |
4.5 KubernetesAutoScalerStateStore
For current code or kubernetes job, the state is persisted to ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch ConfigMap before scaling, and persist the ConfigMap after scaling.
ConfigMapStore is responsible for put/get/remove of the serialized state.
KubernetesAutoScalerStateStore is responsible for serializing state from raw types to String and deserializing state from String back to raw types; it accesses the state through ConfigMapStore.
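This split between the raw string store (ConfigMapStore's role) and the serializing store on top of it can be sketched as follows; the stand-in StringStore and the toy key:value serialization format are assumptions for illustration only, not the operator's actual wire format:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** Sketch of the layering: a raw string store plus a typed store that
 *  handles serialization and deserialization on top of it. */
class TypedStateStoreSketch {
    /** Minimal stand-in for ConfigMapStore: put/get/remove of serialized state. */
    static class StringStore {
        private final Map<String, String> data = new HashMap<>();
        void put(String key, String value) { data.put(key, value); }
        Optional<String> get(String key) { return Optional.ofNullable(data.get(key)); }
        void remove(String key) { data.remove(key); }
    }

    private static final String KEY = "parallelismOverrides";
    private final StringStore store;

    TypedStateStoreSketch(StringStore store) { this.store = store; }

    /** Serialize the typed map to a single string and hand it to the raw store. */
    void storeParallelismOverrides(Map<String, String> overrides) {
        String serialized = overrides.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining(","));
        store.put(KEY, serialized);
    }

    /** Read the raw string back and deserialize it into the typed map. */
    Optional<Map<String, String>> getParallelismOverrides() {
        return store.get(KEY).map(serialized -> {
            Map<String, String> result = new HashMap<>();
            if (!serialized.isEmpty()) {
                for (String pair : serialized.split(",")) {
                    String[] kv = pair.split(":", 2);
                    result.put(kv[0], kv[1]);
                }
            }
            return result;
        });
    }
}
```

The typed layer owns the format, so swapping the backend (ConfigMap, JDBC, heap) does not change the serialization logic.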
5. Standalone AutoScaler
We will implement the StandaloneAutoScaler in this FLIP; it's the generic autoscaler.
5.1 Separate process, JobListFetcher and control loop
The StandaloneAutoScaler can run as a separate process; the StandaloneAutoscalerEntrypoint is the main class of the StandaloneAutoScaler.
The StandaloneAutoScaler is not responsible for job management, so there is no job list. In addition to the interfaces mentioned above, we introduce the JobListFetcher interface for the StandaloneAutoScaler. The JobListFetcher will provide the job list and the jobContext of all jobs.
Code Block |
---|
/** The JobListFetcher will fetch the job list and the jobContext of all jobs. */
public interface JobListFetcher<KEY> {
List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup);
} |
Control loop
The StandaloneAutoscalerEntrypoint calls the JobListFetcher to fetch the job list, and calls JobAutoScaler#scale for each job periodically.
We can define a control-loop-interval option for the StandaloneAutoscalerEntrypoint.
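A minimal, self-contained sketch of such a control loop follows. The interface names echo the FLIP but are reduced to stand-ins here, and the scheduling details are an assumption for illustration, not the actual entrypoint code:

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Sketch: periodically fetch the job list and scale each job; one job's
 *  failure must not break the loop for the others. */
class ControlLoopSketch {
    interface JobListFetcher<KEY> {
        List<KEY> fetch() throws Exception;
    }

    interface Scaler<KEY> {
        void scale(KEY jobKey) throws Exception;
    }

    static <KEY> ScheduledExecutorService start(
            JobListFetcher<KEY> fetcher, Scaler<KEY> scaler, long controlLoopIntervalMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(
                () -> {
                    List<KEY> jobs;
                    try {
                        jobs = fetcher.fetch();
                    } catch (Exception e) {
                        return; // fetch failed, retry on the next tick
                    }
                    for (KEY jobKey : jobs) {
                        try {
                            scaler.scale(jobKey);
                        } catch (Exception e) {
                            // log and continue with the remaining jobs
                        }
                    }
                },
                0, controlLoopIntervalMs, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

scheduleWithFixedDelay is used so a slow scaling pass delays the next tick instead of piling up overlapping runs.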
Note: The kubernetes-autoscaler doesn't need the StandaloneAutoScaler and JobListFetcher, it has the job management and control loop.
5.2 The implementation of JobListFetcher
For a Flink standalone cluster, we can implement the StandaloneJobListFetcher. Users provide the Flink standalone cluster address, and StandaloneJobListFetcher will fetch the job list via the Flink REST API.
For a YARN cluster, we can implement the YarnJobListFetcher. Users provide the YARN address, and YarnJobListFetcher will fetch the job list via the YARN REST API and Flink REST API.
5.3 The implementation of ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore
- We will implement the ScalingApiRealizer, which is based on the rescale API of FLIP-291.
- The generic EventHandler is based on the logger.
- The generic StateStore is based on the heap. This means the state information is stored in memory and can be lost if the autoscaler restarts.
- We will implement the JdbcAutoscalerStateStore as well.
6. Compatibility, Deprecation, and Migration Plan
It must be compatible with current kubernetes operator.
7. Test Plan
UT & IT & Manually verify that the autoscaler is working as expected.
Ensure 100% functionality and test coverage of Kubernetes implementation.
8. Rejected Alternatives
A couple of interface designs were rejected, but the overall solution is unchanged.
yarn-autoscaler
For the first version of yarn-autoscaler, we just support rescaling jobs via the rescale API, and don't support re-deploying a new YARN application, for the following reasons:
- If we want to support the entire YARN deployment, we need to implement YARN job management in the yarn-autoscaler: not only rescale, but also start, stop, keepalive, etc.
- We would need to maintain the job jar, job information, etc.
These parts are too heavy, so we just support the rescale API in the first version.
Note: Many companies have a Flink platform to manage their Flink jobs running on YARN. Such a platform knows the job name, job address, job jar, etc., and has the ability to start, stop and keep jobs alive, so these platforms can easily implement the ScalingRealizer for their cases after this FLIP.