
Status

State: Draft
Discussion Thread



Motivation

Currently, Airflow requires DAG files to be present on a file system that is accessible to the scheduler, webserver, and workers. As more and more people run Airflow in a distributed setup to achieve higher scalability, it becomes increasingly difficult to guarantee a file system that is accessible to and synchronized among all services. Allowing Airflow to fetch DAG files from a remote source outside the service's local file system grants much greater flexibility, eases implementation, and standardizes the way remote sources of DAGs are synced with Airflow.

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?

What change do you propose to make?

DAGs are persisted in remote filesystem-like storage, and Airflow needs to know where to find them. A DAG repository is introduced to record the remote root directory of DAG files. Prior to DAG loading, Airflow would download files from the remote DAG repository and cache them on the local filesystem under $AIRFLOW_HOME/dags.

DAG Repository

We will create a file, dag_repositories.json, to record the root directory of DAGs on the remote storage system. Multiple repositories are supported.

dag_repositories.json
{
	"dag_repositories": {
		"repo1": {
			"url": "s3://my-bucket/dags",
			"conn_id": "blahblah"
		},
		"repo2": {
			"url": "git://repo_name/dags",
			"conn_id": "blahblah2"
		}
	}
}
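As a rough illustration of how such a file could be read, the sketch below parses the configuration and extracts the URL scheme of each repository. It assumes the file is a JSON object whose dag_repositories key maps each repository ID to its url and conn_id; the function name parse_dag_repositories is hypothetical and not part of Airflow.

```python
import json
from urllib.parse import urlparse

# Sample content, assuming dag_repositories is a mapping of repo ID to settings.
SAMPLE = """
{
    "dag_repositories": {
        "repo1": {"url": "s3://my-bucket/dags", "conn_id": "blahblah"},
        "repo2": {"url": "git://repo_name/dags", "conn_id": "blahblah2"}
    }
}
"""


def parse_dag_repositories(text):
    """Return {repo_id: (scheme, url, conn_id)} parsed from the JSON text.

    Hypothetical helper for illustration only.
    """
    config = json.loads(text)
    repos = {}
    for repo_id, entry in config["dag_repositories"].items():
        # The URL scheme (s3, git, ...) could later select the fetcher to use.
        scheme = urlparse(entry["url"]).scheme
        repos[repo_id] = (scheme, entry["url"], entry.get("conn_id"))
    return repos


repos = parse_dag_repositories(SAMPLE)
```

Keying the result by repository ID keeps the mapping to the local cache directory, $AIRFLOW_HOME/dags/repo_id, straightforward.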

DagFetcher

The following is the DagFetcher interface. We will implement a different fetcher for each storage system, such as GitDagFetcher and S3DagFetcher. Given a dag_repositories.json configuration like the one above, DagFetcher would download the files under s3://my-bucket/dags to $AIRFLOW_HOME/dags/repo_id/.

class BaseDagFetcher:
	def fetch(self, repo_id, url, conn_id, file_path=None):
		"""
		Download files from remote storage to the local directory $AIRFLOW_HOME/dags/repo_id.
		"""
		# Each storage-specific fetcher overrides this method.
		raise NotImplementedError()

Proposed changes

DagBag

We should ensure that we load the latest cached copy of the DAGs, so we should fetch DAGs from the remote repository before we load the DagBag.

Scheduler

Currently, DagFileProcessorManager periodically calls DagFileProcessorManager._refresh_dag_dir to look for new DAG files. We should change this method so that it first fetches DAGs from the remote source.
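The intended ordering (fetch first, then scan the local directory) could look roughly like the sketch below. This is not Airflow's actual _refresh_dag_dir; the function refresh_dag_dir, its parameters, and the fake fetch callable are hypothetical and only illustrate the order of operations.

```python
import os
import tempfile


def refresh_dag_dir(dags_folder, repositories, fetch):
    """Fetch every configured repository, then list local DAG files.

    `fetch` is any callable taking (repo_id, url, conn_id); hypothetical.
    """
    # Step 1: pull remote DAGs into the local cache before scanning.
    for repo_id, entry in repositories.items():
        fetch(repo_id, entry["url"], entry["conn_id"])
    # Step 2: scan the local directory for Python files, as before.
    found = []
    for root, _dirs, files in os.walk(dags_folder):
        for name in files:
            if name.endswith(".py"):
                found.append(os.path.join(root, name))
    return sorted(found)


# Demo: a fake fetch that writes one DAG file per repository.
dags_folder = tempfile.mkdtemp()


def fake_fetch(repo_id, url, conn_id):
    target = os.path.join(dags_folder, repo_id)
    os.makedirs(target, exist_ok=True)
    with open(os.path.join(target, "example_dag.py"), "w") as f:
        f.write("# fetched from " + url + "\n")


repositories = {"repo1": {"url": "s3://my-bucket/dags", "conn_id": "blahblah"}}
dag_files = refresh_dag_dir(dags_folder, repositories, fake_fetch)
```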

What problem does it solve?

In a distributed Airflow setup, we need an additional process to move DAG files to services such as the scheduler, webserver, and workers and to keep those files in sync. By allowing Airflow services to fetch DAGs from remote sources, we remove the need for such an additional process, which duplicates effort. It also standardizes the way we synchronize DAGs among services.

Why is it needed?

Same as above.

Are there any downsides to this change?

DAG loading performance may be impacted, as Airflow services will need to perform an additional step to fetch DAGs from remote sources. To minimize the performance impact, each Airflow service will keep a local cache of DAGs and will only fetch when the local copy is stale.
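The proposal does not define when a local copy counts as stale. One possible approach, shown here purely as an assumption, is a time-to-live check: a repository is fetched again only if its last fetch is older than a configured number of seconds. The CachedFetch wrapper and its parameters are hypothetical.

```python
import time


class CachedFetch:
    """Wrap a fetch callable so it runs at most once per TTL per repository.

    Assumption: staleness is decided by elapsed time since the last fetch.
    """

    def __init__(self, fetch, ttl_seconds, clock=time.monotonic):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._last_fetch = {}  # repo_id -> time of last successful fetch

    def __call__(self, repo_id, url, conn_id):
        now = self._clock()
        last = self._last_fetch.get(repo_id)
        if last is not None and now - last < self._ttl:
            return False  # local copy is still fresh; skip the fetch
        self._fetch(repo_id, url, conn_id)
        self._last_fetch[repo_id] = now
        return True


# Demo with a controllable clock and a fetch that only records calls.
calls = []
fake_time = [0.0]
cached = CachedFetch(
    lambda repo_id, url, conn_id: calls.append(repo_id),
    ttl_seconds=60,
    clock=lambda: fake_time[0],
)
first = cached("repo1", "s3://my-bucket/dags", "blahblah")
fake_time[0] = 30.0
second = cached("repo1", "s3://my-bucket/dags", "blahblah")
fake_time[0] = 61.0
third = cached("repo1", "s3://my-bucket/dags", "blahblah")
```

Other staleness signals, such as comparing a Git commit hash or an object ETag with the cached one, would avoid unnecessary downloads more precisely but depend on the storage system.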

Which users are affected by the change?

This would only impact users who specify remote sources of DAGs. Users who prefer to use Airflow in the old way (moving DAG files themselves) won't be affected.

How are users affected by the change? (e.g. DB upgrade required?)

DAG loading performance can be affected as there is an additional step to fetch DAGs from remote sources.

Other considerations?

Should we introduce DAG manifest in this AIP?


Should we introduce DAG versioning in this AIP?

What defines this AIP as "done"?

Airflow supports fetching DAGs from at least one kind of remote source. I am thinking of supporting fetching from Git first.

