This is part of AIP-1, which aims to run Airflow in multi-tenant way. The way to achite that is splitting the Airflow components into "trusted" and "untrusted" , which allows to put security boundaries between them. "Untrusted" components could then executed in DBIsolation mode, which disables direct Database access, making it possible only through Airflow Database API.

Airflow Database APi is a new independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to DB.

Note: Access control to Airflow Database API & Airflow Database rows is out of scope for this AIP, but it was kept in mind, making the implementation feasible.


Internal API architecture

Airflow Internal API is logically a new API, independent of existing User-facing API embedded into Airflow Webserver:

  • They have different purposes: user-faced vs internal
  • They have different access model - end user credentials(roles) vs task-based access
  • They are different styles of APIS: REST vs. RPC-like (HTTP Based) 
  • The REST API is user facing and it is API-first (code reflects the API specification) where in case of Internal API the API should be code-first (the API reflects methods to be called)

The way how we approach implementation is still to be decided at the POC stage but we want to seamlessly integrate authentication between the two APIs. It should be possible to authenticate with both APIs using the same mechanism, and it should be possible to expose both APIs by the same web server .

The REST API and Internal API could be exposed together in a single process (webserver) but optionally they could be exposed separately in different processes. The Internal/REST API  separation can be introduced in two ways:

  • Internal APIs exposed together with REST API via Webserver with RBAC access control
    • We will have a separate “Internal API Role”, and “Internal API user” within the access model  that we use from FAB. You can only authenticate with the user, using the “Internal” authentication method: based on tokens 
    • The authorization of Workers should be based on validity of JWT tokens  generated  when Scheduler.  Dag Processor and Triggerer will have a secret that will be available only to them, not to the user code they run (see below the section on “Securing  access to authorization token in Dag Processor and Triggerer).The user code in the  Worker should not have access to any long term secret - they should only use limited-time valid JWT Token. 
    • This “Internal API Role” will have only permissions that will allow it to execute the Internal APIs (RPC-styled) that are needed to provide user-code with current functionality (Connections/Variables etc.) in a transparent way. Selected (possibly even all) other API methods from existing REST API that should be exposed to the Workers, Dag Processor and Triggerer. We will make an inventory and make a decision on  a “default” permissions for REST APIs during the implementation. This will be done using the same RBAC controls that we use in current REST API implementation.
  •   Physical separation of the components exposing both APIs:
    • The “Internal API” optionally can also be exposed by a component separate from the current webserver.
    • We also propose to implement a separate, optional “airflow internal-api” component, that could expose only the APIs needed by Workers, DAGProcessor and Triggerer. This component can be run in a separate “security zone”, allowing to separate access to the “Internal API” on a physical/networking level 

The Internal API calls are “RPC”-style APIs rather than CRUD APIS so they do not map into the REST  semantics. The Internal endpoints do not need to be published in the API specification so we plan to generate the APIs based on the code (decorated functions). As the next step (implementing POC) we will determine if we should generate the endpoints from the code via completely custom code, or whether we will be able to leverage existing libraries and tools (for example Fast API).. 

Later phase of “Umbrella” AIP-1 implementation (not implemented in this AIP) is to implement task-based authorization (each task should have access to a subset of operations depending on dag_id, task id, origin, etc.). While this AIP does not address it, it makes it possible. 

In the future, the JWT token can be used to carry additional (signed) payload which will be used to selectively authorize the caller to specific entities (connections, variables, specific tasks or dags only etc.)

No access to Metadata DB for “User code” in DagProcessor and Triggere

Currently, SQLAlchemy Models of Airflow are available also for the top-level user code from DAGs and callbacks. Essentially users can access Airflow Variables, Connections and other DB models while the DAG code is parsed or when callbacks are executed. This is however considered a bad practice for Top Level code, also using DB access for callbacks (TODO discuss it) is not recommended practice. It interferes with the parsing process and it can be replaced by other mechanisms. 

