...
[This FLIP proposal is joint work between Rui Fan and Samrat Deb]
Motivation
FLIP-271: Autoscaling is a great proposal. Ideally, users only need to enable the autoscaler for their Flink jobs, without configuring parallelism by hand, which greatly improves Flink's ease of use. However, flink-autoscaler currently lives in the flink-kubernetes-operator repo and is coupled with Kubernetes.
...
```java
/** The kubernetes auto scaler state store. */
public class KubernetesAutoScalerStateStore implements AutoScalerStateStore {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";

    private final KubernetesClient kubernetesClient;

    private final ConfigMap configMap;

    public KubernetesAutoScalerStateStore(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = getConfigMap(cr, kubernetesClient);
    }

    public static ConfigMap getConfigMap(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {

        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
        objectMeta.setNamespace(cr.getMetadata().getNamespace());

        return getScalingInfoConfigMapFromKube(objectMeta, kubeClient)
                .orElseGet(
                        () -> {
                            LOG.info("Creating scaling info config map");

                            objectMeta.setLabels(
                                    Map.of(
                                            Constants.LABEL_COMPONENT_KEY,
                                            LABEL_COMPONENT_AUTOSCALER,
                                            Constants.LABEL_APP_KEY,
                                            cr.getMetadata().getName()));
                            var cm = new ConfigMap();
                            cm.setMetadata(objectMeta);
                            cm.addOwnerReference(cr);
                            cm.setData(new HashMap<>());
                            return kubeClient.resource(cm).create();
                        });
    }

    private static Optional<ConfigMap> getScalingInfoConfigMapFromKube(
            ObjectMeta objectMeta, KubernetesClient kubeClient) {
        return Optional.ofNullable(
                kubeClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    @Override
    public void put(String key, String value) {
        configMap.getData().put(key, value);
    }

    @Override
    public void remove(String key) {
        configMap.getData().remove(key);
    }

    @Override
    public void flush() {
        kubernetesClient.resource(configMap).update();
    }
}
```
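To illustrate how the same contract could be satisfied without any Kubernetes dependency, here is a minimal in-memory sketch. It is not part of the POC. The `AutoScalerStateStore` interface below is only inferred from the `@Override` methods of the Kubernetes implementation above, so its exact shape is an assumption. `InMemoryAutoScalerStateStore`, `flushedSnapshot()`, and the key `example-key` are hypothetical names used purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Assumed interface shape, inferred from the overridden methods in the Kubernetes store. */
interface AutoScalerStateStore {
    Optional<String> get(String key);

    void put(String key, String value);

    void remove(String key);

    void flush();
}

/** Hypothetical in-memory state store; a sketch, not the FLIP's implementation. */
class InMemoryAutoScalerStateStore implements AutoScalerStateStore {

    // Working copy mutated by put/remove (plays the role of the local ConfigMap object).
    private final Map<String, String> working = new HashMap<>();

    // Last flushed state (plays the role of the ConfigMap stored in the cluster).
    private Map<String, String> flushed = new HashMap<>();

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(working.get(key));
    }

    @Override
    public void put(String key, String value) {
        working.put(key, value);
    }

    @Override
    public void remove(String key) {
        working.remove(key);
    }

    @Override
    public void flush() {
        // Publish a copy so later put/remove calls stay invisible until the next flush.
        flushed = new HashMap<>(working);
    }

    /** Hypothetical helper that exposes the flushed state for inspection. */
    Map<String, String> flushedSnapshot() {
        return Map.copyOf(flushed);
    }
}

/** Small usage demo for the sketch above. */
class InMemoryStateStoreDemo {
    public static void main(String[] args) {
        InMemoryAutoScalerStateStore store = new InMemoryAutoScalerStateStore();
        store.put("example-key", "example-value");
        System.out.println("before flush: " + store.flushedSnapshot());
        store.flush();
        System.out.println("after flush: " + store.flushedSnapshot());
    }
}
```

As in the Kubernetes store, `put` and `remove` only touch local state, and nothing is published until `flush()` is called. A store like this might be handy for unit tests that should not depend on k8s.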
POC
I have finished the POC for FLIP-334; here is the POC branch. The branch has three commits:
- The first commit: Removes some autoscaler test classes because they depend on k8s. Unit tests are not supported in this POC; I will add them in the final PR.
- The second commit: Decouples the autoscaler from Kubernetes. This is the core change and includes all changes for this FLIP.
- The third commit: Renames the module from flink-kubernetes-operator-autoscaler to flink-autoscaler.
You only need to look at the second commit.
The flink-autoscaler module doesn't depend on any Kubernetes dependencies in this POC. I have tested it, and it works well with Kubernetes. You can run the flink-kubernetes-operator locally by following the flink-kubernetes-operator-docs and then run the autoscaling example.
BTW, YARN is not supported in this POC; Samrat Deb would like to add support for it once flink-autoscaler is decoupled from Kubernetes.
Compatibility, Deprecation, and Migration Plan
...