Status
Motivation
This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted" ones, which allows putting security boundaries between them. "Untrusted" components can then be executed in DBIsolation mode, which disables direct database access, making it possible only through the Airflow Database API.
The Airflow Database API is a new, independent component of Airflow. It allows isolating some components (like the Worker or DagProcessor) from direct access to the DB.
Note: Access control for the Airflow Database API is out of scope for this AIP, but it was kept in mind so that the implementation remains feasible.
Proposal
The Airflow Database API is a new API, independent of the existing REST API embedded in the Airflow Webserver:
- They have different purposes: user-facing vs. internal
- They have different access models: end-user credentials (roles) vs. row-level access with tokens
The Airflow Database API must provide methods only for the "untrusted" components (worker/dag processor/triggerer), as the Scheduler will use only DBDirect mode.
API
The Airflow Database API is built using Connexion, which allows creating a REST API from an OpenAPI specification.
The new component can be started using a new command line command: airflow db-api.
The code is located in /airflow/db/api/, with all endpoint methods in /airflow/db/api/endpoints.
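As an illustration of the Connexion style, each operationId in the OpenAPI specification maps to a plain Python function. A sketch of what one endpoint method could look like follows; the function name, helper, and response shape are assumptions for this sketch (the lookup is stubbed so the example is self-contained), not the actual implementation:

```python
# Hypothetical endpoint method, in the style Connexion expects:
# a plain function returning (body, status_code).
def get_dag_run(dag_id: str, execution_date: str):
    """Return the serialized DagRun, or a 404 response if not found."""
    dag_run = _lookup_dag_run(dag_id, execution_date)  # illustrative helper
    if dag_run is None:
        return {"detail": "DagRun not found"}, 404
    return dag_run, 200


# Stand-in for the real database lookup, so the sketch runs as-is.
_FAKE_DB = {
    ("example_dag", "2023-01-01"): {"dag_id": "example_dag", "state": "success"},
}


def _lookup_dag_run(dag_id, execution_date):
    return _FAKE_DB.get((dag_id, execution_date))
```

In the real component, the function body would go through the shared database-access layer rather than a dictionary, but the request/response shape stays the same.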
Query
DBClient class
To avoid copying the SQLAlchemy code into the Airflow Database API and maintaining two versions of it, I propose creating a dedicated class that contains all methods required by untrusted components; each method will either run a SQLAlchemy query or make a call to the Airflow Database API. The class is located in /airflow/db/api/client.
The Airflow Database API itself will call the same methods from the DBClient class, but as it runs in DBDirect mode it executes the SQLAlchemy query directly. With that approach there is always a single place to add new code shared between DBIsolation and DBDirect modes.
Example:
For the code

    session.query(DagRun).filter(
        DagRun.dag_id == dag_id,
        DagRun.execution_date == execution_date,
    ).first()

it is replaced by

    db_client.get_dag_run(dag_id, execution_date)

defined as:

    class DBClient:
        @provide_session
        def get_dag_run(self, dag_id, execution_date, session=None):
            if self.is_db_isolation:
                return self._get_dag_run_from_api(dag_id, execution_date)
            return (
                session.query(DagRun)
                .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
                .first()
            )
With the following method in the Airflow Database API:
    class AirflowDatabaseAPI:
        def get_dag_run(self, dag_id, execution_date):
            return self.db_client.get_dag_run(dag_id, execution_date)
The methods may be extended with more optional parameters to avoid having too many similar implementations.
There is no list of all methods that should be added - they will be gradually added during implementation.
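The dual-mode dispatch described above can be sketched end to end as follows. This is a minimal, self-contained illustration: both backends are stubbed (the FakeSession class and the constructor signatures are assumptions, not the real code), so only the single-code-path idea is shown.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session (illustrative only)."""

    def __init__(self, rows):
        self._rows = rows

    def get_dag_run(self, dag_id, execution_date):
        return self._rows.get((dag_id, execution_date))


class DBClient:
    """Single place for data-access code, shared by both modes."""

    def __init__(self, is_db_isolation, session=None, api=None):
        self.is_db_isolation = is_db_isolation
        self._session = session  # used in DBDirect mode
        self._api = api          # used in DBIsolation mode

    def get_dag_run(self, dag_id, execution_date):
        if self.is_db_isolation:
            # DBIsolation: delegate to the Airflow Database API.
            return self._api.get_dag_run(dag_id, execution_date)
        # DBDirect: query the database directly.
        return self._session.get_dag_run(dag_id, execution_date)


class AirflowDatabaseAPI:
    """The API server itself always runs in DBDirect mode."""

    def __init__(self, session):
        self.db_client = DBClient(is_db_isolation=False, session=session)

    def get_dag_run(self, dag_id, execution_date):
        return self.db_client.get_dag_run(dag_id, execution_date)


rows = {("example_dag", "2023-01-01"): {"dag_id": "example_dag", "state": "success"}}
session = FakeSession(rows)
api = AirflowDatabaseAPI(session)

direct = DBClient(is_db_isolation=False, session=session)
isolated = DBClient(is_db_isolation=True, api=api)

# Both modes resolve through the same DBClient method.
assert direct.get_dag_run("example_dag", "2023-01-01") == isolated.get_dag_run(
    "example_dag", "2023-01-01"
)
```

The key design point is that the untrusted component and the API server share the same DBClient method, so a new query only ever has to be written once.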
Configuration
Together with the Airflow Database API, the following configuration settings are introduced:
[core] database_access_isolation
Whether DBIsolation mode is enabled and untrusted components should use the Airflow Database API for accessing the Airflow database.
[core] database_api_host
URL of the Airflow Database API. Used only if [core] database_access_isolation is True.
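In airflow.cfg the two settings might look like this (the values, including the host URL, are illustrative):

```ini
[core]
# Enable DBIsolation mode for untrusted components
database_access_isolation = True
# Endpoint of the Airflow Database API; only read when
# database_access_isolation is True
database_api_host = http://airflow-db-api:8080
```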
Dag processor, worker and triggerer
All tests for the DagProcessor, Workers (including Operators) and Triggerers are executed in both modes, "DBDirect" and "DBIsolation". The code for the Airflow Database API, the DBClient and DBSession is extended until all tests pass.
Why is it needed?
This change allows Workers, DagProcessors and Triggerers to work in untrusted mode, without direct access to the database.
Which users are affected by the change?
Only those who set [core] database_access_isolation=True.
How are users affected by the change? (e.g. DB upgrade required?)
What defines this AIP as "done"?
It is possible to start the Airflow Database API from the Airflow CLI, and it is easily extensible with new methods. Workers, DagProcessors and Triggerers can work in DBIsolation mode, without direct access to the DB.