
Page properties

Discussion thread: https://lists.apache.org/thread/kmm03gls1vw4x6vk1ypr9ny9q9522495
Vote thread:
JIRA: FLINK-32723 (ASF JIRA)
Release:

** This FLIP proposal is a collaborative effort between Rui Fan and Samrat Deb, with valuable input and consulting provided by Gyula Fora and Maximilian Michels **

Table of Contents

1. Motivation

...

Code Block
/** It will be used to store state during scaling. */
public interface AutoScalerStateStore<KEY,
       Context extends JobAutoScalerContext<KEY>> {

   void storeScalingHistory(Context jobContext, String decision);

   String getScalingHistory(Context jobContext);

   void removeScalingHistory(Context jobContext);

   void storeEvaluatedMetrics(Context jobContext, String metrics);

   String getEvaluatedMetrics(Context jobContext);

   void removeEvaluatedMetrics(Context jobContext);

   void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

   String getParallelismOverrides(Context jobContext);

   void removeParallelismOverrides(Context jobContext);


   void removeInfoFromCache(Context jobContext);

    // The flush is needed because all store methods only save data to an in-memory cache for performance;
    // flush writes the cached data to the physical storage once it is called.
   void flush(Context jobContext);
}
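
For illustration, the intended call pattern looks roughly as follows (a hedged sketch; the variable names are placeholders, not part of the proposal): the store methods only update the cache, and one flush call at the end of a scaling iteration persists everything.

Code Block
// Sketch of one scaling iteration against the state store (placeholder values):
stateStore.storeScalingHistory(jobContext, scalingDecisionJson);    // cache only
stateStore.storeEvaluatedMetrics(jobContext, evaluatedMetricsJson); // cache only
stateStore.flush(jobContext); // one write to the physical storage per iteration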

3.6 The generic autoscaler

As discussed with Gyula Fora and Samrat Deb, a YARN autoscaler implementation is out of scope for this FLIP. We will develop a generic autoscaler instead. This generic autoscaler has no knowledge of specific jobs; users pass in the JobAutoScalerContext when using it. Communication with Flink jobs can be achieved through the RestClusterClient.

  • The generic ScalingRealizer based on the rescale API (FLIP-291).
  • The generic EventHandler based on the logger.
  • The generic StateStore based on the Heap. This means that the state information is stored in memory and can be lost if the autoscaler restarts.
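
To illustrate the last point, here is a minimal sketch of a heap-based state store (the class name and layout are illustrative, not the final implementation): all state lives in in-memory maps keyed by the job key, so it disappears when the autoscaler process restarts.

Code Block
/**
 * A minimal sketch of the heap-based state store (name and details are illustrative,
 * not the final implementation). All state lives in in-memory maps, so it is lost
 * whenever the autoscaler process restarts.
 */
public class HeapedAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerStateStore<KEY, Context> {

    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String EVALUATED_METRICS_KEY = "evaluatedMetrics";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    // One small map of state entries per job key.
    private final ConcurrentHashMap<KEY, Map<String, String>> state = new ConcurrentHashMap<>();

    @Override
    public void storeScalingHistory(Context jobContext, String decision) {
        entries(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(Context jobContext) {
        return entries(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(Context jobContext) {
        entries(jobContext).remove(SCALING_HISTORY_KEY);
    }

    // The evaluated metrics and parallelism overrides methods follow the same pattern.
    @Override
    public void storeEvaluatedMetrics(Context jobContext, String metrics) {
        entries(jobContext).put(EVALUATED_METRICS_KEY, metrics);
    }

    @Override
    public String getEvaluatedMetrics(Context jobContext) {
        return entries(jobContext).get(EVALUATED_METRICS_KEY);
    }

    @Override
    public void removeEvaluatedMetrics(Context jobContext) {
        entries(jobContext).remove(EVALUATED_METRICS_KEY);
    }

    @Override
    public void storeParallelismOverrides(Context jobContext, String parallelismOverrides) {
        entries(jobContext).put(PARALLELISM_OVERRIDES_KEY, parallelismOverrides);
    }

    @Override
    public String getParallelismOverrides(Context jobContext) {
        return entries(jobContext).get(PARALLELISM_OVERRIDES_KEY);
    }

    @Override
    public void removeParallelismOverrides(Context jobContext) {
        entries(jobContext).remove(PARALLELISM_OVERRIDES_KEY);
    }

    @Override
    public void removeInfoFromCache(Context jobContext) {
        state.remove(jobContext.getJobKey());
    }

    @Override
    public void flush(Context jobContext) {
        // No-op: the heap itself is the storage, there is no physical backend to write to.
    }

    private Map<String, String> entries(Context jobContext) {
        return state.computeIfAbsent(jobContext.getJobKey(), key -> new ConcurrentHashMap<>());
    }
}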

4. Proposed Changes

4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.

It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes; they should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.

Using the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService

  • The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext (a short usage sketch follows).

4.2 KubernetesJobAutoScalerContext

Note: some code can be extracted into an AbstractJobAutoScalerContext, such as: jobKey, jobId, configuration, metric group and restClientSupplier.

This logic should be generic for k8s, yarn and standalone.

Code Block
public class KubernetesJobAutoScalerContext implements JobAutoScalerContext<ResourceID> {

    private final ResourceID resourceID;

    private final JobID jobID;

    private final Configuration configuration;

    private final MetricGroup metricGroup;

    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    private final AbstractFlinkResource<?, ?> resource;

    public KubernetesJobAutoScalerContext(
            ResourceID resourceID,
            JobID jobID,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource) {
        this.resourceID = resourceID;
        this.jobID = jobID;
        this.configuration = configuration;
        this.metricGroup = metricGroup;
        this.restClientSupplier = restClientSupplier;
        this.resource = resource;
    }

    @Override
    public ResourceID getJobKey() {
        return resourceID;
    }

    @Override
    public JobID getJobID() {
        return jobID;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    @Override
    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }
}
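
As the note above suggests, the generic parts of this context could be extracted into an AbstractJobAutoScalerContext shared by k8s, YARN and standalone. A hedged sketch of what that could look like (not part of the current code):

Code Block
public abstract class AbstractJobAutoScalerContext<KEY> implements JobAutoScalerContext<KEY> {

    private final KEY jobKey;
    private final JobID jobID;
    private final Configuration configuration;
    private final MetricGroup metricGroup;
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    protected AbstractJobAutoScalerContext(
            KEY jobKey,
            JobID jobID,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier) {
        this.jobKey = jobKey;
        this.jobID = jobID;
        this.configuration = configuration;
        this.metricGroup = metricGroup;
        this.restClientSupplier = restClientSupplier;
    }

    @Override
    public KEY getJobKey() {
        return jobKey;
    }

    @Override
    public JobID getJobID() {
        return jobID;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    @Override
    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}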


4.3 KubernetesScalingRealizer

Code Block
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context,
            Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
        context.getResource().getStatus().setImmediateReconciliationNeeded(true);
    }
}
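
For illustration, the overrides map passed to realize() maps each job vertex ID to its new parallelism, which the realizer writes into pipeline.jobvertex-parallelism-overrides on the resource spec. A hedged usage sketch (the vertex IDs and parallelisms below are hypothetical):

Code Block
Map<String, String> parallelismOverrides = Map.of(
        "bc764cd8ddf7a0cff126f51c16239658", "4",  // hypothetical source vertex ID
        "ea632d67b7d595e5b851708ae9ad79d6", "2"); // hypothetical sink vertex ID
scalingRealizer.realize(context, parallelismOverrides);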



4.4 KubernetesAutoScalerEventHandler

Code Block
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(KubernetesJobAutoScalerContext context,
                            Type type,
                            String reason,
                            String message,
                            @Nullable String messageKey) {
        eventRecorder.triggerEvent(
                context.getResource(),
                EventRecorder.Type.valueOf(type.name()),
                reason,
                EventRecorder.Component.Operator,
                message,
                messageKey);
    }
}



4.5 KubernetesAutoScalerStateStore

In the current code, the state of a Kubernetes job is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.

Code Block
/** The Kubernetes autoscaler state store; it's based on the ConfigMap. */
public class KubernetesAutoScalerStateStore
        implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    private final KubernetesClient kubernetesClient;

    private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>();

    public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }

    // The COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY methods are similar to the SCALING_HISTORY_KEY ones.
    @Override
    public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) {
        getState(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        return getState(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        getState(jobContext).remove(SCALING_HISTORY_KEY);
    }

    private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) {
        return cache.computeIfAbsent(jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext)).getData();
    }

    @Override
    public void flush(KubernetesJobAutoScalerContext jobContext) {
        ConfigMap configMap = cache.get(jobContext.getJobKey());
        Preconditions.checkState(configMap != null, "The configMap shouldn't be null.");
        try {
            cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update());
        } catch (Exception e) {
            LOG.error(
                    "Error while updating autoscaler info configmap, invalidating to clear the cache",
                    e);
            removeInfoFromCache(jobContext);
            throw e;
        }
    }

    @Override
    public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) {
        cache.remove(jobContext.getJobKey());
    }

    private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) {
        AbstractFlinkResource<?, ?> cr = jobContext.getResource();
        var meta = createCmObjectMeta(ResourceID.fromResource(cr));
        return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta));
    }

    private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
        LOG.info("Creating scaling info config map");
        var cm = new ConfigMap();
        cm.setMetadata(meta);
        cm.addOwnerReference(cr);
        cm.setData(new HashMap<>());
        return kubernetesClient.resource(cm).create();
    }

    private ObjectMeta createCmObjectMeta(ResourceID uid) {
        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + uid.getName());
        uid.getNamespace().ifPresent(objectMeta::setNamespace);
        objectMeta.setLabels(
                Map.of(
                        Constants.LABEL_COMPONENT_KEY,
                        LABEL_COMPONENT_AUTOSCALER,
                        Constants.LABEL_APP_KEY,
                        uid.getName()));
        return objectMeta;
    }

    private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) {
        return Optional.ofNullable(
                kubernetesClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }
}
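
For context, a hedged sketch of how this store behaves over one scaling cycle (the surrounding calls and variable names are illustrative): the first access lazily fetches or creates the autoscaler ConfigMap, all store/get/remove calls only touch the cached ConfigMap data, and flush performs a single ConfigMap update; if that update fails, the cache entry is invalidated so the next cycle re-fetches a fresh ConfigMap.

Code Block
String history = stateStore.getScalingHistory(kubernetesJobContext);   // lazily fetches/creates the ConfigMap
stateStore.storeScalingHistory(kubernetesJobContext, newDecisionJson); // updates only the cached ConfigMap data
stateStore.flush(kubernetesJobContext);                                // single ConfigMap update at the end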




5. Standalone AutoScaler

We will implement the StandaloneAutoScaler in this FLIP; it is the generic autoscaler.

5.1 Separate process, JobListFetcher and control loop

The StandaloneAutoScaler can run as a separate process; the StandaloneAutoscalerEntrypoint is the main class of the StandaloneAutoScaler.

The StandaloneAutoScaler is not responsible for job management, so it has no job list of its own. In addition to the interfaces mentioned above, we introduce the JobListFetcher interface for the StandaloneAutoScaler. The JobListFetcher provides the job list and the JobAutoScalerContext of all jobs.

Code Block
/** The JobListFetcher will fetch the job list and the JobAutoScalerContext of all jobs. */
public interface JobListFetcher<KEY> {

    List<JobAutoScalerContext<KEY>> fetch(MetricGroup metricGroup);

}

Control loop

The StandaloneAutoscalerEntrypoint calls the JobListFetcher to fetch the job list and calls JobAutoScaler#scale for each job periodically.

We can define a control-loop-interval option for the StandaloneAutoscalerEntrypoint; a sketch of this loop is shown below.
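
A hedged sketch of that control loop (the entrypoint wiring and the JobAutoScaler#scale call are assumptions for illustration, not the final design):

Code Block
public class StandaloneAutoscalerEntrypoint<KEY> {

    private final JobListFetcher<KEY> jobListFetcher;
    // Assumed handle to the generic autoscaler; the exact generic signature of JobAutoScaler is defined elsewhere in this FLIP.
    private final JobAutoScaler<KEY, JobAutoScalerContext<KEY>> autoScaler;
    private final MetricGroup metricGroup;
    private final Duration controlLoopInterval; // from the proposed control-loop-interval option
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public StandaloneAutoscalerEntrypoint(
            JobListFetcher<KEY> jobListFetcher,
            JobAutoScaler<KEY, JobAutoScalerContext<KEY>> autoScaler,
            MetricGroup metricGroup,
            Duration controlLoopInterval) {
        this.jobListFetcher = jobListFetcher;
        this.autoScaler = autoScaler;
        this.metricGroup = metricGroup;
        this.controlLoopInterval = controlLoopInterval;
    }

    public void start() {
        scheduler.scheduleWithFixedDelay(
                this::scaleAllJobs, 0, controlLoopInterval.toMillis(), TimeUnit.MILLISECONDS);
    }

    private void scaleAllJobs() {
        for (JobAutoScalerContext<KEY> context : jobListFetcher.fetch(metricGroup)) {
            try {
                autoScaler.scale(context); // assumed scale(context) signature
            } catch (Exception e) {
                // A failure for one job must not break the control loop for the other jobs.
            }
        }
    }
}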


Note: the kubernetes-autoscaler doesn't need the StandaloneAutoScaler and JobListFetcher, because it already has job management and its own control loop.

5.2 The implementation of JobListFetcher

For a Flink standalone cluster, we can implement the StandaloneJobListFetcher. Users provide the Flink standalone cluster address, and the StandaloneJobListFetcher will fetch the job list via the Flink REST API (a minimal sketch follows below).

For a YARN cluster, we can implement the YarnJobListFetcher. Users provide the YARN address, and the YarnJobListFetcher will fetch the job list via the YARN REST API and the Flink REST API.
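
A hedged sketch of the standalone variant (the constructor, the context factory and the use of JobID as the job key are illustration-only assumptions): it lists running jobs through the Flink REST API and builds one JobAutoScalerContext per job.

Code Block
public class StandaloneJobListFetcher implements JobListFetcher<JobID> {

    private final RestClusterClient<String> restClusterClient;
    // Hypothetical factory that builds a generic JobAutoScalerContext for one job ID.
    private final Function<JobID, JobAutoScalerContext<JobID>> contextFactory;

    public StandaloneJobListFetcher(
            RestClusterClient<String> restClusterClient,
            Function<JobID, JobAutoScalerContext<JobID>> contextFactory) {
        this.restClusterClient = restClusterClient;
        this.contextFactory = contextFactory;
    }

    @Override
    public List<JobAutoScalerContext<JobID>> fetch(MetricGroup metricGroup) {
        try {
            // listJobs() queries the Flink REST API for all jobs known to the cluster.
            return restClusterClient.listJobs().get().stream()
                    .filter(job -> job.getJobState() == JobStatus.RUNNING)
                    .map(job -> contextFactory.apply(job.getJobId()))
                    .collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch the job list via the Flink REST API", e);
        }
    }
}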

5.3 The implementation of ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore

  • We will implement the ScalingApiRealizer, based on the rescale API of FLIP-291.
  • The generic EventHandler based on the logger (a minimal sketch follows this list).
  • The generic StateStore based on the Heap. This means that the state information is stored in memory and can be lost if the autoscaler restarts.
    • We will implement the JdbcAutoscalerStateStore as well.
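
A hedged sketch of the logger-based event handler mentioned above (the class name is illustrative): it only logs scaling events instead of creating Kubernetes events.

Code Block
public class LoggingEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerEventHandler<KEY, Context> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingEventHandler.class);

    @Override
    public void handleEvent(Context context,
                            Type type, // the event type enum from the AutoScalerEventHandler interface
                            String reason,
                            String message,
                            @Nullable String messageKey) {
        LOG.info(
                "Autoscaler event for job {}: type={}, reason={}, message={}",
                context.getJobID(), type, reason, message);
    }
}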

6. Compatibility, Deprecation, and Migration Plan

It must be compatible with the current Kubernetes operator.


7. Test Plan

UT, IT, and manual verification that the autoscaler is working as expected.

Ensure 100% functionality and test coverage of the Kubernetes implementation.

8. Rejected Alternatives

We rejected a couple of interface designs; the overall solution is fine.

yarn-autoscaler

For the first version of the yarn-autoscaler, we only support rescaling jobs via the rescale API and don't support re-deploying a new YARN application, for the following reasons:

  • If we want to support the entire YARN deployment, we need to implement YARN job management in the yarn-autoscaler.

  • Not only rescaling, but also start, stop, keepalive, etc.

  • We need to maintain the job jar, job information, etc.

These parts are too heavy, so we just support the rescale API in the first version.

Note: each company typically has a Flink platform to manage their Flink jobs running on YARN. Such a platform knows the job name, job address, job jar, etc., and has the ability to start, stop and keep jobs alive, so these platforms can easily implement the ScalingRealizer for their own cases after this FLIP.