
Status

State: Draft
Discussion Thread: https://lists.apache.org/thread/nsmo339m618kjzsdkwq83z8omrt08zh3

Motivation

This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted", which makes it possible to put security boundaries between them. "Untrusted" components can then be executed in DBIsolation mode, which disables direct database access and makes the database reachable only through the Airflow Database API.

Airflow Database API is a new, independent component of Airflow. It allows isolating some components (Worker, DagProcessor and Triggerer) from direct access to the DB.

Note: Access control to Airflow Database API and Airflow Database rows is out of scope for this AIP, but it was kept in mind so that implementing it later remains feasible.

Proposal

Airflow Database API is a new API, independent of the existing REST API embedded into the Airflow Webserver:

  • They have different purposes: user-facing vs internal
  • They have different access models: end-user credentials (roles) vs row-level access with tokens

Airflow Database API must provide methods only for "untrusted" components (worker/dag processor/triggerer), as the Scheduler will use only DBDirect mode.

API

Airflow Database API is built using Connexion, which allows the creation of a REST API from an OpenAPI specification.

The new component can be started using a new CLI command: airflow db-api.

Code is located in /airflow/db/api/, with all endpoint methods in /airflow/db/api/endpoints.

Query

DBClient class

To avoid copying the SQLAlchemy code to the Airflow Database API and maintaining two versions of it, I propose creating a dedicated class that contains all methods required by untrusted components. Each method either runs a SQLAlchemy query or makes a call to the Airflow Database API. The class is located in /airflow/db/api/client.

Airflow Database API itself calls the same methods of the DBClient class, but because it runs in DBDirect mode, they execute the SQLAlchemy queries. With that approach there is always a single place to add new code shared between DBIsolation and DBDirect modes.

Example:

The following query:


session.query(DagRun)
  .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
  .first()

It is replaced by db_client.get_dag_run(dag_id, execution_date), defined as:

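from airflow.models import DagRun
from airflow.utils.session import provide_session


class DBClient:

  @provide_session
  def get_dag_run(self, dag_id, execution_date, session=None):
    # In DBIsolation mode, delegate to the Airflow Database API.
    if self.is_db_isolation:
      return self._get_dag_run_from_api(dag_id, execution_date)
    # In DBDirect mode, run the SQLAlchemy query directly.
    return (
      session.query(DagRun)
      .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
      .first()
    )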


With the following method in Airflow Database API:

class AirflowDatabaseAPI:

  def get_dag_run(self, dag_id, execution_date):
    return self.db_client.get_dag_run(dag_id, execution_date)


The method may be extended, accepting more optional parameters to avoid having too many similar implementations.
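As a rough illustration of how one method could accept optional parameters and serve both modes, here is a minimal, self-contained sketch. It does not use Airflow or SQLAlchemy: DagRunRow, the in-memory list of rows, the api_call callable and the get_dag_runs signature are all assumptions made for the example, standing in for the DagRun model, the database, the HTTP transport and the eventual client method.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass(frozen=True)
class DagRunRow:
    """Hypothetical stand-in for the DagRun model."""
    dag_id: str
    execution_date: str
    state: str


class DBClient:
    """Sketch only: one method, two modes. All names are assumptions."""

    def __init__(
        self,
        rows: List[DagRunRow],
        is_db_isolation: bool = False,
        api_call: Optional[Callable[..., List[DagRunRow]]] = None,
    ):
        self._rows = rows              # stands in for the database
        self.is_db_isolation = is_db_isolation
        self._api_call = api_call      # stands in for the HTTP call

    def get_dag_runs(self, dag_id=None, execution_date=None, state=None):
        candidates = {
            "dag_id": dag_id,
            "execution_date": execution_date,
            "state": state,
        }
        # Only the filters that were actually supplied are applied.
        filters = {k: v for k, v in candidates.items() if v is not None}
        if self.is_db_isolation:
            return self._api_call(**filters)
        return [
            row for row in self._rows
            if all(getattr(row, k) == v for k, v in filters.items())
        ]


rows = [
    DagRunRow("a", "2021-01-01", "success"),
    DagRunRow("a", "2021-01-02", "failed"),
    DagRunRow("b", "2021-01-01", "success"),
]
# The "API server" side runs the same client in DBDirect mode.
direct = DBClient(rows)
# The untrusted side has no rows of its own and forwards every call.
isolated = DBClient([], is_db_isolation=True, api_call=direct.get_dag_runs)
```

Because the isolated client forwards to the same method running in DBDirect mode, both clients return identical results for the same arguments, and a new optional filter only has to be added in one place.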

There is no list of all methods that should be added - they will be gradually added during implementation.


Alternative approach: DbSession class extending sqlalchemy.Session

To make the transition transparent for users, I proposed implementing the changes at the lowest possible level: the SQLAlchemy Session.

Sessions are created in settings and then used by the create_session method. The session is then used to query the database with SQLAlchemy, e.g.:

session.query(DagRun)
  .filter(DagRun.dag_id == dag_id, DagRun.execution_date == execution_date)
  .first()


To support DBIsolation mode without any change in the user code, the Session object must be extended. This is done by providing a subclass, DBSession.
It supports both "standard" and "DBIsolation" modes by overriding the Session class methods. Example for query (just a code snippet):

class DBSession(Session):

  def __init__(
      self,
      # Session parameters
      bind=None,
      autoflush=True,
      autocommit=False,
      expire_on_commit=True,
      info=None,
      # New parameter
      db_isolation_mode=False):

    super().__init__(
      bind=bind,
      autoflush=autoflush,
      expire_on_commit=expire_on_commit,
      autocommit=autocommit,
      info=info)

    self.db_isolation_mode = db_isolation_mode
    self.db_api_client = None
    if db_isolation_mode:
      # Client for Airflow Database API
      self.db_api_client = DbApiClient()

  def query(self, *entities, **kwargs):
    if not self.db_isolation_mode:
      return super().query(*entities, **kwargs)
    return DbApiClientQuery(entities=entities, db_api_client=self.db_api_client)


All other public methods must also be overridden in DBSession and either call the proper superclass method or provide a "db_isolation_mode" implementation, even if it just raises an exception.
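The "override and raise" case could look roughly like the sketch below. To stay self-contained it does not subclass sqlalchemy.orm.Session: BaseSession is a hypothetical stand-in for it, and the IsolationNotSupported exception name is an assumption for illustration only.

```python
class BaseSession:
    """Hypothetical stand-in for sqlalchemy.orm.Session."""

    def execute(self, statement):
        return f"executed: {statement}"


class IsolationNotSupported(RuntimeError):
    """Assumed exception type for methods with no DBIsolation implementation."""


class DBSession(BaseSession):

    def __init__(self, db_isolation_mode=False):
        self.db_isolation_mode = db_isolation_mode

    def execute(self, statement):
        # Raw statements cannot be mapped to an API call, so fail loudly
        # rather than silently touching the database.
        if self.db_isolation_mode:
            raise IsolationNotSupported(
                "execute() is not available in DBIsolation mode"
            )
        return super().execute(statement)
```

Failing explicitly keeps an untrusted component from falling back to direct database access through a method that was simply never overridden.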

If db_isolation_mode is False, DBSession works exactly the same way as Session.

If db_isolation_mode is True, query returns a new object type, DbApiClientQuery, which sends requests to Airflow Database API instead of the SQL database:

from enum import Enum


class DbQueryMode(Enum):
  ALL = 'all'
  FIRST = 'first'
  ONE_OR_NONE = 'one_or_none'


class DbApiClientQuery:

  def __init__(self, entities, db_api_client):
    self._entities = entities
    self._db_api_client = db_api_client
    self._filters = []

  def filter(self, *criterion):
    # Each criterion is expected to be a simple "column == value" expression.
    for c in criterion:
      column_id = c.left.description
      value = c.right.effective_value
      self._filters.append((column_id, value))
    return self

  def all(self):
    return self._call_api_query_method(DbQueryMode.ALL)

  def first(self):
    return self._call_api_query_method(DbQueryMode.FIRST)

  def one_or_none(self):
    return self._call_api_query_method(DbQueryMode.ONE_OR_NONE)

  def _call_api_query_method(self, query_mode):
    # Collected filters become keyword arguments of the API client method.
    kwargs = dict(self._filters)
    method = self._get_query_method_name()
    return getattr(self._db_api_client, method)(query_mode=query_mode, **kwargs)

  def _get_query_method_name(self):
    # session.query(DagRun) passes the model class itself, not its name.
    if self._entities[0] is DagRun:
      return 'get_dag_runs'
    raise NotImplementedError()

Note: This assumes the filter contains only simple conditions combined with the AND operator.

DbApiClient must have a get_dag_runs method which calls the proper Airflow Database API method with the provided optional parameters (as query strings).
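Passing the optional parameters as a query string could be sketched as follows, using only the standard library. The "/dag_runs" path, the "query_mode" parameter name, the build_dag_runs_url helper and the example address are assumptions for illustration; the AIP does not define the endpoint layout.

```python
from enum import Enum
from urllib.parse import parse_qs, urlencode, urlsplit


class DbQueryMode(Enum):
    ALL = "all"
    FIRST = "first"
    ONE_OR_NONE = "one_or_none"


def build_dag_runs_url(base_url: str, query_mode: DbQueryMode, **filters) -> str:
    """Encode the query mode and the optional filters as a query string.

    The "/dag_runs" path and the "query_mode" name are assumptions.
    """
    params = {"query_mode": query_mode.value}
    # Skip filters that were not supplied so they do not constrain the result.
    params.update({k: v for k, v in filters.items() if v is not None})
    return f"{base_url}/dag_runs?{urlencode(params)}"


url = build_dag_runs_url(
    "http://localhost:9080",  # hypothetical API address
    DbQueryMode.FIRST,
    dag_id="example_dag",
    execution_date="2021-01-01T00:00:00+00:00",
)
# The server side can recover the same parameters from the query string.
decoded = {k: v[0] for k, v in parse_qs(urlsplit(url).query).items()}
```

urlencode percent-encodes characters such as ":" and "+" in the timestamp, so the value survives the round trip unchanged.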

DB Data mutations 

Mutations are implemented similarly to queries, by overriding "delete" or "update" and mapping the request to the proper Airflow Database API method.

For "add", the serialized object is sent to Airflow Database API, where it is deserialized and added to the DB.
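The "add" round trip could be sketched like this. The AIP does not specify a serialization format, so JSON is an assumption here, as are the TaskNote record, the ALLOWED_TYPES registry and the FakeDatabaseApi class, which stands in for the server side and its database.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class TaskNote:
    """Hypothetical record type used only for this example."""
    dag_id: str
    task_id: str
    note: str


# Only types listed here may be rebuilt on the server side.
ALLOWED_TYPES = {"TaskNote": TaskNote}


def serialize(obj) -> str:
    """Client side: turn the object into a JSON payload."""
    return json.dumps({"type": type(obj).__name__, "fields": asdict(obj)})


def deserialize(payload: str):
    """Server side: rebuild the object from the JSON payload."""
    data = json.loads(payload)
    cls = ALLOWED_TYPES[data["type"]]  # KeyError for unknown types
    return cls(**data["fields"])


class FakeDatabaseApi:
    """Stand-in for the API endpoint; a list plays the role of the DB."""

    def __init__(self):
        self.stored = []

    def add(self, payload: str) -> None:
        self.stored.append(deserialize(payload))


api = FakeDatabaseApi()
original = TaskNote(dag_id="example_dag", task_id="extract", note="retry once")
api.add(serialize(original))
```

Restricting deserialization to an explicit set of types is one way to keep an untrusted component from making the API instantiate arbitrary classes.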

Complex statements

Not all statements can be covered with the approach described above. For example, processor.py contains:

qry = (
  session.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
  .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
  .filter(TI.dag_id == dag.dag_id)
  .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
  .filter(TI.task_id.in_(dag.task_ids))
  .group_by(TI.task_id)
  .subquery('sq')
)

max_tis: List[TI] = (
  session.query(TI)
  .filter(
      TI.dag_id == dag.dag_id,
      TI.task_id == qry.c.task_id,
      TI.execution_date == qry.c.max_ti,
  )
  .all()
)

Instead of covering all cases in the DBSession filter method, the DBSession class has a dedicated method for this case.

The code in processor.py simply calls this method, and its implementation covers both cases: DBDirect mode (using SQLAlchemy and a direct database call) and DBIsolation mode (calling a dedicated Airflow Database API endpoint, which calls the same DBSession method, but in DBDirect mode).

e.g.


class DBSession(Session):
  ...
  def get_max_tis(self, dag_id, task_ids):
    if self.db_isolation_mode:
      # Delegate to the dedicated Airflow Database API endpoint.
      return self.db_api_client.get_max_tis(dag_id, task_ids)
    # DBDirect mode: run the original SQLAlchemy statements.
    qry = (
      self.query(TI.task_id, func.max(TI.execution_date).label('max_ti'))
      .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
      .filter(TI.dag_id == dag_id)
      .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))
      .filter(TI.task_id.in_(task_ids))
      .group_by(TI.task_id)
      .subquery('sq')
    )
    return (
      self.query(TI)
      .filter(
        TI.dag_id == dag_id,
        TI.task_id == qry.c.task_id,
        TI.execution_date == qry.c.max_ti,
      )
      .all()
    )

Configuration

Together with Airflow Database API, the following configuration settings are introduced:

  • [core] database_access_isolation

Whether DBIsolation mode is enabled and untrusted components should use Airflow Database API to access the Airflow Database.

  • [core] database_api_host

URL for Airflow Database API. Used only if [core] database_access_isolation is True.
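To make the two settings concrete, the sketch below parses an example configuration fragment with Python's standard configparser rather than Airflow's own configuration module. The address http://localhost:9080 is a made-up value, and falling back to False when the setting is absent is an assumption of this example.

```python
import configparser

# Example fragment; the host value is hypothetical.
EXAMPLE_CFG = """
[core]
database_access_isolation = True
database_api_host = http://localhost:9080
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_CFG)

# Assumption: isolation is off unless explicitly enabled.
isolation_enabled = parser.getboolean(
    "core", "database_access_isolation", fallback=False
)

# The API address is only relevant when isolation is enabled.
api_host = (
    parser.get("core", "database_api_host", fallback=None)
    if isolation_enabled
    else None
)
```

With database_access_isolation left unset or set to False, api_host stays None in this sketch and the component would keep using direct database access.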

Dag processor, worker and triggerer

All tests for DagProcessor, Workers (including Operators) and triggers are executed in both modes: "DBDirect" and "DBIsolation". The code for Airflow Database API, the DbApiClient and DBSession is extended until all tests pass.

Why is it needed?

This change allows Workers, DagProcessors and the Triggerer to work in untrusted mode, without direct access to the Database.

Which users are affected by the change? 

Only those that set [core]database_access_isolation=True

How are users affected by the change? (e.g. DB upgrade required?)

What defines this AIP as "done"?

It is possible to start an Airflow Database API from the Airflow CLI, and it is easily extensible with new methods. Workers, DagProcessors and the Triggerer can work in DBIsolation mode, without direct access to the DB.
