Status

Page properties

State: Abandoned
Discussion Thread
JIRA


Github PRs


Created


Problem Statement

Carl Sagan once said, “If you wish to make an apple pie from scratch, you must first invent the universe.” Unfortunately, the KubernetesExecutor (in its current form) appears to do this for every task. Compared with the over- and under-subscription problems of statically sized clusters, the KubernetesExecutor’s dynamic allocation offers powerful opportunities to speed up workflows. However, this has come at the cost of having to start a pod, a Python interpreter, AND a full instance of Airflow every time we wish to run a task. This proposal looks to take advantage of Knative, a Kubernetes-based serverless platform, to create a “pre-heated” Airflow experience.



Why a Knative Executor

Celery Scalability With Kubernetes Autoscaling

It’s no secret that the KubernetesExecutor does not currently have the same scalability as the CeleryExecutor. Needing to start an entire Airflow instance for each individual task can get quite expensive, and the number of pods created can put a lot of pressure on your Kubernetes cluster. However, this architecture was built to solve a real problem in the CeleryExecutor: static allocation. The CeleryExecutor is able to manage multi-tenancy within each of its workers, but the cost is that you have to pre-define the size of the cluster, leading to either wasted resources or missed SLAs.

What we are proposing here is a hybrid system: one that can scale down to 0 when not needed, but can also handle an arbitrary number of Airflow task requests in a scalable and multi-processed manner.

Simplifying the airflow k8s experience (for most people)

Currently, when an Airflow user wants to attach service accounts or specify resources for a task, they have to specify all of those details in that task. But 99% of tasks can probably fit within 4-5 different pod “templates.” By creating “queues” via different Knative endpoints, we can essentially mimic the familiar CeleryExecutor queue experience for the average data engineer. Moving task details out of the Airflow DAG has an added security benefit, as they will no longer be exposed in Python files.

This means that the “templates” can be managed by the DevOps/platform engineers, and the data engineers will no longer need to worry about the internals of their Airflow pods.

Improved story around k8s pod operator

One consistent complaint from users of the KubernetesExecutor is that launching a pod with the KubernetesPodOperator first launches an Airflow pod (for scheduling) and THEN a second pod for the actual pod request. This was initially done to avoid complicating the logic of the executor, but with a Knative-based solution, these pod requests can be handled via a shared Knative Airflow pod, thereby significantly reducing the footprint of these tasks.

What is Knative?

Imagine Knative as AWS Lambda functions for Kubernetes. Users can submit function parameters in the form of HTTP requests or messages via a broker (e.g. Kafka), and Knative will perform those functions. Knative can then scale based on load/quotas, perform all operations needed, and dynamically terminate when completed. Where Knative shines is that it only incurs the “pod launching” slow-down when the container first starts, and can then keep the Airflow/Python interpreter environment running for future tasks.

External Dependencies

Kubernetes

Knative ONLY works on Kubernetes and makes many Kubernetes-based assumptions.

Istio

Istio is an Envoy-based service mesh that is quickly becoming a default part of any production Kubernetes stack that has an interest in properly securing service endpoints. However, Istio can have a fairly steep learning curve and is a fairly heavyweight technology. In previous Knative releases, Istio was a requirement. However, as of version 0.5, Knative only requires an ingress gateway, which can be provided by the much lighter-weight Gloo.

Gloo

Gloo is an Envoy-based Kubernetes ingress gateway that can be installed on its own, without Istio. This is an ideal solution for users who do not want an entire Istio deployment on their k8s clusters.

Potential Benefits of Knative Executor

Easier Scaling to user SLAs

Knative offers load-based (# of requests), CPU-based, and memory-based scaling options. Users can also specify minimum and maximum deployment sizes. This means that users can set workers to take any arbitrary number of tasks before scaling up or down the number of needed workers. Then, once all of the tasks are completed, it can shrink down to 0 pods or to a set minimum if a user needs workers ready at all times.
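As a rough illustration of the load-based option, the sketch below computes how many workers would be needed for a given number of in-flight task requests. It is a deliberately simplified model, not Knative's actual autoscaling algorithm (which also averages over time windows); the function name and parameters are hypothetical.

```python
import math
from typing import Optional


def desired_pods(
    in_flight_requests: int,
    target_per_pod: int,
    min_scale: int = 0,
    max_scale: Optional[int] = None,
) -> int:
    """Simplified model of concurrency-based scaling (an assumption, not Knative code)."""
    if target_per_pod <= 0:
        raise ValueError("target_per_pod must be positive")
    # One pod for every `target_per_pod` in-flight requests, rounded up.
    wanted = math.ceil(in_flight_requests / target_per_pod)
    # Never go below the configured minimum (0 allows scale-to-zero).
    wanted = max(wanted, min_scale)
    # Cap at the configured maximum, if one is set.
    if max_scale is not None:
        wanted = min(wanted, max_scale)
    return wanted
```

With a target of 10 requests per worker, 95 in-flight tasks would call for 10 workers, and an idle system would drop to 0 workers unless a minimum is set.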

Knative-based Queueing

One of the more popular features of the CeleryExecutor is the ability to assign tasks to different queues. You can maintain a number of workers for high-priority SLAs, tasks requiring access to sensitive data, etc. With Knative-based deployments, we could deploy a different Airflow environment per “queue”. We could then use those queues to offer different node affinities (e.g. nodes with extra memory) or environment variables (e.g. secrets to access other services).

This would be a great alternative to the current approach of defining environments on a per-task basis. DevOps engineers would have the full flexibility of the k8s API to create exactly the workers they need, and data engineers would only need to point to a queue rather than specifying pod specs in their DAG code.

Installing Knative

Step 1: Install Kind

Kubernetes in Docker (KinD) is a popular tool for testing Kubernetes clusters locally. It does not require a VM like minikube, and is able to mimic the networking elements of separate Kubernetes nodes. Kubernetes itself and Airflow both use KinD for their CI/CD pipelines.

Step 2: Install Istio

In the Knative documentation, you can find instructions for installing Istio for Knative. Unless you plan on using the full service mesh, we recommend you start with the istio-lean installation, which will not inject Istio sidecars into your pods but will provide the necessary ingress-gateway component.

At this point, if you plan on running your Airflow scheduler inside of your Kubernetes cluster, we recommend you create a local gateway for Istio. By default, Istio will expose your Knative services through the external ingress of your Kubernetes cluster. The local gateway allows you to create services that are available only to other services within your cluster.

Step 3: Install Knative Serving

Now that you’ve set up your Istio gateway, you can install the Knative Serving deployments. Notice that in this case, since we are working with KinD, we replace any LoadBalancer services with NodePort services. This prevents services from remaining in a pending state while waiting for a load balancer.

Architecture

Step 1: Launch


Before launching Airflow, users can submit any number of Knative service configurations to the Kubernetes API. These can be stored in YAML files, and we can offer some basic examples. Once these deployments are registered, users can store the endpoint names in the airflow.cfg. These configurations will not launch any pods, but will simply inform Knative what a pod should look like when it needs to autoscale to match load.

An example “queue” would look like this:

# basic knative queue
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: autoscale-airflow
  namespace: default
spec:
  template:
    metadata:
      annotations:
        # Target 10 in-flight requests per pod.
        autoscaling.knative.dev/target: "10"
    spec:
      containers:
      - image: dev.local/airflow:latest
        imagePullPolicy: Never
        command: ["airflow", "airflow_worker"]
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
      volumes:
      - name: airflow-configmap
        configMap:
          name: airflow-configmap

Users would set a “default” queue in their airflow.cfg under the “knative” section. If they want to customize and have multiple queues, they can use the executor_config variable of their task to point tasks to different queues.
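To make the default-versus-override behaviour concrete, here is a minimal sketch of how a queue name might be resolved. The option name `default_queue` and the `executor_config` layout (`{"KnativeExecutor": {"queue": ...}}`) are assumptions made for illustration; the proposal does not define them.

```python
import configparser
from typing import Any, Dict, Optional

# Hypothetical airflow.cfg fragment; the option name `default_queue` is assumed.
EXAMPLE_CFG = """
[knative]
default_queue = autoscale-airflow
"""


def resolve_queue(
    executor_config: Optional[Dict[str, Any]],
    cfg_text: str = EXAMPLE_CFG,
) -> str:
    """Return the Knative queue (service name) a task should be sent to."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    default_queue = parser.get("knative", "default_queue")
    # Hypothetical per-task override: {"KnativeExecutor": {"queue": "<name>"}}
    overrides = (executor_config or {}).get("KnativeExecutor", {})
    return overrides.get("queue", default_queue)
```

A task without an `executor_config` would land on the default queue, while a task that passes `{"KnativeExecutor": {"queue": "high-memory"}}` would be sent to a (hypothetical) `high-memory` service.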

Note that it is important to decide at this step whether you plan to use this service within the Kubernetes cluster or externally. To keep the service internal to the cluster, set the following label on it:

serving.knative.dev/visibility=cluster-local

Step 2: Communicate

Once the system is set up, Airflow will be able to communicate with Knative over HTTP. If Airflow runs inside the k8s cluster, it can address the service by name using Istio routing. If, however, you wish to keep your Airflow instance outside of Kubernetes, you will need to hit the Istio ingress-gateway port and set the Host header to the name of the service.
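The two addressing modes can be sketched as a small helper that returns the URL and headers to use. The in-cluster DNS pattern and the `example.com` domain suffix are assumptions about a typical Knative setup, and the function name is hypothetical; adjust both to your cluster.

```python
from typing import Dict, Optional, Tuple


def build_request_target(
    service: str,
    namespace: str = "default",
    gateway: Optional[str] = None,
    domain: str = "example.com",
) -> Tuple[str, Dict[str, str]]:
    """Return (base_url, headers) for reaching a Knative service.

    gateway: "host:port" of the Istio ingress gateway when calling from
    outside the cluster, or None when calling from inside the cluster.
    """
    if gateway is None:
        # Inside the cluster: address the service by its cluster-local name.
        return f"http://{service}.{namespace}.svc.cluster.local", {}
    # Outside the cluster: hit the gateway and route via the Host header.
    return f"http://{gateway}", {"Host": f"{service}.{namespace}.{domain}"}
```

For example, `build_request_target("autoscale-airflow", gateway="127.0.0.1:31380")` yields the gateway URL plus a Host header naming the service, which any HTTP client can then use to submit a task request.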

The Airflow Worker

Since Knative is built around the concept of a long-running service, this would be a good time to separate the work of running Airflow operators from the scheduling work of the scheduler. We could accomplish this by creating a simple “airflow worker” server using asyncio.
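A minimal sketch of what such a worker server could look like, using only the standard library, is shown below. It is not the proposal's implementation: `run_task` is a placeholder that does no real Airflow work, the JSON payload shape is assumed, and error handling is omitted.

```python
import asyncio
import json


async def run_task(task: dict) -> dict:
    """Placeholder for running an Airflow task in the already-warm interpreter."""
    await asyncio.sleep(0)  # stand-in for real work
    return {
        "dag_id": task.get("dag_id"),
        "task_id": task.get("task_id"),
        "state": "success",
    }


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Handle one HTTP request carrying a JSON task description."""
    # Read the request line and headers, up to the blank line.
    head = await reader.readuntil(b"\r\n\r\n")
    headers = {}
    for line in head.decode("latin-1").split("\r\n")[1:]:
        if ":" in line:
            name, value = line.split(":", 1)
            headers[name.strip().lower()] = value.strip()
    # Read the JSON body, if any.
    length = int(headers.get("content-length", "0"))
    body = await reader.readexactly(length) if length else b"{}"
    result = await run_task(json.loads(body))
    payload = json.dumps(result).encode()
    writer.write(
        b"HTTP/1.1 200 OK\r\n"
        b"Content-Type: application/json\r\n"
        + f"Content-Length: {len(payload)}\r\n".encode()
        + b"Connection: close\r\n\r\n"
        + payload
    )
    await writer.drain()
    writer.close()
    await writer.wait_closed()


async def serve(host: str = "0.0.0.0", port: int = 8080) -> None:
    """Accept connections forever; each connection is handled as its own coroutine."""
    server = await asyncio.start_server(handle, host, port)
    async with server:
        await server.serve_forever()
```

Running `asyncio.run(serve())` would start the worker. Because each connection is served by its own coroutine, a single warm process can hold several in-flight task requests at once, which is what a per-pod concurrency target relies on.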


Local Development Story

One VERY cool aspect of this new architecture is that it actually creates a highly performant alternative to the LocalExecutor. For users who do not require certain specific features like RunAsUser, running a local Airflow server has shown as much as a 20X speed increase in launching and maintaining tasks. This is because the current LocalExecutor requires that every task start a brand new process, reloading Airflow and all DAGs each time.

Offering With The KubernetesExecutor

One current downside of using Knative as a task-runner is that Knative has limits when it comes to running very long tasks. For any Cloud Run based system there is a hard limit of 30 minutes, and on any custom system we start running into strange behavior if we hold a connection for too long (we’re currently testing to see how long a connection can reasonably be held). For this reason, we will offer a “kubernetes” queue that will use the traditional KubernetesExecutor to launch pods that either require isolation, will run for multiple hours, or require custom settings.
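The routing rule described above could be sketched as follows. The function name, its parameters, and the default queue name are hypothetical; the 30-minute threshold simply reuses the Cloud Run limit mentioned above as an assumed cut-off.

```python
KUBERNETES_QUEUE = "kubernetes"  # queue name taken from the proposal


def pick_queue(
    expected_minutes: float,
    needs_isolation: bool = False,
    custom_pod_settings: bool = False,
    default_queue: str = "autoscale-airflow",
    max_knative_minutes: float = 30.0,
) -> str:
    """Choose between a Knative queue and the classic KubernetesExecutor queue."""
    # Long-running, isolated, or custom-configured tasks get their own pod.
    if needs_isolation or custom_pod_settings or expected_minutes > max_knative_minutes:
        return KUBERNETES_QUEUE
    # Everything else goes to the shared, pre-heated Knative workers.
    return default_queue
```

Under these assumptions, a five-minute task stays on the Knative queue, while a three-hour task or one that needs isolation is sent to the “kubernetes” queue.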

Would this replace the Kubernetes Executor?

Well… yes and no. For many reasons this new architecture would improve upon the existing KubernetesExecutor. However, many users, particularly those with simpler needs or with managed k8s clusters, might not be able to install Knative. For these users, the classic executor will still offer many of the benefits of a k8s-native experience.

Another case where users might want to continue using the “classic” KubernetesExecutor would be for very long-running tasks. Knative Serving is not meant to run hour-long or multi-hour tasks, as this interferes with its ability to autoscale. For example, imagine you start 10 workers that take 10 tasks each, and each of those workers receives one multi-hour job. Even though 90% of the tasks have completed, Knative is unable to reduce the cluster size because each worker still has a single long-running task.

In favor of other scaling architectures, we have decided to abandon the KnativeExecutor.