...

[This FLIP proposal is a joint work between Rui Fan and Samrat Deb, with consulting from Gyula Fora and Maximilian Michels.]

Table of Contents

...

1. Motivation

The proposal to introduce autoscaling for Flink (FLIP-271) has garnered significant interest due to its potential to greatly enhance the usability of Flink. The primary objective is to enable users to effortlessly enable the autoscaler for their Flink jobs without the need for intricate parallelism configurations. However, the current implementation of the flink-autoscaler is tightly integrated with Kubernetes and resides within the flink-kubernetes-operator repository.

...

Hence, this FLIP is centered around the crucial task of decoupling the autoscaler functionality from Kubernetes.
By achieving this decoupling, we aim to empower Flink users to leverage the benefits of autoscaling irrespective of their chosen deployment platform, whether it be Kubernetes or YARN.

2. Core Idea

  1. JobAutoScaler and JobAutoScalerImpl: These components will define the generic autoscaling strategy for Flink jobs and are essential for the implementation of the autoscaler module.

  2. Interfaces: The FLIP outlines the necessity of defining a few interfaces: ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore.
    1. The ScalingRealizer interface handles scaling actions.
    2. The AutoScalerEventHandler interface handles loggable events.
    3. The AutoScalerStateStore interface is responsible for accessing and persisting the autoscaler's state.

  3. Dependencies: The autoscaler module should not rely on any Kubernetes-related dependencies such as fabric8, flink-kubernetes-operator, or flink-kubernetes.
    Instead, it can rely on Apache Flink project dependencies to gather metrics and make scaling decisions based on JobVertexID , Configuration , MetricGroup , and other relevant classes.

  4. Optional Goal: For the flink-autoscaler module, we prefer to keep it in the flink-kubernetes-operator repository for now (based on advance discussion with Gyula Fora).

        a. Since the autoscaler must support more than just the latest Flink version, moving it to the flink repository may not be a good choice. Because it needs to work across Flink versions, it probably has to be a separate repo/subproject.

        b. Removing the autoscaler from the operator repo would also bring a lot of operational overhead (release process, etc.), which would be a negative from the operator's perspective.

        c. So, in the short term, we prefer to keep flink-autoscaler and flink-yarn-autoscaler as submodules of the flink-kubernetes-operator. If they are stable in the future, we can discuss moving them; after they are stable, the release frequency will be reduced, so the impact on the operator will be small.


Note:
An independent flink-kubernetes-operator-autoscaler module is not necessary. Moving these classes to flink-kubernetes-operator will reduce complexity. We can discuss it on the mailing list.

Why isn't it necessary?

...

  • If flink-kubernetes-operator-autoscaler were an independent module, it would have to depend on the flink-kubernetes-operator module.
  • flink-kubernetes-operator cannot depend on flink-kubernetes-operator-autoscaler, so loading these classes is more difficult than simply removing the flink-kubernetes-operator-autoscaler module.
  • flink-kubernetes-operator-autoscaler  just defines KubernetesScalingRealizer, KubernetesAutoScalerEventHandler and KubernetesAutoScalerStateStore.
  • There are not many of these classes, so moving them to flink-kubernetes-operator  will reduce complexity. 

3. Public Interfaces

3.1 JobAutoScaler

The proposed interface retains similarity to the existing JobAutoScaler, although with modified method parameters. Instead of using Kubernetes-related classes, the parameters will be replaced with JobAutoScalerContext<KEY>. The KEY is the jobKey: if the KEY is the same, it is considered to be the same Flink job.

Code Block
/** The generic Autoscaler instance. */
public interface JobAutoScaler<KEY> {

    /** Called as part of the reconciliation loop. Returns true if this call led to scaling. */
    boolean scale(JobAutoScalerContext<KEY> context);

    /** Called when the job is deleted. */
    void cleanup(JobAutoScalerContext<KEY> context);

    /** Get the current parallelism overrides for the job. */
    Map<String, String> getParallelismOverrides(JobAutoScalerContext<KEY> context);
}

3.2 JobAutoScalerContext

The JobAutoScalerContext encapsulates essential information required for scaling Flink jobs, including the jobKey, jobId, configuration, MetricGroup, etc.
Currently, in the existing code or for Kubernetes jobs, the jobKey is defined as io.javaoperatorsdk.operator.processing.event.ResourceID .
However, there is a possibility to define the jobKey for Flink jobs running on YARN in the future.
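For illustration only (nothing in this FLIP prescribes it), a YARN implementation could key jobs by the YARN application id. The equals/hashCode contract matters here, because contexts with the same jobKey are treated as the same job. The class below is a hypothetical sketch:

Code Block
/** A hypothetical jobKey for Flink jobs running on YARN, keyed by the application id. */
public class YarnJobKey {

    private final String applicationId;

    public YarnJobKey(String applicationId) {
        this.applicationId = applicationId;
    }

    @Override
    public boolean equals(Object o) {
        // Two keys with the same application id identify the same Flink job.
        return o instanceof YarnJobKey
                && applicationId.equals(((YarnJobKey) o).applicationId);
    }

    @Override
    public int hashCode() {
        return applicationId.hashCode();
    }
}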


The entire JobAutoScalerContext, comprising all relevant details, will be passed to the ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore implementations when the autoscaler invokes their respective callbacks.

Code Block
/**
 * The job autoscaler context.
 *
 * @param <KEY> The job key.
 */
public interface JobAutoScalerContext<KEY> {

    /** The identifier of each flink job. */
    KEY getJobKey();

    JobID getJobID();

    Configuration getConfiguration();

    MetricGroup getMetricGroup();

    RestClusterClient<String> getRestClusterClient() throws Exception;
}

3.3 ScalingRealizer

The ScalingRealizer interface handles scaling actions, and ScalingRealizer#realize will be called inside JobAutoScaler#scale.

Code Block
public interface ScalingRealizer<KEY,
        Context extends JobAutoScalerContext<KEY>> {

    void realize(Context context, Map<String, String> parallelismOverrides);
}


3.4 AutoScalerEventHandler


The AutoScalerEventHandler interface handles loggable events, and AutoScalerEventHandler#handleEvent will be called by the autoscaler when loggable events need to be handled, such as: scaling errors, scaling result reports, etc.

The AutoScalerEventHandler object is shared across all Flink jobs and doesn't hold any job information itself; that's why the JobAutoScalerContext is passed as a parameter of handleEvent.


Code Block
/**
 * Handle all loggable events during scaling.
 *
 * @param <KEY> The job key.
 */
public interface AutoScalerEventHandler<KEY,
        Context extends JobAutoScalerContext<KEY>> {

    void handleEvent(Context context,
                     Type type,
                     String reason,
                     String message,
                     @Nullable String messageKey);

    enum Type {
        Normal,
        Warning
    }
}


3.5 AutoScalerStateStore

AutoScalerStateStore is responsible for persisting and accessing state during scaling.

For the current code or Kubernetes jobs, the state is persisted to a ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling, and persist the ConfigMap after scaling.
For other jobs (YARN or standalone), I implement a HeapedAutoScalerStateStore, which means the state will be lost after the autoscaler restarts. Of course, we can implement a JdbcAutoScalerStateStore to persist the store in the future.


Code Block
/** It will be used to store state during scaling. */
public interface AutoScalerStateStore<KEY,
        Context extends JobAutoScalerContext<KEY>> {

    void storeScalingHistory(Context jobContext, String decision);

    String getScalingHistory(Context jobContext);

    void removeScalingHistory(Context jobContext);

    void storeEvaluatedMetrics(Context jobContext, String metrics);

    String getEvaluatedMetrics(Context jobContext);

    void removeEvaluatedMetrics(Context jobContext);

    void storeParallelismOverrides(Context jobContext, String parallelismOverrides);

    String getParallelismOverrides(Context jobContext);

    void removeParallelismOverrides(Context jobContext);

    void removeInfoFromCache(Context jobContext);

    // The flush is needed because we just save data in cache for all store methods, and flush
    // these data to the physical storage after the flush is called, to improve performance.
    void flush(Context jobContext);
}

We can implement a CompositeEventHandler to support multiple event handlers; it can record these events to multiple systems, as sketched below.
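A minimal sketch of such a composite handler (the class below is illustrative only, not part of the proposed interfaces):

Code Block
/** A sketch of a composite handler that fans each event out to multiple delegates. */
public class CompositeEventHandler<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerEventHandler<KEY, Context> {

    private final List<AutoScalerEventHandler<KEY, Context>> handlers;

    public CompositeEventHandler(List<AutoScalerEventHandler<KEY, Context>> handlers) {
        this.handlers = handlers;
    }

    @Override
    public void handleEvent(Context context,
                            Type type,
                            String reason,
                            String message,
                            @Nullable String messageKey) {
        // Record the event to every underlying system, e.g. Kubernetes events plus a logging system.
        for (AutoScalerEventHandler<KEY, Context> handler : handlers) {
            handler.handleEvent(context, type, reason, message, messageKey);
        }
    }
}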



3.6 The generic autoscaler

As discussed with Gyula Fora and Samrat Deb, we don't implement the YARN autoscaler in this FLIP; we just implement a generic autoscaler based on the rescale API (FLIP-291). The generic autoscaler doesn't know anything about the specific job, and users can pass a JobAutoScalerContext to use it. It can communicate with the Flink job through the RestClusterClient, as sketched below.
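To make this concrete, here is a minimal sketch of a ScalingRealizer built on the FLIP-291 rescale API. The class name, the fixed lower bound of 1, and the error handling are illustrative assumptions; it relies on RestClusterClient#updateJobResourceRequirements, which requires the adaptive scheduler:

Code Block
/** A sketch of a generic ScalingRealizer based on the FLIP-291 rescale API (illustrative only). */
public class RescaleApiScalingRealizer<KEY, Context extends JobAutoScalerContext<KEY>>
        implements ScalingRealizer<KEY, Context> {

    @Override
    public void realize(Context context, Map<String, String> parallelismOverrides) {
        try {
            // The context already carries a generic REST client, so no Kubernetes dependency is needed.
            RestClusterClient<String> restClusterClient = context.getRestClusterClient();
            JobResourceRequirements.Builder builder = JobResourceRequirements.newBuilder();
            for (Map.Entry<String, String> override : parallelismOverrides.entrySet()) {
                int newParallelism = Integer.parseInt(override.getValue());
                builder.setParallelismForJobVertex(
                        JobVertexID.fromHexString(override.getKey()), 1, newParallelism);
            }
            // Trigger an in-place rescale of the running job through the REST API.
            restClusterClient
                    .updateJobResourceRequirements(context.getJobID(), builder.build())
                    .get();
        } catch (Exception e) {
            throw new RuntimeException("Failed to apply parallelism overrides", e);
        }
    }
}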


4. Proposed Changes

4.1 Ensure new autoscaler module keeps the generic auto scaling strategy.

It includes JobAutoScalerImpl, ScalingMetricCollector, AutoScalerInfo, ScalingExecutor, etc.
Kubernetes-related dependencies should be removed from these classes, using JobAutoScalerContext, ScalingRealizer, AutoScalerEventHandler and AutoScalerStateStore instead.

Using the RestClusterClient instead of org.apache.flink.kubernetes.operator.service.FlinkService

  • The FlinkService is tied to Kubernetes, so we shouldn't use it. The RestClusterClient is a generic Flink client that supports all deployment types, including Kubernetes, YARN and standalone.
    The RestClusterClient<String> is included in the JobAutoScalerContext.


4.2 KubernetesJobAutoScalerContext

Note: some code can be extracted into an AbstractJobAutoScalerContext, such as the jobKey, jobId, configuration, metric group and restClientSupplier; a sketch of such a base class follows the code block below.

This logic should be generic for Kubernetes, YARN and standalone.

Code Block
public class KubernetesJobAutoScalerContext implements JobAutoScalerContext<ResourceID> {

   private final ResourceID resourceID;

   private final JobID jobID;

   private final Configuration configuration;

   private final MetricGroup metricGroup;

   private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

   private final AbstractFlinkResource<?, ?> resource;

   public KubernetesJobAutoScalerContext(
           ResourceID resourceID,
           JobID jobID,
           Configuration configuration,
           MetricGroup metricGroup,
           SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier,
           AbstractFlinkResource<?, ?> resource) {
       this.resourceID = resourceID;
       this.jobID = jobID;
       this.configuration = configuration;
       this.metricGroup = metricGroup;
       this.restClientSupplier = restClientSupplier;
       this.resource = resource;
   }

   @Override
   public ResourceID getJobKey() {
       return resourceID;
   }

   @Override
   public JobID getJobID() {
       return jobID;
   }

   @Override
   public Configuration getConfiguration() {
       return configuration;
   }

   @Override
   public MetricGroup getMetricGroup() {
       return metricGroup;
   }

   @Override
   public RestClusterClient<String> getRestClusterClient() throws Exception {
       return restClientSupplier.get();
   }

   public AbstractFlinkResource<?, ?> getResource() {
       return resource;
   }
}
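A possible shape for the extraction mentioned above, assuming the base class simply holds the shared, platform-independent fields (a sketch, not part of the proposed public interfaces):

Code Block
/** A sketch of a shared base class holding the platform-independent context fields. */
public abstract class AbstractJobAutoScalerContext<KEY> implements JobAutoScalerContext<KEY> {

    private final KEY jobKey;
    private final JobID jobID;
    private final Configuration configuration;
    private final MetricGroup metricGroup;
    private final SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier;

    protected AbstractJobAutoScalerContext(
            KEY jobKey,
            JobID jobID,
            Configuration configuration,
            MetricGroup metricGroup,
            SupplierWithException<RestClusterClient<String>, Exception> restClientSupplier) {
        this.jobKey = jobKey;
        this.jobID = jobID;
        this.configuration = configuration;
        this.metricGroup = metricGroup;
        this.restClientSupplier = restClientSupplier;
    }

    @Override
    public KEY getJobKey() {
        return jobKey;
    }

    @Override
    public JobID getJobID() {
        return jobID;
    }

    @Override
    public Configuration getConfiguration() {
        return configuration;
    }

    @Override
    public MetricGroup getMetricGroup() {
        return metricGroup;
    }

    @Override
    public RestClusterClient<String> getRestClusterClient() throws Exception {
        return restClientSupplier.get();
    }
}

A KubernetesJobAutoScalerContext would then only add the ResourceID jobKey and the AbstractFlinkResource, and a future YARN context would do the same with its own key type.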


4.3 KubernetesScalingRealizer

Code Block
public class KubernetesScalingRealizer
        implements ScalingRealizer<ResourceID, KubernetesJobAutoScalerContext> {

    @Override
    public void realize(
            KubernetesJobAutoScalerContext context,
            Map<String, String> parallelismOverrides) {
        context.getResource()
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        ConfigurationUtils.convertValue(parallelismOverrides, String.class));
        context.getResource().getStatus().setImmediateReconciliationNeeded(true);
    }
}


4.4 KubernetesAutoScalerEventHandler

Code Block
public class KubernetesAutoScalerEventHandler
        implements AutoScalerEventHandler<ResourceID, KubernetesJobAutoScalerContext> {

    private final EventRecorder eventRecorder;

    public KubernetesAutoScalerEventHandler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    @Override
    public void handleEvent(KubernetesJobAutoScalerContext context,
                            Type type,
                            String reason,
                            String message,
                            @Nullable String messageKey) {
        eventRecorder.triggerEvent(
                context.getResource(),
                EventRecorder.Type.valueOf(type.name()),
                reason,
                EventRecorder.Component.Operator,
                message,
                messageKey);
    }
}

Rejected alternative implementation (it can reduce method parameters): create an AutoScalerEventHandler for the current job each time JobAutoScaler#scale is called, i.e., change the AutoScalerEventHandler to a finer granularity. If so, we could:

  • Add the AutoScalerEventHandler into the JobAutoScalerContext
  • Add the JobAutoScalerContext into the AutoScalerEventHandler
  • Drop the JobAutoScalerContext parameter from all handler methods, because the handler already includes the JobAutoScalerContext


4.5 KubernetesAutoScalerStateStore

For the current code or Kubernetes jobs, the state is persisted to a ConfigMap. So the KubernetesAutoScalerStateStore needs to fetch the ConfigMap before scaling, and persist the ConfigMap after scaling.



Implement the default and kubernetes classes for AutoScalerStateStore

The default AutoScalerStateStore is the HeapedAutoScalerStateStore, which means the state will be lost after the autoscaler restarts. Of course, we can implement a MySQLAutoScalerStateStore to persist the store in the future; a minimal sketch of the heap-based store follows.

It's pluggable, so any database or state store can be supported. Flink users can implement other AutoScalerStateStores for their own scenarios.
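As a rough illustration of the heap-based default (a sketch only; the concrete field layout is an assumption, not part of this FLIP), the store can simply keep per-job state in an in-memory map:

Code Block
/** A sketch of a heap-backed state store; all state is lost when the autoscaler restarts. */
public class HeapedAutoScalerStateStore<KEY, Context extends JobAutoScalerContext<KEY>>
        implements AutoScalerStateStore<KEY, Context> {

    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    // Per-job state, keyed by jobKey; everything lives on the heap.
    private final Map<KEY, Map<String, String>> states = new ConcurrentHashMap<>();

    @Override
    public void storeScalingHistory(Context jobContext, String decision) {
        getState(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(Context jobContext) {
        return getState(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(Context jobContext) {
        getState(jobContext).remove(SCALING_HISTORY_KEY);
    }

    @Override
    public void storeEvaluatedMetrics(Context jobContext, String metrics) {
        getState(jobContext).put(COLLECTED_METRICS_KEY, metrics);
    }

    @Override
    public String getEvaluatedMetrics(Context jobContext) {
        return getState(jobContext).get(COLLECTED_METRICS_KEY);
    }

    @Override
    public void removeEvaluatedMetrics(Context jobContext) {
        getState(jobContext).remove(COLLECTED_METRICS_KEY);
    }

    @Override
    public void storeParallelismOverrides(Context jobContext, String parallelismOverrides) {
        getState(jobContext).put(PARALLELISM_OVERRIDES_KEY, parallelismOverrides);
    }

    @Override
    public String getParallelismOverrides(Context jobContext) {
        return getState(jobContext).get(PARALLELISM_OVERRIDES_KEY);
    }

    @Override
    public void removeParallelismOverrides(Context jobContext) {
        getState(jobContext).remove(PARALLELISM_OVERRIDES_KEY);
    }

    @Override
    public void removeInfoFromCache(Context jobContext) {
        states.remove(jobContext.getJobKey());
    }

    @Override
    public void flush(Context jobContext) {
        // No-op: the heap map is the only storage, so there is nothing to flush.
    }

    private Map<String, String> getState(Context jobContext) {
        return states.computeIfAbsent(jobContext.getJobKey(), key -> new ConcurrentHashMap<>());
    }
}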

Code Block
/** The kubernetes auto scaler state store, it's based on the config map. */
public class KubernetesAutoScalerStateStore
        implements AutoScalerStateStore<ResourceID, KubernetesJobAutoScalerContext> {

    private static final Logger LOG = LoggerFactory.getLogger(KubernetesAutoScalerStateStore.class);

    private static final String LABEL_COMPONENT_AUTOSCALER = "autoscaler";
    private static final String COLLECTED_METRICS_KEY = "collectedMetrics";
    private static final String SCALING_HISTORY_KEY = "scalingHistory";
    private static final String PARALLELISM_OVERRIDES_KEY = "parallelismOverrides";

    private final KubernetesClient kubernetesClient;

    private final ConcurrentHashMap<ResourceID, ConfigMap> cache = new ConcurrentHashMap<>();

    public KubernetesAutoScalerStateStore(KubernetesClient kubernetesClient) {
        this.kubernetesClient = kubernetesClient;
    }

    // The COLLECTED_METRICS_KEY and PARALLELISM_OVERRIDES_KEY methods are similar to SCALING_HISTORY_KEY
    @Override
    public void storeScalingHistory(KubernetesJobAutoScalerContext jobContext, String decision) {
        getState(jobContext).put(SCALING_HISTORY_KEY, decision);
    }

    @Override
    public String getScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        return getState(jobContext).get(SCALING_HISTORY_KEY);
    }

    @Override
    public void removeScalingHistory(KubernetesJobAutoScalerContext jobContext) {
        getState(jobContext).remove(SCALING_HISTORY_KEY);
    }

    private Map<String, String> getState(KubernetesJobAutoScalerContext jobContext) {
        return cache.computeIfAbsent(jobContext.getJobKey(), jobKey -> getOrCreateConfigMap(jobContext)).getData();
    }

    @Override
    public void flush(KubernetesJobAutoScalerContext jobContext) {
        ConfigMap configMap = cache.get(jobContext.getJobKey());
        Preconditions.checkState(configMap != null, "The configMap shouldn't be null.");
        try {
            cache.put(jobContext.getJobKey(), kubernetesClient.resource(configMap).update());
        } catch (Exception e) {
            LOG.error(
                    "Error while updating autoscaler info configmap, invalidating to clear the cache",
                    e);
            removeInfoFromCache(jobContext);
            throw e;
        }
    }

    @Override
    public void removeInfoFromCache(KubernetesJobAutoScalerContext jobContext) {
        cache.remove(jobContext.getJobKey());
    }

    private ConfigMap getOrCreateConfigMap(KubernetesJobAutoScalerContext jobContext) {
        AbstractFlinkResource<?, ?> cr = jobContext.getResource();
        var meta = createCmObjectMeta(ResourceID.fromResource(cr));
        return getScalingInfoConfigMap(meta).orElseGet(() -> createConfigMap(cr, meta));
    }

    private ConfigMap createConfigMap(HasMetadata cr, ObjectMeta meta) {
        LOG.info("Creating scaling info config map");
        var cm = new ConfigMap();
        cm.setMetadata(meta);
        cm.addOwnerReference(cr);
        cm.setData(new HashMap<>());
        return kubernetesClient.resource(cm).create();
    }

    private ObjectMeta createCmObjectMeta(ResourceID uid) {
        var objectMeta = new ObjectMeta();
        objectMeta.setName("autoscaler-" + uid.getName());
        uid.getNamespace().ifPresent(objectMeta::setNamespace);
        objectMeta.setLabels(
                Map.of(
                        Constants.LABEL_COMPONENT_KEY,
                        LABEL_COMPONENT_AUTOSCALER,
                        Constants.LABEL_APP_KEY,
                        uid.getName()));
        return objectMeta;
    }

    private Optional<ConfigMap> getScalingInfoConfigMap(ObjectMeta objectMeta) {
        return Optional.ofNullable(
                kubernetesClient
                        .configMaps()
                        .inNamespace(objectMeta.getNamespace())
                        .withName(objectMeta.getName())
                        .get());
    }
}

POC

I have finished the POC for FLIP-334, here is the POC branch. This branch has 4 commits:

  • The first commit: Create the flink-autoscaler module, and move the non-kubernetes-related autoscaler classes to the flink-autoscaler module.
  • The second commit: Add the generic interfaces for the autoscaler.
  • The third commit: Remove some autoscaler test classes because they depend on Kubernetes; I didn't support the unit tests in this POC. (Don't worry, the final PR will support all tests.)
  • The last commit: Decouple the autoscaler and Kubernetes.

The flink-autoscaler module doesn't depend on any Kubernetes dependencies in this POC. I have tested it, and it works well with Kubernetes.
You can run the flink-kubernetes-operator locally based on the flink-kubernetes-operator docs and run the autoscaling example.

...


Compatibility, Deprecation, and Migration Plan

...