THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
/** The kubernetes auto scaler state store, it's based on the config map. */ public class KubernetesAutoScalerStateStore implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> { private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class); private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler"; private static final String COLLECTED_METRICS_KEY = "collectedMetrics"; private static final String SCALING_HISTORY_KEY = "scalingHistory"; private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides"; private final KubernetesClient kubernetesClient; private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>(); public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) { this.kubernetesClient = kubernetesClient; } // COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY is similar to SCALING_HISTORY_KEY @Override public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) { getState(jobContext).put(SCALING_HISTORY_KEY, decision); } @Override public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) { getState(jobContext).get(SCALING_HISTORY_KEY); } @Override public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) { getState(jobContext).remove(SCALING_HISTORY_KEY); } private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) { return cache.computeIfAbsent(jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext)).getData(); } @Override public void flush(KubernetesJobAutoScalerContext jobContext) { ConfigMap configMap = cache.get(jobContext.getJobKey()); Preconditions.checkState(configMap != null, "The configMap shouldn't be null."); try { cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update()); } catch (Exception e) { LOG.error( "Error while updating autoscaler info configmap, invalidating to clear the cache", e); 
removeInfoFromCache(jobContext); throw e; } } @Override public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) { cache.remove(jobContext.getJobKey()); } private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) { AbstractFlinkResource<?, ?> cr = jobContext.getResource(); var meta = createCmObjectMeta(ResourceID.fromResource(cr)); return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta)); } private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) { LOG.info("Creating scaling info config map"); var cm = new ConfigMap(); cm.setMetadata(meta); cm.addOwnerReference(cr); cm.setData(new HashMap<>()); return kubernetesClient.resource(cm).create(); } private ObjectMeta createCmObjectMeta(ResourceID uid) { var objectMeta = new ObjectMeta(); objectMeta.setName("autoscaler-" + uid.getName()); uid.getNamespace().ifPresent(objectMeta::setNamespace); objectMeta.setLabels( Map.of( Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_AUTOSCALER, Constants.LABEL_APP_KEY, uid.getName())); return objectMeta; } private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) { return Optional.ofNullable( kubernetesClient .configMaps() .inNamespace(objectMeta.getNamespace()) .withName(objectMeta.getName()) .get()); } } |
5. Compatibility, Deprecation, and Migration Plan
It must be compatible with the current Kubernetes operator.
6. Test Plan
Unit tests, integration tests, and manual verification that the autoscaler works as expected.
7. Rejected Alternatives
We rejected a couple of alternative interface designs; the overall solution is fine.