
Status

State: Draft

Discussion Thread

JIRA


Motivation

Currently, Airflow requires DAG files to be present on a filesystem that is accessible to the scheduler, webserver, and workers. As more and more people run Airflow in a distributed setup to achieve higher scalability, it becomes increasingly difficult to guarantee a filesystem that is accessible to, and synchronized across, all services. Allowing Airflow to fetch DAG files from a remote source outside the filesystem local to each service grants much greater flexibility, eases deployment, and standardizes the way remote sources of DAGs are synced with Airflow.

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?

What change do you propose to make?

DAGs are persisted in remote filesystem-like storage, and Airflow needs to know where to find them. A DAG repository is introduced to record the remote root directory of DAG files. Prior to DAG loading, Airflow downloads files from the remote DAG repository and caches them in a local directory under $AIRFLOW_HOME/dags.

DAG Repository

We will create a file, remote_repositories.json, to record the root directories of DAGs on remote storage systems. Multiple repositories are supported.

remote_repositories.json
{
    "dag_repositories": {
        "repo1": {
            "url": "s3://my-bucket/dags",
            "conn_id": "blahblah"
        },
        "repo2": {
            "url": "git://repo_name/dags",
            "conn_id": "blahblah2"
        }
    }
}
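
To make this concrete, the sketch below shows one way Airflow could read this configuration. The helper name and exact layout are illustrative assumptions, not part of the proposal itself; only the file name and the url/conn_id fields come from the section above.

import json
import os


def load_dag_repositories(config_path=None):
    """Read remote_repositories.json and return the repository mapping.

    Illustrative sketch only: the file name and the url/conn_id fields come
    from the proposal above; everything else is an assumption.
    """
    airflow_home = os.environ.get("AIRFLOW_HOME", os.path.expanduser("~/airflow"))
    config_path = config_path or os.path.join(airflow_home, "remote_repositories.json")
    with open(config_path) as config_file:
        return json.load(config_file)["dag_repositories"]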

DagFetcher

The following is the DagFetcher interface. We will implement different fetchers for different storage systems, e.g. GitDagFetcher and S3DagFetcher. Given a remote_repositories.json configuration like the one above, the DagFetcher downloads the files under s3://my-bucket/dags to $AIRFLOW_HOME/dags/repo_id/.

class BaseDagFetcher:
    def fetch(self, repo_id, url, conn_id, file_path=None):
        """
        Download files from remote storage to a local directory under
        $AIRFLOW_HOME/dags/<repo_id>.
        """
        raise NotImplementedError()

Proposed changes

DagBag

We should ensure that we are loading the latest cached copy of the DAGs, so we fetch DAGs from the remote repositories before loading the DagBag.
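
For illustration, a refresh step along the following lines could run right before the DagBag is loaded. It relies on the load_dag_repositories helper and the fetcher classes sketched earlier; the function name and the scheme-to-fetcher mapping are assumptions, not settled API.

from urllib.parse import urlparse


def refresh_dag_cache():
    """Sync every configured remote repository into $AIRFLOW_HOME/dags/<repo_id>.

    Intended to run immediately before the DagBag is (re)loaded so the
    scheduler, webserver, and workers always parse the freshest copy.
    """
    # Scheme-to-fetcher mapping; only S3 and Git fetchers are proposed here.
    fetchers = {"s3": S3DagFetcher(), "git": GitDagFetcher()}
    for repo_id, repo in load_dag_repositories().items():
        scheme = urlparse(repo["url"]).scheme
        fetchers[scheme].fetch(repo_id, repo["url"], repo["conn_id"])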

Scheduler

Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method so that it fetches DAGs from the remote repositories first.
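
In rough terms the change boils down to the ordering sketched below; the existing directory-listing logic is unchanged and only summarised in a comment, and refresh_dag_cache is the hypothetical helper from the DagBag section.

class DagFileProcessorManager:  # sketch of the proposed ordering only
    def _refresh_dag_dir(self):
        # New step: pull the latest files from every remote DAG repository
        # into $AIRFLOW_HOME/dags/<repo_id> before scanning the dag folder.
        refresh_dag_cache()
        # Existing step (unchanged): re-list DAG files under the dag folder
        # and hand them to the file processors.
        ...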

What problem does it solve?


Why is it needed?


Are there any downsides to this change?


Which users are affected by the change?


How are users affected by the change? (e.g. DB upgrade required?)


Other considerations?


What defines this AIP as "done"?

