Status
Motivation
Currently Airflow requires DAG files to be present on a file system that is accessible to the scheduler, webserver, and workers. As more and more people run Airflow in a distributed setup to achieve higher scalability, it becomes increasingly difficult to guarantee a file system that is accessible to and synchronized amongst all services. Allowing Airflow to fetch DAG files from a remote source outside the service's local file system grants much greater flexibility, eases implementation, and standardizes how remote sources of DAGs are synced with Airflow.
Considerations
Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?
What change do you propose to make?
DAGs are persisted in remote filesystem-like storage, and Airflow needs to know where to find them. A DAG repository is introduced to record the remote root directory of DAG files. Prior to DAG loading, Airflow downloads files from the remote DAG repository and caches them under the local filesystem directory $AIRFLOW_HOME/dags.
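As a minimal sketch of the caching convention described above: each repository's files land under $AIRFLOW_HOME/dags/<repo_id>. The helper below is hypothetical (not part of the proposal's API) and only illustrates the path layout.

```python
import os

def local_repo_path(repo_id):
    """Return the local cache directory for a DAG repository.

    Hypothetical helper: per the proposal, each remote repository is
    cached under $AIRFLOW_HOME/dags/<repo_id>.
    """
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    return os.path.join(airflow_home, "dags", repo_id)
```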
DAG Repository
We will create a file, remote_repositories.json, to record the root directories of DAGs on remote storage systems. Multiple repositories are supported.
{
  "dag_repositories": {
    "repo1": { "url": "s3://my-bucket/dags", "conn_id": "blahblah" },
    "repo2": { "url": "git://repo_name/dags", "conn_id": "blahblah2" }
  }
}
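A minimal sketch of parsing this configuration, assuming an object keyed by repository id as above; the function name and tuple shape are illustrative, not the proposal's final API.

```python
import json

def load_dag_repositories(path="remote_repositories.json"):
    """Parse remote_repositories.json into (repo_id, url, conn_id) tuples.

    Hypothetical helper assuming each entry maps a repository id to its
    remote root URL and the Airflow connection id used to access it.
    """
    with open(path) as f:
        config = json.load(f)
    return [
        (repo_id, repo["url"], repo["conn_id"])
        for repo_id, repo in config["dag_repositories"].items()
    ]
```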
DagFetcher
The following is the DagFetcher interface; we will implement fetchers for different storage systems, e.g. a GitDagFetcher and an S3DagFetcher. Given a remote_repositories.json configuration like the one above, the DagFetcher would download the files under s3://my-bucket/dags to $AIRFLOW_HOME/dags/repo_id/
class BaseDagFetcher:
    def fetch(self, repo_id, url, conn_id, file_path=None):
        """
        Download files from remote storage to the local directory
        under $AIRFLOW_HOME/dags/repo_id
        """
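To make the interface concrete, here is a toy fetcher that "downloads" from a local directory. It is a stand-in only: a real S3DagFetcher or GitDagFetcher would replace the copy with boto3 or git calls, and would resolve credentials from conn_id rather than ignore it.

```python
import os
import shutil

class BaseDagFetcher:
    """Interface from the proposal: fetch files into $AIRFLOW_HOME/dags/repo_id."""
    def fetch(self, repo_id, url, conn_id, file_path=None):
        raise NotImplementedError

class LocalDagFetcher(BaseDagFetcher):
    """Toy fetcher copying from a local path; illustrative only.

    An S3DagFetcher/GitDagFetcher would substitute remote-storage calls
    here and use conn_id to look up credentials.
    """
    def fetch(self, repo_id, url, conn_id, file_path=None):
        target = os.path.join(
            os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow")),
            "dags", repo_id)
        if file_path:
            # Fetch a single file from the repository.
            os.makedirs(target, exist_ok=True)
            shutil.copy(os.path.join(url, file_path), target)
        else:
            # Mirror the whole repository into the local cache.
            shutil.rmtree(target, ignore_errors=True)
            shutil.copytree(url, target)
        return target
```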
Proposed changes
DagBag
We should ensure that we are loading the latest cached copy of the DAGs, so we should fetch DAGs from the remote repository before loading the DagBag.
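The fetch-then-load ordering can be sketched as below. The wrapper name and the dagbag_factory parameter are hypothetical; in practice the factory would be airflow.models.DagBag, which is kept injectable here so the sketch stays self-contained.

```python
def load_dagbag_fresh(repos, fetcher, dagbag_factory):
    """Sync every remote repository, then build the DagBag, so it only
    ever parses up-to-date local copies.

    Hypothetical wrapper: repos is an iterable of (repo_id, url, conn_id)
    and dagbag_factory would be airflow.models.DagBag in practice.
    """
    for repo_id, url, conn_id in repos:
        fetcher.fetch(repo_id, url, conn_id)
    return dagbag_factory()
```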
Scheduler
Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method to fetch DAGs from the remote repositories first.
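The change can be sketched as below. This is not the real DagFileProcessorManager; it is a simplified stand-in showing where the new remote-fetch step would slot into the periodic refresh, before the existing directory scan.

```python
import time

class DagFileProcessorManagerSketch:
    """Simplified stand-in (not Airflow's real class) showing the added
    remote-fetch step at the top of the periodic _refresh_dag_dir pass."""

    def __init__(self, fetchers, refresh_interval=300):
        self.fetchers = fetchers            # iterable of (repo_id, url, conn_id, fetcher)
        self.refresh_interval = refresh_interval
        self._last_refresh = 0.0

    def _refresh_dag_dir(self):
        if time.monotonic() - self._last_refresh < self.refresh_interval:
            return False
        # New step: sync remote repositories before scanning for DAG files.
        for repo_id, url, conn_id, fetcher in self.fetchers:
            fetcher.fetch(repo_id, url, conn_id)
        self._last_refresh = time.monotonic()
        # ...existing logic follows: list files under the dag dir and
        # queue them for parsing...
        return True
```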