...

[This FLIP proposal is joint work between Rui Fan and Samrat Deb]

Motivation

FLIP-271: Autoscaling is a great proposal. Ideally, users only need to enable the autoscaler for their Flink jobs and never configure parallelism by hand, which greatly improves Flink's ease of use. However, flink-autoscaler currently lives in the flink-kubernetes-operator repo and is coupled with Kubernetes.

...

Code Block
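import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.utils.Constants;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** The Kubernetes autoscaler state store, backed by a ConfigMap. */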
public class KubernetesAutoScalerStateStore implements AutoScalerStateStore {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";

    private final KubernetesClient kubernetesClient;

    private final ConfigMap configMap;

    public KubernetesAutoScalerStateStore(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = getConfigMap(cr, kubernetesClient);
    }

    public static ConfigMap getConfigMap(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {

        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
        objectMeta.setNamespace(cr.getMetadata().getNamespace());

        return getScalingInfoConfigMapFromKube(objectMeta, kubeClient)
                .orElseGet(
                        () -> {
                            LOG.info("Creating scaling info config map");

                            objectMeta.setLabels(
                                    Map.of(
                                            Constants.LABEL_COMPONENT_KEY,
                                            LABEL_COMPONENT_AUTOSCALER,
                                            Constants.LABEL_APP_KEY,
                                            cr.getMetadata().getName()));
                            var cm = new ConfigMap();
                            cm.setMetadata(objectMeta);
                            cm.addOwnerReference(cr);
                            cm.setData(new HashMap<>());
                            return kubeClient.resource(cm).create();
                        });
    }

    private static Optional<ConfigMap> getScalingInfoConfigMapFromKube(
            ObjectMeta objectMeta, KubernetesClient kubeClient) {
        return Optional.ofNullable(
                kubeClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }

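    /** Reads a value from the locally held ConfigMap data. */
    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    /** Updates the local ConfigMap data only; call flush() to persist. */
    @Override
    public void put(String key, String value) {
        configMap.getData().put(key, value);
    }

    /** Removes the key from the local ConfigMap data only; call flush() to persist. */
    @Override
    public void remove(String key) {
        configMap.getData().remove(key);
    }

    /** Writes the locally modified ConfigMap back to Kubernetes. */
    @Override
    public void flush() {
        kubernetesClient.resource(configMap).update();
    }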
}
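To illustrate why this abstraction decouples the autoscaler from Kubernetes, here is a minimal sketch of a non-Kubernetes state store. It assumes the AutoScalerStateStore interface has exactly the four methods overridden above (get, put, remove, flush); the interface is re-declared locally so the example is self-contained, and InMemoryAutoScalerStateStore and persistedSnapshot() are hypothetical names that are not part of this FLIP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Assumed shape of the interface, inferred from the Kubernetes implementation above. */
interface AutoScalerStateStore {
    Optional<String> get(String key);

    void put(String key, String value);

    void remove(String key);

    void flush();
}

/**
 * Hypothetical in-memory store. Like the ConfigMap-backed store, reads and writes
 * go to a working copy, and flush() is the only point where state is "persisted".
 */
final class InMemoryAutoScalerStateStore implements AutoScalerStateStore {

    // Working copy, analogous to the locally held ConfigMap data.
    private final Map<String, String> working = new HashMap<>();

    // Stand-in for the external system (the ConfigMap in Kubernetes).
    private Map<String, String> persisted = new HashMap<>();

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(working.get(key));
    }

    @Override
    public void put(String key, String value) {
        working.put(key, value);
    }

    @Override
    public void remove(String key) {
        working.remove(key);
    }

    @Override
    public void flush() {
        // Replace the persisted state with a copy of the working state.
        persisted = new HashMap<>(working);
    }

    /** Read-only view of what has been flushed so far (for illustration only). */
    Map<String, String> persistedSnapshot() {
        return Collections.unmodifiableMap(persisted);
    }
}
```

An autoscaler written against the interface alone could run with either implementation; only the code that constructs the store needs to know whether Kubernetes is present.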



POC

I have finished a POC for FLIP-334; see the POC branch. The branch has three commits:

  • The first commit: removes some autoscaler test classes because they depend on Kubernetes. Unit tests are not covered in this POC (the final PR will include them).
  • The second commit: decouples the autoscaler from Kubernetes (this is the core change and contains all the changes for this FLIP).
  • The third commit: renames the module from flink-kubernetes-operator-autoscaler to flink-autoscaler.

You only need to look at the second commit.


In this POC, the flink-autoscaler module has no Kubernetes dependencies. I have tested it, and it works well with Kubernetes. You can run the flink-kubernetes-operator locally by following the flink-kubernetes-operator-docs and then run the autoscaling example.

BTW, this POC does not support YARN. Samrat Deb would like to add that support once flink-autoscaler has been decoupled from Kubernetes.

Compatibility, Deprecation, and Migration Plan

...