Similarly any DB acces in User Code in Triggerer is a very bad practice, because the database of Airflow is not available for operations. SQLAlchemy queries are Beta functionality and only work with the asynchronous Postgres, experimental driver, so for all practical purposes any Trigger user-provided code should not access the metadata DB.

The proposal for the DB Isolation mode is that the Top-Level Python code and callbacks executed by the DAGProcessor are not allowed to access Airflow Database at all. 

This could be achieved by not exposing an SQLAlchemy session to the process that executes DAG Parsing and callbacks. While this does not provide complete isolation for malicious users (DAG Parsing/Callbacks will be executed as sub-processes of DAGProcessor and full sandboxing of the parsing process is out of the scope for now), this could be a handy mechanism to improve performance and enforce best practices for DAG Top-Level Process parsing and callbacks.

This also means that all the Community operators should not  use any DB operations in their constructors. This is similarly considered as bad practice, and enforcing that should be great side effect of introducing the DB Isolation mode. As part of the implementation, all such operators that are currently using DB operations will be identified and fixed.

Securing access to authorization token in DagProcessor and Triggerer

Another (security related) consequence of “no-access” for the user code by DagProcessor and Triggerer is that the user code run by DagProcessor and Triggerer should have no access to the authentication information to Internal API. Both components will have to have a long-living token that will allow them to authenticate and later stage authorize to run selected internal API methods, and this token should not be made available to user code. By default - in both cases - executing of the user code happens in a separate process and the token saved in memory might be removed while forking or not passed when the processes are spawned, however the token (or at least token that allows the DagProcessor and Triggerer to retrieve the actual token to use)  must be stored locally on the server where DagProcessor and Triggerer are run - to handle cases like restarts without manual intervention. That puts special requirements on the parsing and event loop processes started by DagProcessor and Triggerer respectively. The following options should be implemented (TODO: discuss this):

  • The configuration file where the token (or configuration of long-living token  to retrieve it) should not be “world readable” and it should be readable only by the user running DagProcessor/Trigger. In the most “strict” mode, we can perform the check at the startup and refuse Airflow to start if the configuration file has wrong permissions
  • Parsing/Event loop processes might be started as “no-user” processes . The consequence of this is that DagProcessor and Triggerer should be run in impersonation mode, where the user running Airflow should have “sudo” capability. This provides pretty good isolation, however requires careful configuration of the deployment in order to fully secure the authentication/authorization information.
  • [Optional - not sure if we need it a all] The Parsing/Even loop processes might be run as containers where the configuration of Airflow will not be mapped into the container. That provides greater isolation and security level and should only be considered in certain deployments where Docker engine is available. This is not part of the AIP, it could be implemented as a follow-up AIP
  • [Optional - not sure if we need it all]  we could consider K8S deployment where the processes parsing DAG files are executed in dedicated node pools. This is not part of the AIP and could be implemented as follow-up AIP

Test harness

As part of the implementation, we want to implement a test harness tapping into the existing CI/test environment of Apache Airflow, where the above assumptions can be automatically verified for all the community provided code:

We will run all CI Test Suite in “DB Isolation Mode” - where the unit tests will get a modified SQLAlchemy Session that will catch incorrect uses of the Database:

  • Using disallowed models in the Community Operators/HooksUsing disallowed DB operations in DagProcessor
  • Using any DB access in any of the example DAG’s top-level code
  • Using any DB Access in __init__() methods of any of the Community Operators
  • Using disallowed models in the Community Hooks
  • Using disallowed DB operations in "execute()" calls of the operators

Internal API inventory

The Internal API serves three components: Dag Processor, Workers, Triggerer. This table shows an inventory of impacted methods, their applicability and our approach for the methods.

Note that Triggerer, asynchronous, user provided code, almost by definition should not perform any DB calls synchronously (and this is the only possibility to access Airflow Code) . This is considered as bad practice and we believe not allowing Airflow DB access (and not providing the equivalent API)  from the user provided code by the triggerer might improve the performance and characteristics of Triggerer. Any attempt to use an SQLAlchemy/ direct DB access by the user-provided code should raise an exception - to be verified during POC.

