Status
Problem Statement
Carl Sagan once said, “If you wish to make an apple pie from scratch, you must first invent the universe.” Unfortunately, the k8sexecutor (in its current form) appears to do this for every task. Compared to executors that struggle with over/under-subscription, the k8sexecutor’s dynamic allocation offers pretty powerful opportunities to speed up workflows. However, this has come at the cost of having to start a pod, a python interpreter, AND a full instance of airflow every time we wish to run a task. This proposal looks to take advantage of Knative, a kubernetes-based serverless platform, to create a “pre-heated” Airflow experience.
Why a Knative Executor
Celery Scalability With Kubernetes Autoscaling
It’s no secret that the KubernetesExecutor does not currently have the same scalability as the CeleryExecutor. Needing to start an entire airflow instance for each individual task can get quite expensive, and the number of pods created can put a lot of pressure on your kubernetes cluster. However, this architecture was built to solve a real problem in the CeleryExecutor: static allocation. The CeleryExecutor is able to manage multi-tenancy within each of its workers, but the cost is that you have to pre-define the size of the cluster, leading to either wasted resources or missed SLAs.
What we are proposing here is a hybrid system: a system that can scale down to 0 when not needed, but can also handle an arbitrary number of airflow task requests in a scalable and multi-processed manner.
Simplifying the airflow k8s experience (for most people)
Currently, when an airflow user wants to attach service accounts or specify resources for a task, they have to spell out all of those details on that task. But 99% of tasks can probably fit within 4-5 different pod “templates.” By creating “queues” via different knative endpoints, we can essentially mimic the familiar CeleryExecutor DAGs for the average data engineer.
This means that the “templates” can be managed by the Devops/platform engineers, and the data engineers will no longer need to worry about the internals of their airflow pods.
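As a rough sketch of the contrast: the resource-request keys below follow the current KubernetesExecutor’s executor_config convention, while the knative_queue key is the proposed addition (image names and the callable are placeholders).

```python
from airflow.operators.python_operator import PythonOperator

# Today: every task that needs non-default resources carries its own pod details.
train = PythonOperator(
    task_id="train_model",
    python_callable=train_model,  # assumed to be defined elsewhere in the DAG file
    executor_config={
        "KubernetesExecutor": {
            "image": "myrepo/airflow-ml:latest",
            "request_memory": "8Gi",
            "request_cpu": "4",
        }
    },
)

# Proposed: the task only names a queue; the pod "template" behind that
# queue is owned by the devops/platform team.
train = PythonOperator(
    task_id="train_model",
    python_callable=train_model,
    executor_config={"knative_queue": "high-memory"},
)
```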
Improved story around k8s pod operator
One consistent complaint from users of the k8sexecutor is that launching a pod via the KubernetesPodOperator first starts an airflow pod for scheduling and THEN a second pod for the actual pod request. This was initially done to avoid complicating the logic of the executor, but with a knative-based solution these pod requests can be handled by a shared knative airflow pod, thereby significantly reducing the footprint of these tasks.
What is Knative?
Imagine knative as AWS Lambda functions for kubernetes. Users can submit function parameters in the form of http requests or messages via a broker (e.g. kafka) and knative will perform those functions. Knative can scale based on load/quotas, perform all operations needed, and then dynamically terminate when completed. Where knative shines is that it only pays the “pod launching” slow-down when the container first starts, and can then keep the airflow/python interpreter environment running for future tasks.
External Dependencies
Kubernetes
Knative ONLY works on kubernetes and makes many kubernetes-based assumptions.
Istio
Istio is an Envoy-based service mesh that is quickly becoming a default part of any production kubernetes stack with an interest in properly securing service endpoints. However, istio has a fairly high learning curve and is a fairly heavyweight technology. In previous knative releases, istio was a requirement. As of version 0.5, however, knative only requires an ingress gateway, which can be provided by the much lighter-weight Gloo.
Gloo
Gloo is an Envoy-based kubernetes ingress gateway that provides just the gateway functionality knative needs. This makes it an ideal solution for users who do not want an entire istio deployment on their k8s clusters.
Installing Knative
Knative installation is fairly straightforward: it can be installed on any k8s cluster via a series of CRDs. That said, it is not free, and it will require some ongoing involvement from the devops/platform team to maintain. Knative is, however, a multi-purpose tool, so there are plenty of other use-cases that teams can consider for this technology.
Potential Benefits of Knative Executor
Easier Scaling to user SLAs
Knative offers load-based (# of requests), CPU-based, and memory-based scaling options. Users can also specify minimum and maximum deployment sizes. This means that users can set workers to take an arbitrary number of tasks before knative scales the pool of workers up or down. Then, once all of the tasks are completed, the deployment can shrink to 0 pods, or to a set minimum if a user needs workers ready at all times.
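For instance, these knobs are exposed as annotations on a knative worker deployment. The values below are purely illustrative (and CPU/memory-based scaling goes through knative’s HPA autoscaler class rather than the default concurrency-based one):

```python
# Illustrative knative autoscaling annotations for one worker deployment.
AUTOSCALING_ANNOTATIONS = {
    "autoscaling.knative.dev/metric": "concurrency",  # scale on in-flight requests
    "autoscaling.knative.dev/target": "10",    # ~10 concurrent tasks per worker
    "autoscaling.knative.dev/minScale": "0",   # scale to zero when idle...
    "autoscaling.knative.dev/maxScale": "50",  # ...but never above 50 workers
}
```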
Knative-based Queueing
One of the more popular features of the celery executor is the ability to assign tasks to different queues. You can maintain a number of workers for high-priority SLAs, tasks requiring access to sensitive data, etc. With Knative-based deployments, we could deploy a different airflow environment per “queue”. We could then use those queues to offer different node affinities (e.g. nodes with extra memory) or environment variables (e.g. secrets to access other services).
This would be a great alternative to the current “define environments on a per-task basis” model. Devops engineers would have the full flexibility of the k8s API to create exactly the workers they need, and data engineers would only need to point to a queue rather than specifying pod specs in their DAG code. A hypothetical queue layout is sketched below.
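All names and knobs in this sketch are invented for illustration; the point is only that each queue bundles its own scheduling and environment rules:

```python
# Hypothetical mapping from airflow queue names to the knative worker
# deployments behind them; the platform team owns these definitions.
QUEUES = {
    "default": {
        "node_selector": {},
        "env": {},
    },
    "high-memory": {
        # Pin these workers to memory-optimized nodes.
        "node_selector": {"node-type": "highmem"},
        "env": {},
    },
    "sensitive-data": {
        # Restricted nodes plus credentials for the secured data store.
        "node_selector": {"security-zone": "restricted"},
        "env": {"DATA_STORE_ROLE": "pii-reader"},
    },
}
```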
Potential Architecture
Step 1: Launch
Before launching airflow, users can submit any number of knative service configurations to the kubernetes API. These can be stored in yaml files, and we can offer some basic examples. Once these deployments are registered, users can store the endpoint names in the airflow.cfg. These configurations will not launch any pods; they simply inform knative what a pod should look like when it needs to autoscale to match load.
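One such configuration, expressed here as the Python-dict equivalent of the yaml a user would store, might be registered like this. The image, names, and API version are assumptions, and pod-spec fields like nodeSelector may require enabling a knative feature flag:

```python
from kubernetes import client, config

# Illustrative Knative Service for the "high-memory" queue. Registering it
# runs no pods; it only tells knative what a worker should look like when
# it scales up to match load.
HIGH_MEMORY_QUEUE = {
    "apiVersion": "serving.knative.dev/v1",
    "kind": "Service",
    "metadata": {"name": "airflow-worker-high-memory", "namespace": "airflow"},
    "spec": {
        "template": {
            "spec": {
                "nodeSelector": {"node-type": "highmem"},
                "containers": [{
                    "image": "myrepo/airflow-knative-worker:latest",
                    "ports": [{"containerPort": 8080}],
                    "env": [{"name": "AIRFLOW_HOME", "value": "/usr/local/airflow"}],
                }],
            }
        }
    },
}

config.load_kube_config()
client.CustomObjectsApi().create_namespaced_custom_object(
    group="serving.knative.dev",
    version="v1",
    namespace="airflow",
    plural="services",
    body=HIGH_MEMORY_QUEUE,
)
```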
When a user launches airflow, they simply need to include a “knative_queue” variable in their executor_config for each task and the tasks will be routed correctly.
Step 2: Communicate
Knative offers three communication options: HTTP, gRPC, and a message broker. Both message brokers and http have pluses and minuses that should be discussed by the community (http has security implications unless users are able to establish mTLS, while message brokers add more points of failure to the architecture). Using http, we can simply establish a different service address for each queue.
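Sketching the http option: each queue resolves to its own knative service address, and the scheduler simply POSTs the task identifiers to it. The /run route, payload shape, and cluster-local addressing below are assumptions for illustration:

```python
import json
import urllib.request

def submit_task(queue: str, dag_id: str, task_id: str, execution_date: str) -> int:
    """POST a task-run request to the knative service backing `queue`."""
    payload = json.dumps({
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": execution_date,
    }).encode()
    # One cluster-local address per queue; knative routes by service DNS name.
    url = f"http://airflow-worker-{queue}.airflow.svc.cluster.local/run"
    request = urllib.request.Request(
        url, data=payload, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```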
The Airflow Worker
Since knative runs on the concept of a long-running service, this would be a good time to separate the work of running airflow operators from the scheduling responsibilities of the scheduler. We could accomplish this by creating a simple “airflow worker” server using asyncio.
When knative launches a worker, it would run an airflow worker command that launches a persistent server. This asyncio server would have access to all airflow libraries and would save the time needed to start a python interpreter each time it receives a request. Each request would spawn a new process that runs a single task (similar to the LocalExecutor, but without the need for a heartbeat). These servers could then be load balanced by knative (via the load/memory/CPU limits set by the user).
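A minimal sketch of such a worker, assuming aiohttp for the http layer and the existing airflow run CLI for task execution; the route, payload fields, and port are all assumptions, not settled design:

```python
import asyncio

from aiohttp import web

async def run_task(request: web.Request) -> web.Response:
    """Run one airflow task per request in a dedicated subprocess."""
    body = await request.json()

    # One process per task, LocalExecutor-style, so a failing task cannot
    # take down the warm server; the interpreter and airflow imports stay hot.
    process = await asyncio.create_subprocess_exec(
        "airflow", "run", body["dag_id"], body["task_id"],
        body["execution_date"], "--local",
    )
    returncode = await process.wait()
    return web.json_response(
        {"state": "success" if returncode == 0 else "failed"}
    )

def main() -> None:
    app = web.Application()
    app.add_routes([web.post("/run", run_task)])
    # Knative sends traffic to the port the container declares (8080 here).
    web.run_app(app, port=8080)

if __name__ == "__main__":
    main()
```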
Would this replace the Kubernetes Executor?
Well… yes and no. For many reasons, this new architecture would improve upon the existing k8sexecutor. However, more basic users, or users on managed k8s clusters, might not be able to install knative. For these users, the classic executor will still offer many of the benefits of a k8s-native experience.
Another case where users might want to continue using the “classic” kubernetes executor would be very long-running tasks. Knative serving is not meant to run hour/multi-hour tasks, as this interferes with its ability to autoscale (e.g. imagine you start 10 workers that take 10 tasks each, and each of those workers receives one multi-hour job. Even though 90% of the tasks have completed, knative is unable to reduce the cluster size because each worker still has a single long-running task).