...

The proposed interface stays similar to the existing JobAutoScaler, but its method parameters change: instead of Kubernetes-related classes, the methods take a
JobAutoScalerContext<KEY, INFO>. For clarity, it is suggested to rename the generic parameter KEY to JOB_KEY. The autoscaler treats Flink jobs with the same jobKey as the same job.
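A minimal sketch of how the jobKey identity could work, under stated assumptions. The context fields, `getJobKey()`, the `scale`/`cleanup` signatures and the `CountingAutoScaler` class are all hypothetical; only `getExtraJobInfo()` appears elsewhere in this document, and the real context would also carry the `RestClusterClient<String>`. Per-job bookkeeping is keyed by the job key, so two contexts with equal keys update the same state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, simplified context; the real one would also hold a RestClusterClient<String>. */
final class JobAutoScalerContext<KEY, INFO> {
    private final KEY jobKey;        // identity of the job as seen by the autoscaler
    private final INFO extraJobInfo; // deployment-specific payload, e.g. a Kubernetes CR

    JobAutoScalerContext(KEY jobKey, INFO extraJobInfo) {
        this.jobKey = Objects.requireNonNull(jobKey, "jobKey");
        this.extraJobInfo = extraJobInfo;
    }

    KEY getJobKey() { return jobKey; }

    INFO getExtraJobInfo() { return extraJobInfo; }
}

/** Hypothetical scaler interface: no Kubernetes classes appear in the signatures. */
interface JobAutoScaler<KEY, INFO> {
    void scale(JobAutoScalerContext<KEY, INFO> context);

    void cleanup(KEY jobKey);
}

/** Toy implementation that only counts scale() calls per job key. */
final class CountingAutoScaler<KEY, INFO> implements JobAutoScaler<KEY, INFO> {
    private final Map<KEY, Integer> callsPerJob = new HashMap<>();

    @Override
    public void scale(JobAutoScalerContext<KEY, INFO> context) {
        // Contexts with equal job keys update the same entry, i.e. they are treated as one job.
        callsPerJob.merge(context.getJobKey(), 1, Integer::sum);
    }

    @Override
    public void cleanup(KEY jobKey) {
        // Forget everything known about this job.
        callsPerJob.remove(jobKey);
    }

    int scaleCalls(KEY jobKey) { return callsPerJob.getOrDefault(jobKey, 0); }
}
```

Here the extra job info may differ between calls (for example, a newer version of the same CR) without creating a second job, because only the key decides identity.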

...

Code Block
import java.util.Map;

/**
 * Handles all events during scaling.
 *
 * @param <KEY> the type of the job key
 * @param <INFO> the type of the extra job information
 */
public interface AutoScalerEventHandler<KEY, INFO> {

    void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context,
            FailureReason failureReason,
            String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism);

    /** The reason for an autoscaler failure. */
    enum FailureReason {
        ScalingException(true),
        IneffectiveScaling(false);

        // true: the failure is an unexpected error.
        // false: the scaling strategy itself caused this scaling to fail (e.g. ineffective scaling).
        private final boolean isError;

        FailureReason(boolean isError) {
            this.isError = isError;
        }

        public boolean isError() {
            return isError;
        }
    }
}


We can implement a CompositeEventHandler to support multiple event handlers, so that each event is recorded to multiple systems.
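A minimal sketch of such a composite, assuming it simply forwards each call to a list of delegates in registration order. The `JobAutoScalerContext` stub, the simplified `FailureReason` enum (without `isError`) and the `RecordingEventHandler` helper are illustrative stand-ins, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Stand-in for the real context class; only the job key is modelled here. */
final class JobAutoScalerContext<KEY, INFO> {
    final KEY jobKey;

    JobAutoScalerContext(KEY jobKey) { this.jobKey = jobKey; }
}

/** The proposed interface, with FailureReason simplified to a plain enum. */
interface AutoScalerEventHandler<KEY, INFO> {
    enum FailureReason { ScalingException, IneffectiveScaling }

    void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, FailureReason failureReason, String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism);
}

/** Forwards every event to each delegate, in registration order. */
final class CompositeEventHandler<KEY, INFO> implements AutoScalerEventHandler<KEY, INFO> {
    private final List<AutoScalerEventHandler<KEY, INFO>> delegates;

    CompositeEventHandler(List<AutoScalerEventHandler<KEY, INFO>> delegates) {
        // Defensive copy so later changes to the caller's list do not affect dispatch.
        this.delegates = new ArrayList<>(delegates);
    }

    @Override
    public void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, FailureReason failureReason, String errorMessage) {
        for (AutoScalerEventHandler<KEY, INFO> handler : delegates) {
            handler.handlerScalingFailure(context, failureReason, errorMessage);
        }
    }

    @Override
    public void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage) {
        for (AutoScalerEventHandler<KEY, INFO> handler : delegates) {
            handler.handlerScalingReport(context, scalingReportMessage);
        }
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism) {
        for (AutoScalerEventHandler<KEY, INFO> handler : delegates) {
            handler.handlerRecommendedParallelism(context, recommendedParallelism);
        }
    }
}

/** Illustrative helper that records one line per event, standing in for a real sink. */
final class RecordingEventHandler<KEY, INFO> implements AutoScalerEventHandler<KEY, INFO> {
    final List<String> events = new ArrayList<>();

    @Override
    public void handlerScalingFailure(JobAutoScalerContext<KEY, INFO> c, FailureReason r, String msg) {
        events.add(c.jobKey + " failure " + r + ": " + msg);
    }

    @Override
    public void handlerScalingReport(JobAutoScalerContext<KEY, INFO> c, String msg) {
        events.add(c.jobKey + " report: " + msg);
    }

    @Override
    public void handlerRecommendedParallelism(JobAutoScalerContext<KEY, INFO> c, Map<String, String> p) {
        events.add(c.jobKey + " parallelism: " + p);
    }
}
```

Delegates are called sequentially, so an exception thrown by one handler stops the remaining ones; a production version might catch and log per-delegate failures instead.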


4. AutoScalerStateStore

AutoScalerStateStore is responsible for persisting and accessing state during scaling.

For Kubernetes jobs (the current code), the state is persisted to a ConfigMap, so the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling and persist it after scaling.
For other jobs (YARN or standalone), I implemented a HeapedAutoScalerStateStore, which keeps the state in memory, so the state is lost when the autoscaler restarts. A MySQLAutoScalerStateStore could be implemented in the future to persist the state.
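A minimal sketch of the in-memory store, assuming a string key/value API per job (mirroring a ConfigMap's data section) plus a `flush` hook for the "persist after scaling" step. The method names (`get`, `put`, `remove`, `flush`, `removeJob`) are hypothetical, and the store is keyed by the job key rather than the full context to keep the example self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical store API: string key/value state per job, similar to a ConfigMap's data. */
interface AutoScalerStateStore<KEY> {
    Optional<String> get(KEY jobKey, String stateKey);

    void put(KEY jobKey, String stateKey, String value);

    void remove(KEY jobKey, String stateKey);

    /** Persists pending changes; a ConfigMap-backed store would write the ConfigMap here. */
    void flush(KEY jobKey);

    /** Drops all state of a job, e.g. when the job is removed. */
    void removeJob(KEY jobKey);
}

/** In-memory store: state lives only in this JVM and is lost when the autoscaler restarts. */
final class HeapedAutoScalerStateStore<KEY> implements AutoScalerStateStore<KEY> {
    private final Map<KEY, Map<String, String>> state = new ConcurrentHashMap<>();

    @Override
    public Optional<String> get(KEY jobKey, String stateKey) {
        Map<String, String> jobState = state.get(jobKey);
        return jobState == null ? Optional.empty() : Optional.ofNullable(jobState.get(stateKey));
    }

    @Override
    public void put(KEY jobKey, String stateKey, String value) {
        // Lazily create the per-job map on first write.
        state.computeIfAbsent(jobKey, k -> new ConcurrentHashMap<>()).put(stateKey, value);
    }

    @Override
    public void remove(KEY jobKey, String stateKey) {
        Map<String, String> jobState = state.get(jobKey);
        if (jobState != null) {
            jobState.remove(stateKey);
        }
    }

    @Override
    public void flush(KEY jobKey) {
        // Nothing to persist: the heap map is the only copy of the state.
    }

    @Override
    public void removeJob(KEY jobKey) {
        state.remove(jobKey);
    }
}
```

With this shape, a Kubernetes-backed store could load the ConfigMap into its cache before scaling and write it back in `flush()`, while the heap store has nothing to write.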

...

  1. Ensure the new autoscaler module keeps the general autoscaling strategy.
    This includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
    Kubernetes-related dependencies should be removed from these classes, which should use JobAutoScalerContext, AutoScalerEventHandler and AutoScalerStateStore instead.

  2. Use `RestClusterClient<String>` instead of org.apache.flink.kubernetes.operator.service.FlinkService

    FlinkService is tied to Kubernetes, so the autoscaler shouldn't use it. RestClusterClient is a general Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in JobAutoScalerContext.


  3. Implement the default and Kubernetes classes for AutoScalerEventHandler
    The default AutoScalerEventHandler could be the LoggedAutoScalerEventHandler, which just logs the event whenever any method is called.
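A minimal sketch of the logging default, assuming it maps `FailureReason.isError()` to a log level the same way the Kubernetes handler maps it to Warning/Normal events. It uses `java.util.logging` only to stay dependency-free (Flink code would normally log through SLF4J), and the context stub models only the job key.

```java
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Stand-in for the real context class; only the job key is modelled here. */
final class JobAutoScalerContext<KEY, INFO> {
    final KEY jobKey;

    JobAutoScalerContext(KEY jobKey) { this.jobKey = jobKey; }
}

/** The proposed interface, including FailureReason#isError. */
interface AutoScalerEventHandler<KEY, INFO> {
    enum FailureReason {
        ScalingException(true),
        IneffectiveScaling(false);

        private final boolean isError;

        FailureReason(boolean isError) { this.isError = isError; }

        public boolean isError() { return isError; }
    }

    void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, FailureReason failureReason, String errorMessage);

    void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage);

    void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism);
}

/** Default handler: only writes each event to a log, with no side effects on the job. */
final class LoggedAutoScalerEventHandler<KEY, INFO> implements AutoScalerEventHandler<KEY, INFO> {
    private final Logger log;

    LoggedAutoScalerEventHandler() {
        this(Logger.getLogger(LoggedAutoScalerEventHandler.class.getName()));
    }

    LoggedAutoScalerEventHandler(Logger log) { this.log = log; }

    @Override
    public void handlerScalingFailure(
            JobAutoScalerContext<KEY, INFO> context, FailureReason failureReason, String errorMessage) {
        // Unexpected errors are logged as warnings; strategy-driven failures as info.
        Level level = failureReason.isError() ? Level.WARNING : Level.INFO;
        log.log(level, "Scaling failure for job " + context.jobKey + " (" + failureReason + "): " + errorMessage);
    }

    @Override
    public void handlerScalingReport(JobAutoScalerContext<KEY, INFO> context, String scalingReportMessage) {
        log.info("Scaling report for job " + context.jobKey + ": " + scalingReportMessage);
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<KEY, INFO> context, Map<String, String> recommendedParallelism) {
        log.info("Recommended parallelism for job " + context.jobKey + ": " + recommendedParallelism);
    }
}
```

Injecting the `Logger` through the second constructor keeps the handler easy to test without touching global logging configuration.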

...


...


Code Block
/** The Kubernetes autoscaler event handler. */
public class KubernetesAutoScalerEventHandler<CR extends AbstractFlinkResource<?, ?>>
        implements AutoScalerEventHandler<ResourceID, CR> {

    private static final Logger LOG =
            LoggerFactory.getLogger(KubernetesAutoScalerEventHandler.class);

    private final KubernetesClient kubernetesClient;

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(
            KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handlerScalingFailure(
            JobAutoScalerContext<ResourceID, CR> context,
            FailureReason failureReason,
            String errorMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                failureReason.isError() ? EventRecorder.Type.Warning : EventRecorder.Type.Normal,
                failureReason.toString(),
                errorMessage,
                EventRecorder.Component.Operator);
    }

    @Override
    public void handlerScalingReport(
            JobAutoScalerContext<ResourceID, CR> context, String scalingReportMessage) {
        eventRecorder.triggerEvent(
                context.getExtraJobInfo(),
                EventRecorder.Type.Normal,
                EventRecorder.Reason.ScalingReport,
                EventRecorder.Component.Operator,
                scalingReportMessage,
                "ScalingExecutor");
    }

    @Override
    public void handlerRecommendedParallelism(
            JobAutoScalerContext<ResourceID, CR> context,
            Map<String, String> recommendedParallelism) {
        // No-op: this handler does not emit an event for recommended parallelism.
    }
}


Rejected alternative implementation (it would reduce the number of method parameters): create an AutoScalerEventHandler for the current job each time JobAutoScaler#scale is called, i.e. make the AutoScalerEventHandler finer-grained. If so, we can:

...

  • The first commit: Create the flink-autoscaler module and move the non-Kubernetes-related autoscaler classes to it.
  • The second commit: Add the general interfaces for the autoscaler.
  • The third commit: Remove some autoscaler test classes because they depend on Kubernetes; I didn't support unit tests in this POC. (Don't worry: the final PR will support all tests.)
  • The last commit: Decouple the autoscaler from Kubernetes.

...