User code method inventory

Those methods are available to User code that runs in Workers as part of task execution. We can easily utilize existing REST API to retrieve those. Additionally users will have access to pre-configured Python Client (installed as Airflow Dependency) that will enable Operator developers and DAG writers to access all the exposed REST APIs of airflow in a straightforward way. Note that users will also have to access config which is not a DB access, however access to Config should also be done via REST API - this allows to secure access to Secret Backends - no user code will have access to the secret which is used to authenticate with Secret Backends.


Proposed approach 


Only READ-ONLY BaseHook.get_connection() will be implemented to transparently read Connection from Database or from Secrets. Secret access will also be done using the Internal API Server. Currently the connection performs a database roundtrip from all components. Replacing it with REST  API call will make it slightly slower but it will also leverage SQLAlchemy Connection Pool by the Internal API Server. Connection retrieval is infrequent operation in general


READ-WRITE Variable.get/set/update/clear  methods will be implemented transparently using APIs. Similar performance consideration as in case of Connection.


READ-WRITE all of the operation (get*/clear*) will be implemented to transparently use Internal API, Custom XCom Backend access should be done directly from the “calling entity” however and we should provide a mechanism of passing temporary credentials from the XCom Backends so that the workers can store/retrieve XCom data directly without going through the API server. This can be implemented for example using pre-signed URLS or other mechanisms where temporary credentials to access the data are passed from the server to the worker.


The tasks running in workers will also have access to the Airflow Python Client that will use the “Task” JWT token for authentication and be configured to talk to the Airflow Webserver automatically. This will provide easy access to the functionality exposed by Airflow via API to Operator developers and DAG writers, who will have standardized access to Airflow via the client.

Non-DB code that should also be isolated (because of Secret Backends)


READ-ONLY access to retrieve configuration Secret Backend access will only be done on the Internal API server. Option: Until selective authorization is implemented we might allow direct access from Workers/DagProcessor for scalability reasons even if full security might not be achievable. In this case secret credentials might leak to user code.  Also, in most “strict” deployment (see above discussion about the access of user code to authentication token), the configuration file should not be made available to the user code directly. 

Internal API calls

Those methods are not necessarily going to be mapped 1-1 into the actual API calls. It is possible that several methods might be joined together for efficiency (with some possible duplications resulting if methods are used in several places). It is important however that each of the methods constitutes a complete transaction (or several transactions), because going through APIs will make it extremely difficult to maintain transactional integrity across multiple methods.

Dag Processor Airflow Code


Proposed approach 


