Page properties
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

[This FLIP proposal is a joint effort between Rui Fan and Samrat Deb, with valuable input provided by Gyula Fora and Maximilian Michels.]
Table of Contents
1. Motivation
The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly coupled with Kubernetes and resides within the flink-kubernetes-operator repository.
...
- JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.
- Interfaces: The FLIP outlines the necessity of defining a few interfaces: ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
  - The ScalingRealizer interface handles scaling actions.
  - The AutoScalerEventHandler interface handles loggable events.
  - The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.
- Dependencies: The autoscaler module should be agnostic to any specific deployment framework such as Kubernetes or YARN, and should not rely on Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes. Currently, the core autoscaling framework is closely coupled with these Kubernetes-related dependencies. Ideally, it should rely only on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID, Configuration, MetricGroup, and other relevant classes.
- Optional Goal: For the flink-autoscaler module, we prefer to keep it in flink-kubernetes-operator at first (based on advance discussion with Gyula Fora).
  a. Since the autoscaler only supports the latest Flink version, moving it to the Flink repository may not be the most suitable option. It would need to function across various Flink versions, potentially necessitating a separate repository/subproject.
  b. Removing the autoscaler from the flink-kubernetes-operator repository would introduce significant operational overhead and complicate the release process, which would be viewed negatively from the operator's perspective.
  c. Therefore, considering the above two points, the preference is to retain the core autoscaler framework/logic and flink-yarn-autoscaler as submodules of flink-kubernetes-operator in the short term. Should they prove stable, we can revisit relocating them. As the code stabilizes, the release frequency will decrease, and the transition will have only a minor impact on the operator.
Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving its classes to flink-kubernetes-operator will reduce complexity.
Why isn't flink-kubernetes-operator-autoscaler necessary as a separate module?
...
If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module.
...
flink-kubernetes-operator cannot depend on flink-kubernetes-operator-autoscaler, so loading these classes would be more difficult than removing the flink-kubernetes-operator-autoscaler module.
...
flink-kubernetes-operator-autoscaler just defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.
...
There are not many of these classes, so moving them to flink-kubernetes-operator
will reduce complexity.
3. Public Interfaces
3.1 JobAutoScaler
The proposed interface retains similarity to the existing JobAutoScaler, albeit with modified method parameters. Instead of using Kubernetes-related classes, the parameters will be replaced with JobAutoScalerContext<KEY>, rendering it framework-agnostic and suitable for various deployment frameworks beyond Kubernetes.
The KEY in this context corresponds to the jobKey; when two instances share the same KEY, they are regarded as representing the same Flink job.
Code Block
/** The generic Autoscaler instance. */
public interface JobAutoScaler<KEY> {

    boolean scale(JobAutoScalerContext<KEY> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY> context);
}
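To illustrate how the generic KEY keeps the interface deployment-agnostic, here is a minimal, self-contained sketch. The re-declared interfaces are simplified from the FLIP (only getJobKey() is kept on the context), and the CountingAutoScaler class is purely illustrative, not part of the proposal:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified re-declarations of the FLIP's interfaces so the sketch is
// self-contained; only getJobKey() is kept on the context.
interface JobAutoScalerContext<KEY> {
    KEY getJobKey();
}

interface JobAutoScaler<KEY> {
    boolean scale(JobAutoScalerContext<KEY> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY> context);
}

/** A toy autoscaler that only counts scale() invocations per job key. */
class CountingAutoScaler<KEY> implements JobAutoScaler<KEY> {
    final Map<KEY, Integer> scaleCalls = new HashMap<>();

    @Override
    public boolean scale(JobAutoScalerContext<KEY> context) {
        scaleCalls.merge(context.getJobKey(), 1, Integer::sum);
        return true; // pretend a scaling decision was applied
    }

    @Override
    public void cleanup(JobAutoScalerContext<KEY> context) {
        // Forget all state for the deleted job.
        scaleCalls.remove(context.getJobKey());
    }
}
```

Because the autoscaler is parameterized only by KEY, the same implementation can be driven by a Kubernetes ResourceID, a YARN application id, or any other key type.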
3.2 JobAutoScalerContext
The JobAutoScalerContext encapsulates essential information required for scaling Flink jobs, including the jobKey, jobId, configuration, MetricGroup, and more.
Currently, in the existing code and for Kubernetes jobs, the jobKey is defined as io.javaoperatorsdk.operator.processing.event.ResourceID. However, the jobKey for Flink jobs running on YARN could differ in the future.
The entire JobAutoScalerContext, comprising all relevant details, will be passed to these implementations when the autoscaler invokes their respective callbacks.
...
Code Block
/**
* The job autoscaler context.
 * It encompasses all pertinent details and provides comprehensive context when the autoscaler invokes the callbacks of its implementations.
* @param <KEY> The job key.
*/
public interface JobAutoScalerContext<KEY> {
// The identifier of each flink job.
KEY getJobKey();
JobID getJobID();
Configuration getConfiguration();
MetricGroup getMetricGroup();
RestClusterClient<String> getRestClusterClient() throws Exception;
} |
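Since job identity is determined solely by the jobKey, two contexts with equal keys refer to the same Flink job even if other details differ. The following self-contained sketch shows only this identity aspect; the SimpleJobContext name is illustrative, and the Flink-specific members (JobID, Configuration, MetricGroup, RestClusterClient) are omitted so it compiles without Flink on the classpath:

```java
import java.util.Objects;

// Simplified stand-in for JobAutoScalerContext: Flink-specific members are
// omitted so the sketch compiles without Flink on the classpath.
final class SimpleJobContext<KEY> {
    private final KEY jobKey; // e.g. a Kubernetes ResourceID, or a YARN application id
    private final String jobId;

    SimpleJobContext(KEY jobKey, String jobId) {
        this.jobKey = jobKey;
        this.jobId = jobId;
    }

    KEY getJobKey() {
        return jobKey;
    }

    String getJobId() {
        return jobId;
    }

    /** Two contexts refer to the same Flink job iff their job keys are equal. */
    boolean sameJob(SimpleJobContext<KEY> other) {
        return Objects.equals(jobKey, other.getJobKey());
    }
}
```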
...
3.3 ScalingRealizer
The ScalingRealizer interface is responsible for managing scaling actions (upscale or downscale). Specifically, the ScalingRealizer#realize method will be invoked from the JobAutoscaler#scale function.
Code Block
/**
* The Scaling Realizer.
* Responsible for managing scaling actions.
* @param <KEY> the job key.
 * @param <Context> instance of JobAutoScalerContext.
*/
public interface ScalingRealizer<KEY,
Context extends JobAutoScalerContext<KEY>> {
void realize(Context context,
Map<String, String> parallelismOverrides);
} |
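A minimal, self-contained sketch of a realizer follows. The RecordingScalingRealizer class and the "vertexId:parallelism,..." encoding are assumptions made for illustration; a real Kubernetes implementation would instead patch the deployment resource with the overrides:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Re-declared from the FLIP so the sketch is self-contained.
interface JobAutoScalerContext<KEY> {
    KEY getJobKey();
}

interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {
    void realize(Context context, Map<String, String> parallelismOverrides);
}

/**
 * Illustrative realizer that serializes the overrides into a per-job config
 * string. The encoding used here is an assumption, not part of the proposal.
 */
class RecordingScalingRealizer<KEY> implements ScalingRealizer<KEY, JobAutoScalerContext<KEY>> {
    final Map<KEY, String> appliedOverrides = new HashMap<>();

    @Override
    public void realize(JobAutoScalerContext<KEY> context, Map<String, String> parallelismOverrides) {
        // Encode as "vertexId:parallelism,..." and record it per job key.
        StringBuilder sb = new StringBuilder();
        parallelismOverrides.forEach((vertexId, parallelism) -> {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(vertexId).append(':').append(parallelism);
        });
        appliedOverrides.put(context.getJobKey(), sb.toString());
    }
}
```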
...
3.4 AutoScalerEventHandler
The AutoScalerEventHandler interface is responsible for managing loggable events. Specifically, the AutoScalerEventHandler#handleEvent method will be invoked by the autoscaler when there is a loggable event to handle, such as a scaling error or a scaling result report.
It's important to note that the AutoScalerEventHandler object is shared across all Flink jobs and doesn't possess specific job information. That's precisely why the JobAutoScalerContext is passed as a parameter to the handleEvent method, allowing it to access the necessary job-related details when handling events.
Code Block
/**
 * Handler for all loggable events during scaling.
 *
 * @param <KEY> the job key.
 */
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {

    void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey);

    enum Type {
        Normal,
        Warning
    }
}
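As a self-contained sketch of a shared handler, the LoggingEventHandler class below (an illustrative name, not part of the proposal) renders events as log lines; since the handler is shared by all jobs, the job identity comes from the context argument. The @Nullable annotation is dropped here to avoid an extra dependency:

```java
import java.util.ArrayList;
import java.util.List;

// Re-declared from the FLIP so the sketch is self-contained.
interface JobAutoScalerContext<KEY> {
    KEY getJobKey();
}

interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
    void handleEvent(Context context, Type type, String reason, String message, String messageKey);

    enum Type {
        Normal,
        Warning
    }
}

/** Illustrative handler that renders events as log lines. */
class LoggingEventHandler<KEY> implements AutoScalerEventHandler<KEY, JobAutoScalerContext<KEY>> {
    final List<String> lines = new ArrayList<>();

    @Override
    public void handleEvent(
            JobAutoScalerContext<KEY> context,
            Type type,
            String reason,
            String message,
            String messageKey) {
        // The job identity is taken from the context, not from handler state.
        lines.add(String.format("[%s] job=%s reason=%s: %s", type, context.getJobKey(), reason, message));
    }
}
```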
3.5 AutoScalerStateStore
The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.
In the existing code and for Kubernetes jobs, this state is stored in a ConfigMap. Consequently, the KubernetesAutoScalerStateStore is responsible for retrieving the ConfigMap before the scaling operation and preserving it after the scaling event.
However, for other types of jobs, such as those running on YARN or in standalone mode, the default behavior involves persisting scaling information in memory via a new implementation, HeapedAutoScalerStateStore. This means that the state information can be lost if the autoscaler restarts. It's important to note that, in the future, there is the possibility to persist state in an RDBMS or other persistent storage through a new implementation such as JdbcAutoScalerStateStore.
Code Block
/** The state store is used to persist and access state during scaling. */
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(Context jobContext, String decision);

    String getScalingHistory(Context jobContext);

    void removeScalingHistory(Context jobContext);

    void storeEvaluatedMetrics(Context jobContext, String metrics);

    String getEvaluatedMetrics(Context jobContext);

    void removeEvaluatedMetrics(Context jobContext);

    void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

    String getParallelismOverrides(Context jobContext);

    void removeParallelismOverrides(Context jobContext);

    void removeInfoFromCache(Context jobContext);

    /**
     * The store methods above only write to an in-memory cache for performance;
     * flush writes the cached data to the physical storage.
     */
    void flush(Context jobContext);
}
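The cache-then-flush pattern behind this interface can be sketched as follows. The HeapStateStoreSketch class is illustrative and covers only the parallelism-override methods; a second map stands in for the physical storage (a ConfigMap or database), which only receives writes on flush():

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative heap-backed store covering only the parallelism-override
 * methods of the proposed AutoScalerStateStore. Mutations land in an
 * in-memory cache and reach the "physical" storage only when flush() is
 * called, batching writes per scaling round.
 */
class HeapStateStoreSketch<KEY> {
    private final Map<KEY, String> cache = new HashMap<>();
    final Map<KEY, String> physicalStorage = new HashMap<>();

    void storeParallelismOverrides(KEY jobKey, String overrides) {
        cache.put(jobKey, overrides); // cheap in-memory write, not yet durable
    }

    String getParallelismOverrides(KEY jobKey) {
        return cache.get(jobKey); // reads are served from the cache
    }

    void removeParallelismOverrides(KEY jobKey) {
        cache.remove(jobKey);
    }

    void flush(KEY jobKey) {
        // Propagate the cached value (or its removal) to the physical storage.
        if (cache.containsKey(jobKey)) {
            physicalStorage.put(jobKey, cache.get(jobKey));
        } else {
            physicalStorage.remove(jobKey);
        }
    }
}
```

This is why the interface exposes an explicit flush method: one durable write per scaling round is cheaper than one per mutation.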
...
As discussed with Gyula Fora and Samrat Deb, the YARN autoscaler implementation will be out of scope for this FLIP. Instead, we will develop a generic autoscaler based on the rescale API (FLIP-291). This generic autoscaler will not have knowledge of specific jobs; users will have the flexibility to pass the JobAutoScalerContext when utilizing it. Communication with Flink jobs can be achieved through the RestClusterClient.
4. Proposed Changes
4.1 Ensure the new autoscaler module keeps the generic autoscaling strategy.
...