...

The proposed interface closely resembles the existing JobAutoScaler, but its method parameters change. Instead of Kubernetes-specific classes, scale takes a Context that extends JobAutoScalerContext<KEY>, and cleanup takes only the job KEY. This makes the interface framework-agnostic and usable by deployment frameworks other than Kubernetes.

The KEY corresponds to the jobKey: when two contexts share the same KEY, they are regarded as representing the same Flink job.

Code Block
/**
 * The generic Autoscaler instance.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Internal
public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Called as part of the reconciliation loop. */
    void scale(Context context) throws Exception;

    /** Called when the job is deleted. */
    void cleanup(KEY key);
}
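To make the KEY semantics concrete, here is a minimal, self-contained sketch. `SimpleContext`, `GenericJobAutoScaler` and `CountingAutoScaler` are hypothetical stand-ins, not part of this FLIP or of Flink. The sketch shows only that per-job state is keyed by `getJobKey()`, so two contexts with equal keys address the same job, and that `cleanup(key)` drops that state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for JobAutoScalerContext<KEY>.
class SimpleContext<KEY> {
    private final KEY jobKey;

    SimpleContext(KEY jobKey) {
        this.jobKey = jobKey;
    }

    KEY getJobKey() {
        return jobKey;
    }
}

// Same shape as the proposed JobAutoScaler<KEY, Context> interface.
interface GenericJobAutoScaler<KEY, Context extends SimpleContext<KEY>> {
    void scale(Context context) throws Exception;

    void cleanup(KEY key);
}

// Hypothetical implementation that only counts scale() calls per job key.
class CountingAutoScaler<KEY> implements GenericJobAutoScaler<KEY, SimpleContext<KEY>> {
    private final Map<KEY, Integer> scaleCalls = new HashMap<>();

    @Override
    public void scale(SimpleContext<KEY> context) {
        // Contexts with equal keys update the same entry, i.e. the same Flink job.
        scaleCalls.merge(context.getJobKey(), 1, Integer::sum);
    }

    @Override
    public void cleanup(KEY key) {
        // Called when the job is deleted: forget everything tracked for it.
        scaleCalls.remove(key);
    }

    int callsFor(KEY key) {
        return scaleCalls.getOrDefault(key, 0);
    }
}
```

Scaling two separately constructed contexts that both carry the key "job-a" counts as two calls for one job. A later `cleanup("job-a")` resets that job without touching any other key.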

3.2 JobAutoScalerContext

...

Code Block
/**
 * The job autoscaler context. It includes all details related to the current job.
 *
 * @param <KEY> The job key.
 */
@Experimental
@AllArgsConstructor
@ToString
public class JobAutoScalerContext<KEY> {

    /** The identifier of each flink job. */
    @Getter private final KEY jobKey;

    /** The jobId and jobStatus can be null when the job isn't started. */
    @Nullable @Getter private final JobID jobID;

    @Nullable @Getter private final JobStatus jobStatus;

    @Getter private final Configuration configuration;

    @Getter private final MetricGroup metricGroup;

    @ToString.Exclude
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}
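The context stores a `SupplierWithException` rather than a client, so a `RestClusterClient` is created only when the autoscaler actually asks for one. The sketch below illustrates this lazy-supplier pattern under the following assumptions:

- `FakeClient` and `LazyClientContext` are hypothetical stand-ins.
- The locally declared `SupplierWithException` only mirrors the shape of Flink's own `org.apache.flink.util.function.SupplierWithException`.
- No real REST client is created.

```java
// Local stand-in with the same shape as Flink's SupplierWithException<R, E>.
@FunctionalInterface
interface SupplierWithException<R, E extends Throwable> {
    R get() throws E;
}

// Hypothetical placeholder for RestClusterClient<String>.
class FakeClient {
    final String clusterId;

    FakeClient(String clusterId) {
        this.clusterId = clusterId;
    }
}

// Simplified context: the client is built only when getRestClusterClient() is called.
class LazyClientContext<KEY> {
    private final KEY jobKey;
    private final SupplierWithException<FakeClient, Exception> restClientSupplier;

    LazyClientContext(KEY jobKey, SupplierWithException<FakeClient, Exception> restClientSupplier) {
        this.jobKey = jobKey;
        this.restClientSupplier = restClientSupplier;
    }

    KEY getJobKey() {
        return jobKey;
    }

    FakeClient getRestClusterClient() throws Exception {
        // Each call delegates to the supplier; no client exists before the first call.
        return restClientSupplier.get();
    }
}
```

A design consequence is that constructing a context is cheap and cannot fail because of connectivity. Any connection error surfaces from `getRestClusterClient()`, which is why that method declares `throws Exception`.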

3.3 ScalingRealizer

The ScalingRealizer interface is responsible for applying scaling actions (upscale or downscale). Specifically, the ScalingRealizer#realize method is invoked from the JobAutoScaler#scale method.

Code Block
/**
 * The Scaling Realizer is responsible for managing scaling actions.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface ScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>> {

    /** Update job's parallelism to parallelismOverrides. */
    void realize(Context context, Map<String, String> parallelismOverrides);
}
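The call path from JobAutoScaler#scale to ScalingRealizer#realize can be sketched as follows. All names here are hypothetical: `RealizerContext`, `Realizer`, `RecordingRealizer` and `ToyScaler`. The new parallelism is passed in directly purely for illustration; in the real autoscaler it comes from metric evaluation. The overrides map uses vertex IDs and parallelism values as strings, matching the `Map<String, String>` signature above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified context carrying only the job key (hypothetical stand-in).
class RealizerContext<KEY> {
    final KEY jobKey;

    RealizerContext(KEY jobKey) {
        this.jobKey = jobKey;
    }
}

// Same shape as the proposed ScalingRealizer<KEY, Context>.
interface Realizer<KEY, Context extends RealizerContext<KEY>> {
    void realize(Context context, Map<String, String> parallelismOverrides);
}

// Hypothetical realizer that only remembers the latest overrides per job.
class RecordingRealizer<KEY> implements Realizer<KEY, RealizerContext<KEY>> {
    final Map<KEY, Map<String, String>> applied = new HashMap<>();

    @Override
    public void realize(RealizerContext<KEY> context, Map<String, String> parallelismOverrides) {
        // Copy defensively so later changes by the caller don't leak in.
        applied.put(context.jobKey, new HashMap<>(parallelismOverrides));
    }
}

// Toy scale step: builds the overrides and hands them to the realizer.
class ToyScaler<KEY> {
    private final Realizer<KEY, RealizerContext<KEY>> realizer;

    ToyScaler(Realizer<KEY, RealizerContext<KEY>> realizer) {
        this.realizer = realizer;
    }

    void scale(RealizerContext<KEY> context, String vertexId, int newParallelism) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put(vertexId, String.valueOf(newParallelism));
        realizer.realize(context, overrides);
    }
}
```

The scaler only decides what the parallelism should be. How the decision is applied (a Kubernetes resource spec, a YARN or standalone redeploy) stays behind the realizer.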


3.4 AutoScalerEventHandler

The AutoScalerEventHandler interface is responsible for handling loggable events. The autoscaler invokes the AutoScalerEventHandler#handleEvent method whenever such an event occurs, for example a scaling error or a scaling result that should be reported.

Note that a single AutoScalerEventHandler instance is shared across all Flink jobs and holds no job-specific information. That is why the JobAutoScalerContext is passed to the handleEvent method: it gives the handler access to the job-related details it needs.


Code Block
/**
 * Handles all loggable events during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {

    /**
     * Handle the event.
     *
     * @param interval When the interval is greater than 0, events that repeat within the interval
     *     will be ignored.
     */
    void handleEvent(
            Context context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval);

    /** The type of the events. */
    enum Type {
        Normal,
        Warning
    }
}
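The `interval` contract of `handleEvent` can be implemented with a last-emitted timestamp per event identity. The sketch below rests on the following assumptions:

- An event counts as a repeat when its job key, type, reason and messageKey match. When messageKey is null, the message is used instead.
- A null or non-positive interval disables deduplication.
- The clock is injected so the behavior is testable.
- `DedupEventHandler`, `EventCtx` and `EventType` are hypothetical names.
- Output goes to stdout instead of a real event sink. The Kubernetes implementation in section 4.4 delegates to the operator's EventRecorder instead.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical minimal context: only the job key matters for deduplication.
class EventCtx<KEY> {
    final KEY jobKey;

    EventCtx(KEY jobKey) {
        this.jobKey = jobKey;
    }
}

enum EventType {
    Normal,
    Warning
}

// Hypothetical handler: drops an event if an identical one was emitted within `interval`.
class DedupEventHandler<KEY> {
    private final Supplier<Instant> clock;
    private final Map<String, Instant> lastEmitted = new HashMap<>();
    int emitted = 0;

    DedupEventHandler(Supplier<Instant> clock) {
        this.clock = clock;
    }

    void handleEvent(
            EventCtx<KEY> context,
            EventType type,
            String reason,
            String message,
            String messageKey,
            Duration interval) {
        Instant now = clock.get();
        // Identity of an event: job, type, reason and messageKey (or message if no key).
        String id =
                context.jobKey + "|" + type + "|" + reason + "|"
                        + (messageKey != null ? messageKey : message);
        boolean dedup = interval != null && !interval.isZero() && !interval.isNegative();
        if (dedup) {
            Instant last = lastEmitted.get(id);
            if (last != null && now.isBefore(last.plus(interval))) {
                return; // Repeat within the interval: ignore it.
            }
        }
        lastEmitted.put(id, now);
        emitted++;
        // Stand-in for a real event sink.
        System.out.println(type + " " + reason + ": " + message);
    }
}
```

Keying on messageKey rather than the full message lets callers group events whose text varies (for example, a changing metric value) but whose meaning is the same.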


3.5 AutoScalerStateStore

The AutoScalerStateStore serves the crucial role of persisting and providing access to state information during the scaling process.

In the existing code, the state of Kubernetes jobs is stored in a ConfigMap. Consequently, the KubernetesAutoScalerStateStore is responsible for retrieving the ConfigMap before the scaling operation and persisting it after the scaling operation.

For other types of jobs, such as those running on YARN or in standalone mode, the default behavior is to keep scaling information in memory through a new implementation, InMemoryAutoScalerStateStore. In the future, the state could also be persisted in an RDBMS or any other persistent storage through new implementations such as JdbcAutoScalerStateStore.

The method parameters of AutoScalerStateStore use concrete types instead of String, for example Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory. Each state store implementation is therefore responsible for serializing, deserializing and storing the state.


Code Block
/**
 * The state store is responsible for storing all state during scaling.
 *
 * @param <KEY> The job key.
 * @param <Context> Instance of JobAutoScalerContext.
 */
@Experimental
public interface AutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(
            Context jobContext, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory)
            throws Exception;

    Optional<Map<JobVertexID, SortedMap<Instant, ScalingSummary>>> getScalingHistory(
            Context jobContext) throws Exception;

    void removeScalingHistory(Context jobContext) throws Exception;

    void storeEvaluatedMetrics(
            Context jobContext, SortedMap<Instant, CollectedMetrics> evaluatedMetrics)
            throws Exception;

    Optional<SortedMap<Instant, CollectedMetrics>> getEvaluatedMetrics(Context jobContext)
            throws Exception;

    void removeEvaluatedMetrics(Context jobContext) throws Exception;

    void storeParallelismOverrides(Context jobContext, Map<String, String> parallelismOverrides)
            throws Exception;

    Optional<Map<String, String>> getParallelismOverrides(Context jobContext) throws Exception;

    void removeParallelismOverrides(Context jobContext) throws Exception;

    /**
     * Flushing is needed because all store methods only save data in a cache. To reduce write
     * operations, the cached data is flushed to the physical storage only after all operations have
     * been performed.
     */
    void flush(Context jobContext) throws Exception;

    /** Clean up all information related to the current job. */
    void removeInfoFromCache(KEY jobKey);
}
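The flush contract is that writes go to a per-job cache and reach physical storage only on `flush`. It can be sketched as follows under these assumptions:

- `CachingOverridesStore` is hypothetical.
- Its `physicalStorage` map stands in for a ConfigMap or a database.
- The sketch covers only parallelism overrides.
- It takes the job key directly instead of a context and declares no checked exceptions.

The real interface above also covers scaling history and evaluated metrics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical store covering only the parallelism-overrides part of AutoScalerStateStore.
class CachingOverridesStore<KEY> {
    // Stand-in for the physical storage (e.g. a ConfigMap or a database table).
    final Map<KEY, Map<String, String>> physicalStorage = new ConcurrentHashMap<>();
    // Per-job cache that all store/get/remove calls operate on.
    private final Map<KEY, Map<String, String>> cache = new ConcurrentHashMap<>();

    private Map<String, String> cached(KEY jobKey) {
        // Lazily load from physical storage the first time a job is touched.
        return cache.computeIfAbsent(
                jobKey, k -> new HashMap<>(physicalStorage.getOrDefault(k, Map.of())));
    }

    void storeParallelismOverrides(KEY jobKey, Map<String, String> overrides) {
        Map<String, String> state = cached(jobKey);
        state.clear();
        state.putAll(overrides);
    }

    Optional<Map<String, String>> getParallelismOverrides(KEY jobKey) {
        Map<String, String> state = cached(jobKey);
        // An empty state is reported as absent.
        return state.isEmpty() ? Optional.empty() : Optional.of(new HashMap<>(state));
    }

    void removeParallelismOverrides(KEY jobKey) {
        cached(jobKey).clear();
    }

    void flush(KEY jobKey) {
        Map<String, String> state = cache.get(jobKey);
        if (state != null) {
            // One write per flush, however many store/remove calls preceded it.
            physicalStorage.put(jobKey, new HashMap<>(state));
        }
    }

    void removeInfoFromCache(KEY jobKey) {
        // The next access reloads the job's state from physical storage.
        cache.remove(jobKey);
    }
}
```

`removeInfoFromCache` is what an implementation can call when a flush fails. Dropping the cache forces the next access to re-read the last successfully persisted state instead of trusting unflushed data.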


4. Proposed Changes

4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.

It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes, which should use JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.

Use the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService:

  • The FlinkService is tied to Kubernetes, so the generic module shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


4.2 KubernetesJobAutoScalerContext

Note: some code can be extracted into an AbstractJobAutoScalerContext, such as the jobKey, jobId, configuration, metric group and restClientSupplier.

This logic should be generic for Kubernetes, YARN and standalone.

Code Block
/** An implementation of JobAutoScalerContext for Kubernetes. */
public class KubernetesJobAutoScalerContext extends JobAutoScalerContext<ResourceID> {

    private final AbstractFlinkResource<?, ?> resource;

    private final KubernetesClient kubernetesClient;

    public KubernetesJobAutoScalerContext(
            @Nullable JobID jobID,
            @Nullable JobStatus jobStatus,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
            AbstractFlinkResource<?, ?> resource,
            KubernetesClient kubernetesClient) {
        super(
                ResourceID.fromResource(resource),
                jobID,
                jobStatus,
                configuration,
                metricGroup,
                restClientSupplier);
        this.resource = resource;
        this.kubernetesClient = kubernetesClient;
    }

    public AbstractFlinkResource<?, ?> getResource() {
        return resource;
    }

    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}


4.3 KubernetesScalingRealizer

Code Block
/** The Kubernetes implementation for applying parallelism overrides. */
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context, Map<String, String> parallelismOverrides) {
        // Store the overrides in the Flink configuration of the resource spec.
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
    }
}


4.4 KubernetesAutoScalerEventHandler

Code Block
/** An event handler which posts events to the Kubernetes events API. */
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(
            KubernetesJobAutoScalerContext context,
            Type type,
            String reason,
            String message,
            @Nullable String messageKey,
            @Nullable Duration interval) {
        if (interval == null) {
            // No interval: record every event.
            eventRecorder.triggerEvent(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    message,
                    EventRecorder.Component.Operator,
                    messageKey,
                    context.getKubernetesClient());
        } else {
            // With an interval: let the recorder suppress repeats within the interval.
            eventRecorder.triggerEventByInterval(
                    context.getResource(),
                    EventRecorder.Type.valueOf(type.name()),
                    reason,
                    EventRecorder.Component.Operator,
                    message,
                    messageKey,
                    context.getKubernetesClient(),
                    interval);
        }
    }
}


4.5 KubernetesAutoScalerStateStore

For Kubernetes jobs, the current code persists the state in a ConfigMap. The KubernetesAutoScalerStateStore therefore needs to fetch the ConfigMap before scaling and persist it after scaling.

ConfigMapStore is responsible for putting, getting and removing the serialized state.

KubernetesAutoScalerStateStore is responsible for serializing the state from its raw type to String and deserializing it from String back to the raw type. It accesses the serialized state through ConfigMapStore.
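The split between a string-level store and a typed, serializing store can be sketched like this. Assumptions:

- `StringStateStore` (standing in for ConfigMapStore) and `TypedStateStore` (standing in for KubernetesAutoScalerStateStore) are hypothetical names.
- The sketch handles only parallelism overrides.
- The "k1:v1,k2:v2" encoding is purely illustrative and assumes keys and values contain no ':' or ','. It is not the format the real implementation uses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical stand-in for ConfigMapStore: it only puts/gets/removes serialized strings.
class StringStateStore<KEY> {
    private final Map<KEY, Map<String, String>> data = new HashMap<>();

    void put(KEY jobKey, String stateKey, String value) {
        data.computeIfAbsent(jobKey, k -> new HashMap<>()).put(stateKey, value);
    }

    Optional<String> get(KEY jobKey, String stateKey) {
        return Optional.ofNullable(data.getOrDefault(jobKey, Map.of()).get(stateKey));
    }

    void remove(KEY jobKey, String stateKey) {
        Map<String, String> state = data.get(jobKey);
        if (state != null) {
            state.remove(stateKey);
        }
    }
}

// Hypothetical typed layer: converts raw types to String and back, delegating storage.
class TypedStateStore<KEY> {
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";
    private final StringStateStore<KEY> stringStore;

    TypedStateStore(StringStateStore<KEY> stringStore) {
        this.stringStore = stringStore;
    }

    void storeParallelismOverrides(KEY jobKey, Map<String, String> overrides) {
        stringStore.put(jobKey, PARALLELISM_OVERRIDES_KEY, serialize(overrides));
    }

    Optional<Map<String, String>> getParallelismOverrides(KEY jobKey) {
        return stringStore.get(jobKey, PARALLELISM_OVERRIDES_KEY).map(TypedStateStore::deserialize);
    }

    void removeParallelismOverrides(KEY jobKey) {
        stringStore.remove(jobKey, PARALLELISM_OVERRIDES_KEY);
    }

    // Illustrative "k1:v1,k2:v2" encoding, sorted by key for a stable result.
    static String serialize(Map<String, String> overrides) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : new TreeMap<>(overrides).entrySet()) {
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        return sb.toString();
    }

    static Map<String, String> deserialize(String raw) {
        Map<String, String> result = new HashMap<>();
        if (raw.isEmpty()) {
            return result;
        }
        for (String entry : raw.split(",")) {
            String[] kv = entry.split(":", 2);
            result.put(kv[0], kv[1]);
        }
        return result;
    }
}
```

With this layering, a JDBC-backed or in-memory backend only needs to replace the string-level store. The serialization code stays shared.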

5. Standalone AutoScaler

We will implement the StandaloneAutoScaler in this FLIP; it is the generic autoscaler.

...