This method should be exposed as a separate Internal API call (in DB isolation mode). It is essentially a single, independent  transaction which also covers sending notifications (note - in case we implement custom notification engines, the code of those will be executed in the context of the Internal API server. .


This is also a single-session method that can be exposed and executed fully by the Internal API server.


This method should stay within the DAGProcessor  but it should use Internal API to retrieve appropriate models (which will be API objects rather than SQLAlchemy objects): DagRun, TaskInstances, Template_context - via current Public API (extensions needed). We might need to optimize/update some of the APIs to get missing information and achieve best performance by batching retrieval of only those objects that are needed.


This should be a separate, new Internal API method call that should perform a single transaction. Note that currently commit is not performed for that method, but it should  have no adverse effect to make this separate transaction.


This should be a separate, new internal API method call that should perform a single transaction. Similarly as above - currently this is done in the same transaction but, there should be no adverse effects of making this separate transaction


This is a new, “heaviest” method that should be exposed by the internal API to the DAG processor. We should be able to serialize all the information stored currently in the DagBag and send it to the Internal API server. The method should be roughly equivalent to: sync_to_db and pickling dags if set. This should be done in a single transaction as a single API call.

Triggerer Airflow Code


Proposed approach 

Trigger Model

Similarly as in case of the User-Code, the Trigger class should be updated to transparently handle calls to it: bulk_fetch, clean_used, assign_unassigned, submit_event, submit_failure, ids_for_triggerer. All those methods are currently single transaction calls and can be easily mapped into API calls with serialized information. Performance wise it exchanges HTTP REST calls with a DB call which yields some unavoidable performance penalty. However only submit_failure and sumbit_event will have visible impact due to their “non-batch” nature. In case this constitutes a problem we might consider a mechanism where we also accumulate and batch those events into batch calls.

Worker Airflow Code


Proposed approach 


Single transaction retrieving data from the DB - part of internal API..


There are a few places where tasks are saved to the db in a separate transaction after task values are updated. (host/job_id). We will implement a new API endpoint for it.


We should implement an internal API call that would create a trigger entry and update the task state. This method is a single-transaction method so it should have limited performance impact - Internal API can utilise connection pooling much better than Workers.


We should implement a separate Internal API call that retrieves context for the task. All fields in context should be serializable (or we can make them so) and performance of this method should not be heavily impacted as we exchange a DB call with an API call from the point of view of Worker and Internal API implementation can utilize connection pooling much better.


We should implement a separate, Internal API call that implements adding task reschedule via DB. This is a single transaction so it should be easy to replace it with a single API call.


Similarly as above, we should implement a single API call as this method is always followed by a commit.


The message should add a log model entry as a separate call. Similar considerations as above.


Updates task instance state. Also separate methods od in the API. There is rather complex logic in the method and it requires to load a partial subset of the tasks which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode. - similar considerations. One watchout as there is an option to execute callbacks while running this function. Rather than executing them in this case we should return a list of callbacks to execute and execute them in the context of the worker. They will be executed after the transaction is complete and there is a small risk that callbacks would be lost in this case, however the risk is rather minimale. Option: We could consider options to store the callbacks in the database instead and delegate execution of those to DagProcessor. 


This should be a separate method in the API. There is rather complex logic in the method and it requires to load a partial subset of the tasks which might be costly. If we find that this method has performance limitations we can disable mini-scheduling in DB isolation mode.

Implementation notes: Internal API component

Airflow Internal API provides methods only for "untrusted" components (Worker/Dag Processor/Triggerer), as Scheduler will use only direct DB access.

While the APIs could be exposed also via webserver, we propose to add a new, optional, stateless (thus horizontally scalable) component that can be started using a new command line command: airflow internal-api.

Code is located in airflow.api_internal package, with all endpoint's methods available via in /api_internal/ endpoint. 

As mentioned above the “Internal API” is not an “API-first” but “Code first”. The implementation details on how we map particular methods into endpoints is going to be worked out in detail at the POC that follows the AIP, but we attempt to approach the implementation in the way that will minimize impact on the existing code (at most slight refactoring and splitting existing methods into several or joining them). All the methods exposed via API will have to be adapted to make their parameters and return values serializable (so in some cases we will have to perform conversion between the objects that are big/not serializable into appropriate representation needed by the method (for example DagBag passed in a few methods above and example below should be turned in list of ids and parameters needed by the methods). All the methods involved will have to be converted into “functions” so that no instance objects are accessed.

Another important factor is future maintainability - in the implementation we will avoid code duplication by implementing decorators that will either use the locally implemented DB access or delegated automatically to a corresponding API call. The current “@provide_session” decorator will provide an “Assert” session in Workers or DagProcesor in the DB Isolation mode, so any attempt of accessing the DB from those will raise an exception.

What we want to achieve is to automate both client and server code in the way similar to Flask approach - decorating any of the methods with @api_internal decorator will turn a method into API call when run in a “DB isolation” mode (in DagProcessor and Worker) - including validation of serializability of parameters and return values and type of method. This will allow to easily apply the same approach to future features of Airflow.

Below you might find an example of the implementation. The code below is a fragment of current DagProcessor’s main “process_file”. You can see all the changes needed:

  • Slight refactorings necessary (including parameter conversion)
  • Converting method into function
  • Explicit transaction boundaries corresponding to remote methods
  • Decorator delegating method execution on the client and implementing endpoint in the server


Code Block
titleBefore chage
    def process_file(
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
        session: Session = None,
) -> Tuple[int, int]:"Processing file %s for tasks to queue", file_path)

            dagbag = DagBag(file_path, include_examples=False, \
        except Exception:
            self.log.exception("Failed at reloading the DAG file %s", \
            Stats.incr('dag_file_refresh_error', 1, 1)
            return 0, 0

        self._deactivate_missing_dags(session, dagbag, file_path)

    def _deactivate_missing_dags(self, session, dagbag, file_path):

Might become:

Code Block
titleAfter Change
def _deactivate_missing_dags(session, dagbag_extract, file_path):
    # 1. Accessing dagbag_extract needs to be refactored to
    #    access extracted version of DagBag
    # 2. The commit() is added here to explicitly end the transaction
    #    at the remote method boundary. It is harmless to add it here
    #    as there is no transactional integrity required
    #    with the DB operations that follow it.
    # 3. `self` is only used for logging we can substitute it with
    #    static logger
    # 4. In the client with DB ISolation, the method is not
    #    Executed - API call is executed instead and server executes
    #    the very same method instead. 


    def process_file(
        file_path: str,
        callback_requests: List[CallbackRequest],
        pickle_dags: bool = False,
        session: Session = None,
) -> Tuple[int, int]:"Processing file %s for tasks to queue", file_path)

            dagbag = DagBag(file_path, include_examples=False, \
        except Exception:
            self.log.exception("Failed at reloading the DAG file %s", \
            Stats.incr('dag_file_refresh_error', 1, 1)
            return 0, 0

        dagbag_extract = dagbag.extract()
        _deactivate_missing_dags(session, dagbag_extract, file_path)

Deployment options

Option 1: No isolation (today):

[core]database_access_isolation=False (default)

All components have direct access to the Airflow database as they do right now. No change/impact on Airflow execution. Diagram
diagramNameCopy of Diagram bez tytułu

Option 2: Internal API as part of Web server

This mode requires setting [core]database_access_isolation=True and [core] database_api_host=<db_api_url> as well as passing additional parameter --with-internal-api to the airflow webserver command.

In this mode Internal API is executed as part of Airflow API embedded into the web server, which allows saving resources in small installations. Diagram
diagramNameDiagram bez tytułu

Option 3: Internal API as external component

This mode requires setting [core]database_access_isolation=True and [core] database_api_host=<db_api_url> as well as starting additional component with `airflow internal-api`

In this mode Internal API is executed as a standalone component, using the same code as Web Server API but exposing only parts of the endpoints (Internal-API related). This mode allows scaling independently (e.g. on Kubernetes using HPA) Diagram
diagramNameCopy 2 of Diagram bez tytułu

titleAlternative approach: DbSession class extending sqlalchemy.Session

Alternative approach: DbSession class extending sqlalchemy.Session

To make the transition transparent for users, I proposed to implement changes at the lowest possible level - SQLAlchemy Session

Sessions are created in settings and which are then used by create_session method. It is then used to make queries to the Database with SQL Alchemy, e.g.:

Code Block
  .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)

To make it support DbIsolation mode, without any change in the user code -  the Session object must be extended - and it is by providing a subclass DBSession
It supports both "standard" and "DBIsolation" modes, overriding the Session class methods, example for query: (just a code snippet)

Code Block
class DBSession(Session):

  def __init__(
      # Session parameters
     # New parameter


    self.db_isolation_mode = db_isolation_mode
    self.db_api_client = None
      # Client for Airflow Database API
      self.db_api_client = DbApiClient() 

  def query(self, *entities, **kwargs):
    if not self.db_isolation_mode:
      return super().query(*entities, **kwargs)
    return DbApiClientQuery(entities=entities, db_api_client=self.db_api_client)

All other public methods must also be overridden in DBSession and either call proper super class methods or provide "db_isolation_mode" implementation - even if it's just raising an exception.

If db_isolation_mode is False then the DbSession works exactly the same way as the Session.

In db_isolation_mode = True it returns new object type: DbApiClientQuery that sends requests to Airflow Database API instead of SQL database:

Code Block
class DbApiClientQuery:

  def __init__(self, entities, db_api_client):
    self._db_api_client = db_api_client

  def filter(self, *criterion):
    for c in criterion:
      columnId= c.left.description
      value= c.right.effective_value
    return self

  def all(self):
    return self._call_api_query_method(DbQueryMode.ALL)

  def first(self):
    return self._call_api_query_method(DbQueryMode.FIRST)

  def one_or_none(self):
    return self._call_api_query_method(DbQueryMode.ONE_OR_NONE)

  def _call_api_query_method(self, query_mode):
    for (columnId,value) in self._filters:
    method = self._get_query_method_name()
    return getattr(self.db_api_client, method)(
      query_mode = query_mode,

  def _get_query_method_name(self):
    if self._entities[0]==DagRun.__name__:
      return 'get_dag_runs'
    raise NotImplementedError()

class DbQueryMode(Enum):
    ALL = 'all'
    FIRST = 'first'
    ONE_OR_NON ='one_or_none'

Note: It assumes the filter contains only simple conditions with AND operator.

DbApiClient must have get_dag_runs which calls the proper Airflow Database API method with provided optional parameters (as query strings).

DB Data mutations 

Mutations are implemented similarly to query, by overriding "delete" or "update" and mapping the request to the proper Airflow Database API method. 

For "add" it sends the serialized object to Airflow Database API where it is deserialized and added to the DB.

Complex statements

Not all statements can be covered with the approach described above, e.g. In there is:

Code Block
qry = (
  session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
  .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
  .filter(TI.dag_id == dag.dag_id)
  .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))

max_tis: List[TI] = (
      TI.dag_id == dag.dag_id,
      TI.task_id == qry.c.task_id,
      TI.execution_date == qry.c.max_ti,

Instead of covering all cases in the DbSession filter method, the DbSession class has a dedicated method for this case.

The code in simply calls this method, and its implementation covers both cases: DBDirect(using SQLALchemy and direct Database call) and DBIsolation mode - calling a dedicated Airflow Database APi endpoint (which calls the same DbSession method, but in DBDirect mode)


Code Block
class DBSession(Session):
  def get_max_tis(self, dag_id,task_ids):
    if self.db_isolation_mode:
      self.db_api_client.get_max_tis(dag_id, task_ids)
      qry = (
        session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
        .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
        .filter(TI.dag_id == dag.dag_id)
        .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
    return (
            TI.dag_id == dag.dag_id,
            TI.task_id == qry.c.task_id,
            TI.execution_date == qry.c.max_ti,


Together with Airflow Database API the following configuration setting are introduced:

  • [core] database_access_isolation

Whether the DBIsolation mode is enabled and untrusted components should use Airflow Database API for accessing Airflow Database

  • [core] database_api_host

Url for Airflow Database API. Used only if [core]database_access_isolation is True

Dag processor, worker and triggerer

All tests for DagProcessor, Workers (including Operators) and triggers are executed in both modes "DBDirect'' and "DBIsolation ''. The code for Airflow Database API, the DbApiClient and DBSession are extended until all tests pass.

Why is it needed?

This change allows Workers, DagProcessors and Trigerrer to work in untrusted mode - without direct access to Database.

Which users are affected by the change? 

Only those that set [core]database_access_isolation=True

How are users affected by the change? (e.g. DB upgrade required?)

What defines this AIP as "done"?

It is possible to start an Airflow Database APi from Airflow CLI and it is easily extensible with new methods. Workers, Dag Processors  and Trigerrer  can work in DbIsolation mode - without direct access to DB.