...
**This FLIP proposal is a collaborative effort between Rui Fan and Samrat Deb, with valuable input and consulting provided by Gyula Fora and Maximilian Michels.**
Table of Contents
1. Motivation
...
Code Block |
---|
/** It will be used to store state during scaling. */
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(Context jobContext, String decision);

    String getScalingHistory(Context jobContext);

    void removeScalingHistory(Context jobContext);

    void storeEvaluatedMetrics(Context jobContext, String metrics);

    String getEvaluatedMetrics(Context jobContext);

    void removeEvaluatedMetrics(Context jobContext);

    void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

    String getParallelismOverrides(Context jobContext);

    void removeParallelismOverrides(Context jobContext);

    void removeInfoFromCache(Context jobContext);

    // The flush is needed because all store methods only save data in the cache.
    // The data is written to the physical storage when flush is called, which improves performance.
    void flush(Context jobContext);
} |
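The comment on flush describes a write-behind pattern: the store methods only touch an in-memory cache, and flush pushes the cached state to the physical storage in one write. The following is a minimal sketch of that idea, assuming a simplified store keyed by a plain String. `PhysicalStorage`, `CountingStorage`, `CachingStateStore` and `FlushDemo` are hypothetical names used for illustration only and are not part of the proposed interfaces.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the physical storage (for example a ConfigMap or a database row). */
interface PhysicalStorage {
    void write(String jobKey, Map<String, String> state);
}

/** Test double that counts writes so the effect of batching is visible. */
class CountingStorage implements PhysicalStorage {
    final Map<String, Map<String, String>> persisted = new HashMap<>();
    int writes = 0;

    @Override
    public void write(String jobKey, Map<String, String> state) {
        persisted.put(jobKey, new HashMap<>(state));
        writes++;
    }
}

/** Store methods only update the cache; flush persists the cached state in a single write. */
class CachingStateStore {
    private final Map<String, Map<String, String>> cache = new ConcurrentHashMap<>();
    private final PhysicalStorage storage;

    CachingStateStore(PhysicalStorage storage) {
        this.storage = storage;
    }

    void storeScalingHistory(String jobKey, String decision) {
        state(jobKey).put("scalingHistory", decision);
    }

    void storeEvaluatedMetrics(String jobKey, String metrics) {
        state(jobKey).put("evaluatedMetrics", metrics);
    }

    String getScalingHistory(String jobKey) {
        return state(jobKey).get("scalingHistory");
    }

    /** Writes the cached state of one job to the physical storage, if there is any. */
    void flush(String jobKey) {
        Map<String, String> state = cache.get(jobKey);
        if (state != null) {
            storage.write(jobKey, state);
        }
    }

    private Map<String, String> state(String jobKey) {
        return cache.computeIfAbsent(jobKey, k -> new ConcurrentHashMap<>());
    }
}

class FlushDemo {
    public static void main(String[] args) {
        CountingStorage storage = new CountingStorage();
        CachingStateStore store = new CachingStateStore(storage);
        store.storeScalingHistory("job-1", "scale-up");
        store.storeEvaluatedMetrics("job-1", "lag=10");
        System.out.println("writes before flush: " + storage.writes);
        store.flush("job-1");
        System.out.println("writes after flush: " + storage.writes);
    }
}
```

In this sketch two store calls result in a single write to the storage, which is the performance benefit the flush comment refers to.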
3.6 The generic autoscaler
As discussed with Gyula Fora and Samrat Deb, the YARN autoscaler implementation is out of scope for this FLIP. Instead, we will develop a generic autoscaler. The generic autoscaler has no knowledge of specific jobs; users pass the JobAutoScalerContext when they use it. Communication with Flink jobs can be achieved through the RestClusterClient. The generic autoscaler comes with the following implementations:
- The generic ScalingRealizer, based on the rescale API (FLIP-291).
- The generic EventHandler, based on the logger.
- The generic StateStore, based on the heap. This means that the state is stored in memory and is lost if the autoscaler restarts.
4. Proposed Changes
4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.
It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.
Use the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService
- The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is the generic Flink client; it supports all Flink deployment types, including Kubernetes, YARN and standalone.
The RestClusterClient<String> is included in the JobAutoScalerContext.
4.2 KubernetesJobAutoScalerContext
Note: some code can be extracted into an AbstractJobAutoScalerContext, such as the jobKey, jobId, configuration, metric group and restClientSupplier.
This logic should be generic for Kubernetes, YARN and standalone.
Code Block |
---|
public class KubernetesJobAutoScalerContext implements JobAutoScalerContext<ResourceID> {

    private final ResourceID resourceID;

    private final JobID jobID;

    private final Configuration configuration;

    private final MetricGroup metricGroup;

    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    private final AbstractFlinkResource<?, ?> resource;

    public KubernetesJobAutoScalerContext(
            ResourceID resourceID,
            JobID jobID,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource) {
        this.resourceID = resourceID;
        this.jobID = jobID;
        this.configuration = configuration;
        this.metricGroup = metricGroup;
        this.restClientSupplier = restClientSupplier;
        this.resource = resource;
    }

    @Override
    public ResourceID getJobKey() {
        return resourceID;
    }

    @Override
    public JobID getJobID() {
        return jobID;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    @Override
    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }
} |
4.3 KubernetesScalingRealizer
Code Block |
---|
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        // Write the overrides into the Flink configuration of the resource spec.
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
        // Ask the operator to reconcile the resource right away.
        context.getResource().getStatus().setImmediateReconciliationNeeded(true);
    }
} |
4.4 KubernetesAutoScalerEventHandler
Code Block |
---|
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey) {
        eventRecorder.triggerEvent(
                context.getResource(),
                EventRecorder.Type.valueOf(type.name()),
                reason,
                EventRecorder.Component.Operator,
                message,
                messageKey);
    }
} |
4.5 KubernetesAutoScalerStateStore
In the current code, for Kubernetes jobs, the state is persisted to a ConfigMap. The KubernetesAutoScalerStateStore therefore needs to fetch the ConfigMap before scaling and persist it after scaling.
Code Block |
---|
/** The Kubernetes autoscaler state store, based on the ConfigMap. */
public class KubernetesAutoScalerStateStore
        implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    private final KubernetesClient kubernetesClient;

    private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>();

    public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }

    // The methods for COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY are similar to
    // the ones for SCALING_HISTORY_KEY and are omitted here.
    @Override
    public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) {
        getState(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        return getState(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        getState(jobContext).remove(SCALING_HISTORY_KEY);
    }

    private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) {
        return cache.computeIfAbsent(
                        jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext))
                .getData();
    }

    @Override
    public void flush(KubernetesJobAutoScalerContext jobContext) {
        ConfigMap configMap = cache.get(jobContext.getJobKey());
        Preconditions.checkState(configMap != null, "The configMap shouldn't be null.");
        try {
            cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update());
        } catch (Exception e) {
            LOG.error(
                    "Error while updating autoscaler info configmap, invalidating to clear the cache",
                    e);
            removeInfoFromCache(jobContext);
            throw e;
        }
    }

    @Override
    public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) {
        cache.remove(jobContext.getJobKey());
    }

    private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) {
        AbstractFlinkResource<?, ?> cr = jobContext.getResource();
        var meta = createCmObjectMeta(ResourceID.fromResource(cr));
        return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta));
    }

    private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
        LOG.info("Creating scaling info config map");
        var cm = new ConfigMap();
        cm.setMetadata(meta);
        cm.addOwnerReference(cr);
        cm.setData(new HashMap<>());
        return kubernetesClient.resource(cm).create();
    }

    private ObjectMeta createCmObjectMeta(ResourceID uid) {
        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + uid.getName());
        uid.getNamespace().ifPresent(objectMeta::setNamespace);
        objectMeta.setLabels(
                Map.of(
                        Constants.LABEL_COMPONENT_KEY,
                        LABEL_COMPONENT_AUTOSCALER,
                        Constants.LABEL_APP_KEY,
                        uid.getName()));
        return objectMeta;
    }

    private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) {
        return Optional.ofNullable(
                kubernetesClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }
} |
5. Standalone AutoScaler
We will implement the StandaloneAutoScaler in this FLIP; it is the generic autoscaler.
5.1 Separate process, JobListFetcher and control loop
The StandaloneAutoScaler can run as a separate process. The StandaloneAutoscalerEntrypoint is the main class of the StandaloneAutoScaler.
The StandaloneAutoScaler is not responsible for job management, so it has no job list of its own. In addition to the interfaces mentioned above, we introduce the JobListFetcher interface for the StandaloneAutoScaler. The JobListFetcher provides the job list and the jobContext of every job.
Code Block |
---|
/** The JobListFetcher fetches the list of jobs together with the jobContext of each job. */
public interface JobListFetcher<KEY> {
List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup);
} |
Control loop
The StandaloneAutoscalerEntrypoint calls the JobListFetcher to fetch the job list, and calls JobAutoscaler#scale for each job periodically.
We can define a control-loop-interval option for the StandaloneAutoscalerEntrypoint.
Note: The kubernetes-autoscaler doesn't need the StandaloneAutoScaler and the JobListFetcher, because the Kubernetes operator already has its own job management and control loop.
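The control loop described above can be sketched as follows. This is only an illustration under simplifying assumptions: `SimpleJobContext`, `SimpleJobListFetcher`, `SimpleJobAutoScaler`, `ControlLoop` and `ControlLoopDemo` are hypothetical, reduced stand-ins for the JobAutoScalerContext, JobListFetcher and JobAutoscaler of this FLIP, and the interval parameter plays the role of the control-loop-interval option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical, reduced job context: only the job key is needed for this sketch. */
interface SimpleJobContext {
    String jobKey();
}

/** Hypothetical, reduced JobListFetcher. */
interface SimpleJobListFetcher {
    List<SimpleJobContext> fetch() throws Exception;
}

/** Hypothetical, reduced autoscaler with a single scale method. */
interface SimpleJobAutoScaler {
    void scale(SimpleJobContext context) throws Exception;
}

class ControlLoop {
    private final SimpleJobListFetcher fetcher;
    private final SimpleJobAutoScaler autoScaler;

    ControlLoop(SimpleJobListFetcher fetcher, SimpleJobAutoScaler autoScaler) {
        this.fetcher = fetcher;
        this.autoScaler = autoScaler;
    }

    /**
     * One iteration: fetch the job list and call scale for every job. A failure for one job is
     * logged and does not prevent the remaining jobs from being scaled. Returns the number of
     * jobs for which scale completed without an exception.
     */
    int runOnce() {
        List<SimpleJobContext> jobs;
        try {
            jobs = fetcher.fetch();
        } catch (Exception e) {
            System.err.println("Failed to fetch the job list: " + e);
            return 0;
        }
        int succeeded = 0;
        for (SimpleJobContext job : jobs) {
            try {
                autoScaler.scale(job);
                succeeded++;
            } catch (Exception e) {
                System.err.println("Scaling failed for job " + job.jobKey() + ": " + e);
            }
        }
        return succeeded;
    }

    /** Runs runOnce repeatedly, waiting intervalMillis between the end of one run and the next. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> runOnce(), 0, intervalMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}

class ControlLoopDemo {
    public static void main(String[] args) {
        List<SimpleJobContext> jobs = new ArrayList<>();
        jobs.add(() -> "job-a");
        jobs.add(() -> "job-b");
        ControlLoop loop =
                new ControlLoop(() -> jobs, ctx -> System.out.println("scaling " + ctx.jobKey()));
        System.out.println("jobs scaled: " + loop.runOnce());
    }
}
```

Catching exceptions per job inside runOnce matters in this sketch because a task scheduled with scheduleWithFixedDelay is not run again once an execution throws.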
5.2 The implementation of JobListFetcher
For a Flink standalone cluster, we can implement the StandaloneJobListFetcher. The user provides the address of the Flink standalone cluster, and the StandaloneJobListFetcher fetches the job list via the Flink REST API.
For a YARN cluster, we can implement the YarnJobListFetcher. The user provides the YARN address, and the YarnJobListFetcher fetches the job list via the YARN REST API and the Flink REST API.
5.3 The implementation of ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore
- We will implement the ScalingApiRealizer, which is based on the rescale API of FLIP-291.
- The generic EventHandler is based on the logger.
- The generic StateStore is based on the heap. This means that the state is stored in memory and is lost if the autoscaler restarts.
- We will implement the JdbcAutoscalerStateStore as well.
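As an illustration of the logger-based event handler in the list above, the following sketch logs each event instead of emitting a Kubernetes event. It is an assumption-laden simplification: `SimpleEventHandler` and `LoggingEventHandler` are hypothetical names, the job is identified by a plain String instead of a JobAutoScalerContext, the two event types `Normal` and `Warning` are assumed, and java.util.logging is used only to keep the example free of dependencies.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical, reduced event handler interface keyed by a plain job key. */
interface SimpleEventHandler {
    /** Assumed event types for this sketch. */
    enum Type {
        Normal,
        Warning
    }

    void handleEvent(String jobKey, Type type, String reason, String message);
}

/** Writes every autoscaler event to the log; nothing is stored or sent anywhere else. */
class LoggingEventHandler implements SimpleEventHandler {
    private static final Logger LOG = Logger.getLogger(LoggingEventHandler.class.getName());

    /** Builds the log line; kept separate so that the format is easy to check. */
    static String format(String jobKey, String reason, String message) {
        return String.format("[%s] %s: %s", jobKey, reason, message);
    }

    @Override
    public void handleEvent(String jobKey, Type type, String reason, String message) {
        // Warnings are logged at WARNING level, everything else at INFO level.
        Level level = type == Type.Warning ? Level.WARNING : Level.INFO;
        LOG.log(level, format(jobKey, reason, message));
    }
}
```

Because events only go to the log, this handler works for any deployment type, at the cost that events are not visible through a cluster API.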
6. Compatibility, Deprecation, and Migration Plan
It must be compatible with the current Kubernetes operator.
7. Test Plan
Unit tests, integration tests, and manual verification that the autoscaler works as expected.
Ensure 100% functionality and test coverage of the Kubernetes implementation.
8. Rejected Alternatives
A couple of interface designs were rejected; the overall solution is fine.
yarn-autoscaler
For the first version of the yarn-autoscaler, we only support rescaling a job via the rescale API and don't support re-deploying a new YARN application, for the following reasons:
- If we want to support the entire YARN deployment, we need to implement YARN job management in the yarn-autoscaler: not only rescale, but also start, stop, keepalive, etc.
- We need to maintain the job jar, job information, etc.
These parts are too heavy, so we only support the rescale API in the first version.
Note: Each company has a Flink platform to manage its Flink jobs running on YARN. The platform holds the jobName, job address, job jar, etc., and is able to start, stop and keep jobs alive, so it is easy for these platforms to implement the ScalingRealizer for their own cases after this FLIP.