Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
/** The kubernetes auto scaler state store, it's based on the config map. */ 
public class KubernetesAutoScalerStateStore
       implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {

   private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

   private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
   private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
   private static final String SCALING_HISTORY_KEY = "scalingHistory";
   private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

   private final KubernetesClient kubernetesClient;

   private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>();

   public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) {
       this.kubernetesClient = kubernetesClient;
   }

   // COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY is similar to SCALING_HISTORY_KEY
   @Override
   public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) {
     getState(jobContext).put(SCALING_HISTORY_KEY, decision);
   }

   @Override
   public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
    getState(jobContext).get(SCALING_HISTORY_KEY);
   }

   @Override
   public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
      getState(jobContext).remove(SCALING_HISTORY_KEY);
   }

   private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) {
       return cache.computeIfAbsent(jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext)).getData();
   }

   @Override
   public void flush(KubernetesJobAutoScalerContext jobContext) {
       ConfigMap configMap = cache.get(jobContext.getJobKey());
       Preconditions.checkState(configMap != null, "The configMap shouldn't be null.");
       try {
           cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update());
       } catch (Exception e) {
           LOG.error(
                   "Error while updating autoscaler info configmap, invalidating to clear the cache",
                   e);
           removeInfoFromCache(jobContext);
           throw e;
       }
   }

   @Override
   public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) {
       cache.remove(jobContext.getJobKey());
   }

   private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) {
       AbstractFlinkResource<?, ?> cr = jobContext.getResource();
       var meta = createCmObjectMeta(ResourceID.fromResource(cr));
       return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta));
   }

   private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
       LOG.info("Creating scaling info config map");
       var cm = new ConfigMap();
       cm.setMetadata(meta);
       cm.addOwnerReference(cr);
       cm.setData(new HashMap<>());
       return kubernetesClient.resource(cm).create();
   }

   private ObjectMeta createCmObjectMeta(ResourceID uid) {
       var objectMeta = new ObjectMeta();
       objectMeta.setName("autoscaler-" + uid.getName());
       uid.getNamespace().ifPresent(objectMeta::setNamespace);
       objectMeta.setLabels(
               Map.of(
                       Constants.LABEL_COMPONENT_KEY,
                       LABEL_COMPONENT_AUTOSCALER,
                       Constants.LABEL_APP_KEY,
                       uid.getName()));
       return objectMeta;
   }

   private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) {
       return Optional.ofNullable(
               kubernetesClient
                       .configMaps()
                       .inNamespace(objectMeta.getNamespace())
                       .withName(objectMeta.getName())
                       .get());
   }
}


5. Compatibility, Deprecation, and Migration Plan

The change must remain compatible with the current Kubernetes operator.


6. Test Plan

Unit tests (UT), integration tests (IT), and manual verification that the autoscaler works as expected.

7. Rejected Alternatives

A couple of interface designs were rejected; the overall solution itself is fine.