


Discussion thread:
Vote thread:
JIRA:
Release:

[This FLIP proposal is a joint work between Rui Fan and Samrat Deb.]

Motivation

FLIP-271: Autoscaling is a great proposal. Ideally, users only need to enable the autoscaler for their Flink jobs without configuring any parallelism, which greatly improves Flink's ease of use. However, flink-autoscaler currently lives in the flink-kubernetes-operator repo and is coupled with Kubernetes.

For several reasons, Flink jobs running on YARN also need flink-autoscaler:

  • After Externalized Declarative Resource Management (FLIP-291, FLINK-31316) is merged, all types of Flink jobs can support in-place scaling, which means Flink on YARN is now easy to scale.
  • Some Flink users have discussed on the mailing list that flink-autoscaler should support Flink on YARN.

This FLIP focuses on decoupling the autoscaler from Kubernetes.

Core idea

  • flink-autoscaler is a common autoscaler module
    • [Must-have] It includes the general autoscaler strategy
      • JobAutoScaler and the general JobAutoScalerImpl
    • [Must-have] Define the interfaces, such as AutoScalerHandler and AutoScalerStateStore
      • AutoScalerHandler: defines the event handler interface.
      • AutoScalerStateStore: defines the state store interface for accessing and persisting state.
    • [Must-have] It should not depend on any Kubernetes-related dependencies, including fabric8, flink-kubernetes-operator and flink-kubernetes.
    • [Must-have] It can depend on the apache/flink project, because it needs classes such as JobVertexID, Configuration and MetricGroup.
    • [Nice-to-have] Move flink-autoscaler to the flink repo
      • It's easy to implement flink-yarn-autoscaler after decoupling, and adding flink-yarn-autoscaler to the flink-kubernetes-operator repo would be weird.
      • I prefer keeping this module in the flink-kubernetes-operator repo during this FLIP; we can move it in the last step of this FLIP.
  • flink-kubernetes-operator-autoscaler is an autoscaler module for Flink on Kubernetes
    • [Must-have] Implement the AutoScalerHandler and AutoScalerStateStore
      • AutoScalerHandler calls Kubernetes to do the real scaling or to record some events.
      • AutoScalerStateStore calls Kubernetes to query and update the ConfigMap.

Note: the independent `flink-kubernetes-operator-autoscaler` module isn't strictly necessary; moving these classes into flink-kubernetes-operator would reduce complexity. We can discuss this on the mailing list.

Why isn't it necessary?

  • In the POC version, `flink-kubernetes-operator-autoscaler` only defines two classes: KubernetesAutoScalerHandler and KubernetesAutoScalerStateStore.
  • If `flink-kubernetes-operator-autoscaler` stays an independent module, it must depend on the `flink-kubernetes-operator` module.
  • `flink-kubernetes-operator` cannot depend on `flink-kubernetes-operator-autoscaler`, so loading these classes is harder than simply removing the `flink-kubernetes-operator-autoscaler` module.

Public Interfaces

JobAutoScaler

Its interface is similar to the current JobAutoScaler, but the method parameters change: instead of Kubernetes-related classes, they take a JobAutoScalerContext<KEY, INFO> and a KEY jobKey.

It may be more reasonable to rename the generic KEY to JOB_KEY; the autoscaler treats invocations with the same jobKey as referring to the same Flink job.

The generic INFO will be introduced later.

/** The general Autoscaler instance. */
public interface JobAutoScaler<KEY, INFO> {

    /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
    boolean scale(JobAutoScalerContext<KEY, INFO> context);

    /** Called when the custom resource is deleted. */
    void cleanup(KEY jobKey);
}
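
For illustration, here is a hypothetical caller-side sketch of how an integration (the Kubernetes operator today, a YARN service in the future) could drive this interface. The ExampleReconciler class and its method names are made up for this example and are not part of the proposal.

/**
 * Hypothetical caller-side sketch: the integration builds a JobAutoScalerContext for each
 * reconciliation and delegates to the generic JobAutoScaler, calling cleanup() when the job is deleted.
 */
public class ExampleReconciler<KEY, INFO> {

    private final JobAutoScaler<KEY, INFO> autoScaler;

    public ExampleReconciler(JobAutoScaler<KEY, INFO> autoScaler) {
        this.autoScaler = autoScaler;
    }

    public void reconcile(JobAutoScalerContext<KEY, INFO> context) {
        // scale() returns true if this call led to a scaling action; at that point the
        // AutoScalerHandler has already been invoked with the recommended parallelism.
        boolean scaled = autoScaler.scale(context);
    }

    public void onJobDeleted(KEY jobKey) {
        autoScaler.cleanup(jobKey);
    }
}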


JobAutoScalerContext

JobAutoScalerContext includes the Flink job information needed during scaling, such as the jobKey, jobID, stateStore and the `INFO extraJobInfo`.

For the current code (Kubernetes jobs), the jobKey is `io.javaoperatorsdk.operator.processing.event.ResourceID`. We can define the jobKey of YARN Flink jobs in the future.
For the `INFO extraJobInfo`: the flink-autoscaler itself doesn't use it; it is only used by some implementations of AutoScalerHandler. The whole context is passed to those implementations when the autoscaler calls them back.


/**
 * The job autoscaler context.
 *
 * @param <KEY> the unique identifier of each Flink job
 * @param <INFO> the extra job information that is only used by AutoScalerHandler implementations
 */
@AllArgsConstructor
public class JobAutoScalerContext<KEY, INFO> {

    // The identifier of each flink job.
    @Getter private final KEY jobKey;

    @Getter private final JobID jobID;

    @Getter private final long jobVersion;

    // Whether the job is actually running; jobs in STARTING or CANCELING state are not considered running.
    @Getter private final boolean isRunning;

    @Getter private final Configuration conf;

    @Getter private final MetricGroup metricGroup;

    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    @Getter private final Duration flinkClientTimeout;

    @Getter private final AutoScalerStateStore stateStore;

    /**
     * The flink-autoscaler itself doesn't use the extraJobInfo; it is only used by some
     * implementations of AutoScalerHandler. The whole context is passed to those
     * implementations when the autoscaler calls them back.
     */
    @Getter @Nullable private final INFO extraJobInfo;

    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}
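
As an illustration of how a non-Kubernetes integration might fill this context, here is a hypothetical sketch for a Flink-on-YARN job. Using the YARN application id as the jobKey, the HeapedAutoScalerStateStore, the timeout value and the way the RestClusterClient is created are assumptions for this example only, not part of the proposal.

// Hypothetical sketch: building a JobAutoScalerContext for a Flink-on-YARN job.
// Assumes the REST address/port of the running cluster are already set in `conf`.
JobAutoScalerContext<String, Void> buildYarnContext(
        String yarnApplicationId,
        JobID jobID,
        Configuration conf,
        MetricGroup metricGroup) {
    return new JobAutoScalerContext<>(
            yarnApplicationId,                                       // jobKey: YARN application id (assumption)
            jobID,
            1L,                                                      // jobVersion
            true,                                                    // isRunning
            conf,
            metricGroup,
            () -> new RestClusterClient<>(conf, yarnApplicationId),  // restClientSupplier (assumption)
            Duration.ofSeconds(10),                                  // flinkClientTimeout
            new HeapedAutoScalerStateStore(),                        // in-memory state store
            null);                                                   // no extraJobInfo needed
}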


AutoScalerHandler

AutoScalerHandler is called by the autoscaler when certain cases need to be handled, such as a scaling error, reporting the scaling result, or updating the Flink job based on the recommended parallelism.

For the current code (Kubernetes jobs), most handlers record an event, and all handlers need the `AbstractFlinkResource<?, ?>`, which is stored in the `INFO extraJobInfo` of the JobAutoScalerContext.
The `AutoScalerHandler` object is shared by all Flink jobs, so it doesn't hold per-job information. However, it needs the `AbstractFlinkResource<?, ?>` of every job; that's why the `INFO extraJobInfo` was added to the JobAutoScalerContext.


/**
 * Handles all events during scaling.
 *
 * @param <KEY> the unique identifier of each Flink job
 * @param <INFO> the extra job information that handler implementations may need
 */
public interface AutoScalerHandler<KEY, INFO> {

    void handlerScalingError(JobAutoScalerContext<KEY, INFO> context, String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerIneffectiveScaling(JobAutoScalerContext<KEY, INFO> context, String message);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context,
            HashMap<String, String> recommendedParallelism);
}


AutoScalerStateStore

AutoScalerStateStore is responsible for persisting and accessing state during scaling.

For the current code (Kubernetes jobs), the state is persisted to a ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist the ConfigMap after scaling.
For other jobs (YARN or standalone), I implemented a `HeapedAutoScalerStateStore`, which means the state is lost after an autoscaler restart. Of course, we can implement a MySQLAutoScalerStateStore to persist the state in the future.


public interface AutoScalerStateStore {

    Optional<String> get(String key);

    // Put the state into the state store; call flush() afterwards to prevent the state from being lost.
    void put(String key, String value);

    void remove(String key);

    void flush();
}



Proposed Changes

Ensure the autoscaler module keeps only the general auto scaling strategy

It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo and ScalingExecutor, etc.

We should remove the Kubernetes-related dependencies from these classes and use JobAutoScalerContext, AutoScalerHandler and AutoScalerStateStore instead.


Using the RestClusterClient<String> instead of org.apache.flink.kubernetes.operator.service.FlinkService

The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is the general Flink client; it supports all deployment types, including Kubernetes, YARN and standalone.

The RestClusterClient<String> is included in the JobAutoScalerContext.
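
For example, the metric collector could query the running job through the generic REST client from the context instead of going through the Kubernetes-specific FlinkService. A minimal sketch; the surrounding helper method is illustrative, not proposed API.

// Illustrative sketch: fetching job details via the generic Flink REST API.
JobDetailsInfo fetchJobDetails(JobAutoScalerContext<?, ?> context) throws Exception {
    try (RestClusterClient<String> restClient = context.getRestClusterClient()) {
        return restClient
                .getJobDetails(context.getJobID())
                .get(context.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
    }
}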

Implement the default and kubernetes classes for AutoScalerHandler

The default AutoScalerHandler could be the LoggedAutoScalerHandler. It just logs the event whenever any of its methods is called.
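
A minimal sketch of what such a default handler could look like, assuming it only writes log messages (the class body is illustrative):

/** A possible default handler that only logs the events it receives. */
public class LoggedAutoScalerHandler<KEY, INFO> implements AutoScalerHandler<KEY, INFO> {

    private static final Logger LOG = LoggerFactory.getLogger(LoggedAutoScalerHandler.class);

    @Override
    public void handlerScalingError(JobAutoScalerContext<KEY, INFO> context, String errorMessage) {
        LOG.error("Scaling error for job {}: {}", context.getJobKey(), errorMessage);
    }

    @Override
    public void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage) {
        LOG.info("Scaling report for job {}: {}", context.getJobKey(), scalingReportMessage);
    }

    @Override
    public void handlerIneffectiveScaling(JobAutoScalerContext<KEY, INFO> context, String message) {
        LOG.warn("Ineffective scaling for job {}: {}", context.getJobKey(), message);
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, HashMap<String, String> recommendedParallelism) {
        LOG.info("Recommended parallelism for job {}: {}", context.getJobKey(), recommendedParallelism);
    }
}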


As described above, for the Kubernetes implementation most handlers record an event, and all handlers need the `AbstractFlinkResource<?, ?>`, which is stored in the `INFO extraJobInfo` of the JobAutoScalerContext. The shared `AutoScalerHandler` instance retrieves it from the context for each job.


/**
 * The Kubernetes autoscaler event handler.
 *
 * @param <CR> the custom resource type of the Flink job
 */
public class KubernetesAutoScalerHandler<CR extends AbstractFlinkResource<?, ?>>
        implements AutoScalerHandler<ResourceID, CR> {

    private final KubernetesClient kubernetesClient;

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerHandler(
            KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handlerScalingError(
            JobAutoScalerContext<ResourceID, CR> context, String errorMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                EventRecorder.Type.Warning,
                EventRecorder.Reason.AutoscalerError,
                EventRecorder.Component.Operator,
                errorMessage);
    }

    @Override
    public void handlerScalingReport(
            JobAutoScalerContext<ResourceID, CR> context, String scalingReportMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                EventRecorder.Type.Normal,
                EventRecorder.Reason.ScalingReport,
                EventRecorder.Component.Operator,
                scalingReportMessage,
                "ScalingExecutor");
    }

    @Override
    public void handlerIneffectiveScaling(
            JobAutoScalerContext<ResourceID, CR> context, String message) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                EventRecorder.Type.Normal,
                EventRecorder.Reason.IneffectiveScaling,
                EventRecorder.Component.Operator,
                message);
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<ResourceID, CR> context,
            HashMap<String, String> recommendedParallelism) {
        AbstractFlinkResource<?, ?> resource = context.getExtraJobInfo();
        var flinkConf = Configuration.fromMap(resource.getSpec().getFlinkConfiguration());
        flinkConf.set(PARALLELISM_OVERRIDES, recommendedParallelism);
        resource.getSpec().setFlinkConfiguration(flinkConf.toMap());

        KubernetesClientUtils.applyToStoredCr(
                kubernetesClient,
                resource,
                stored ->
                        stored.getSpec()
                                .setFlinkConfiguration(resource.getSpec().getFlinkConfiguration()));
    }
}


Alternative implementation: Create an AutoScalerHandler for the current job each time JobAutoScaler#scale is called, i.e. change the AutoScalerHandler to a finer granularity. If we do so, we can (see the sketch after this list):

  • Add the AutoScalerHandler to the JobAutoScalerContext
  • Add the JobAutoScalerContext to the AutoScalerHandler
  • Drop the JobAutoScalerContext parameter from all handler methods, because the handler already holds the context
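
A rough sketch of the finer-grained variant; the interface name and shape are purely illustrative.

// Illustrative sketch: one handler instance is created per JobAutoScaler#scale call and is
// bound to its JobAutoScalerContext, so the methods no longer need a context parameter.
public interface PerJobAutoScalerHandler {

    void handlerScalingError(String errorMessage);

    void handlerScalingReport(String scalingReportMessage);

    void handlerIneffectiveScaling(String message);

    void handlerRecommendedParallelism(HashMap<String, String> recommendedParallelism);
}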

Implement the default and kubernetes classes for AutoScalerStateStore

The default AutoScalerStateStore is the HeapedAutoScalerStateStore, which means the state is lost after an autoscaler restart. Of course, we can implement a MySQLAutoScalerStateStore to persist the state in the future.

It's pluggable, so any database can be supported. Flink users can implement other AutoScalerStateStore variants for their own scenarios.
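
A minimal sketch of this heap-based default store, assuming it simply keeps the state in an in-memory map (illustrative only):

/** An in-memory default state store; all state is lost when the autoscaler restarts. */
public class HeapedAutoScalerStateStore implements AutoScalerStateStore {

    private final Map<String, String> state = new ConcurrentHashMap<>();

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(state.get(key));
    }

    @Override
    public void put(String key, String value) {
        state.put(key, value);
    }

    @Override
    public void remove(String key) {
        state.remove(key);
    }

    @Override
    public void flush() {
        // Nothing to do: the state only lives on the heap.
    }
}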


For the current code (Kubernetes jobs), the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.

/** The kubernetes auto scaler state store. */
public class KubernetesAutoScalerStateStore implements AutoScalerStateStore {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";

    private final KubernetesClient kubernetesClient;

    private final ConfigMap configMap;

    public KubernetesAutoScalerStateStore(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
        this.configMap = getConfigMap(cr, kubernetesClient);
    }

    public static ConfigMap getConfigMap(
            AbstractFlinkResource<?, ?> cr, KubernetesClient kubeClient) {

        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + cr.getMetadata().getName());
        objectMeta.setNamespace(cr.getMetadata().getNamespace());

        return getScalingInfoConfigMapFromKube(objectMeta, kubeClient)
                .orElseGet(
                        () -> {
                            LOG.info("Creating scaling info config map");

                            objectMeta.setLabels(
                                    Map.of(
                                            Constants.LABEL_COMPONENT_KEY,
                                            LABEL_COMPONENT_AUTOSCALER,
                                            Constants.LABEL_APP_KEY,
                                            cr.getMetadata().getName()));
                            var cm = new ConfigMap();
                            cm.setMetadata(objectMeta);
                            cm.addOwnerReference(cr);
                            cm.setData(new HashMap<>());
                            return kubeClient.resource(cm).create();
                        });
    }

    private static Optional<ConfigMap> getScalingInfoConfigMapFromKube(
            ObjectMeta objectMeta, KubernetesClient kubeClient) {
        return Optional.ofNullable(
                kubeClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }

    @Override
    public Optional<String> get(String key) {
        return Optional.ofNullable(configMap.getData().get(key));
    }

    @Override
    public void put(String key, String value) {
        configMap.getData().put(key, value);
    }

    @Override
    public void remove(String key) {
        configMap.getData().remove(key);
    }

    @Override
    public void flush() {
        kubernetesClient.resource(configMap).update();
    }
}



Compatibility, Deprecation, and Migration Plan

It must be compatible with the current flink-kubernetes-operator.

One open question is whether to move flink-autoscaler to the flink repo ([Nice-to-have] above):

  • It's easy to implement flink-yarn-autoscaler after decoupling, and adding flink-yarn-autoscaler to the flink-kubernetes-operator repo would be weird.
  • I prefer keeping this module in the flink-kubernetes-operator repo during this FLIP; we can move it in the last step of this FLIP.


Test Plan

UT & IT & manual verification that the autoscaler works as expected.

Rejected Alternatives

