Status

State: Draft
Discussion Thread:
Created: 2023-10-07
Progress Tracking:

Summary

With Airflow FS (AFS) we want to address the challenges of fragmented code, XCom limitations, and DAG processing complexity by providing a unified abstraction for accessing object storage systems. By doing so, we aim to enhance the overall user experience, promote code reusability, and simplify the development of Airflow workflows that interact with different storage backends.

...

What change do you propose to make?

One of the primary considerations for the adoption of AFS is its integration into the core Airflow package, exposed as airflow.io. This ensures availability across DAG processing, XCom, DAG authoring, and providers. To achieve this, AFS will be implemented on top of the well-established fsspec library, a BSD-licensed filesystem specification library. AFS aims to provide a standard, unified approach to working with various object storage systems through the concept of mounts and mountpoints. This unified approach eliminates the need for users to grapple with cross-filesystem issues, such as copying data between different cloud storage providers, and removes the need for the specialised Operators that deal with this today.

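For illustration, copying data between two different providers then becomes a plain file operation over two mounts. A minimal sketch, assuming the mount API proposed later in this document (the bucket names and the .open() call on mounted paths are illustrative assumptions, not a committed interface):

import shutil

import afs  # the proposed airflow.io module

src = afs.mount("s3://source-bucket", "/input")
dst = afs.mount("gcs://dest-bucket", "/output")

# one code path for any pair of backends; no provider-specific operator needed
with (src / "data.csv").open("rb") as s, (dst / "data.csv").open("wb") as d:
    shutil.copyfileobj(s, d)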

Why is it needed?

  • To reduce the number of basic (and not always well-maintained) Operators that do SourceToDest and vice versa operations; these can be replaced by a single FileTransferOperator, as sketched below
  • To have a unified interface to file operations in TaskFlow and traditional Operators
  • To allow DAG processing to load DAGs from arbitrary locations (e.g. object storage)
  • To allow XCom to have a standardised way of dealing with larger data volumes
  • To simplify DAG CI/CD
  • To streamline pre-DAG to DAG workflows (e.g. converting notebooks to DAGs)
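
A minimal sketch of such a FileTransferOperator, assuming the mount API proposed later in this document (the constructor signature and the filename parameter are illustrative assumptions, not a committed design):

import shutil

from airflow.models import BaseOperator

import afs  # the proposed airflow.io module


class FileTransferOperator(BaseOperator):
    """Copy a single file between any two AFS-supported backends."""

    def __init__(self, *, src: str, dst: str, filename: str, **kwargs):
        super().__init__(**kwargs)
        self.src = src
        self.dst = dst
        self.filename = filename

    def execute(self, context):
        # Mount source and destination; the schemes may differ (e.g. s3 -> gcs).
        with afs.mount(self.src) as src_mnt, afs.mount(self.dst) as dst_mnt:
            with (src_mnt / self.filename).open("rb") as src_f:
                with (dst_mnt / self.filename).open("wb") as dst_f:
                    shutil.copyfileobj(src_f, dst_f)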

...

import shutil

import afs  # the proposed airflow.io module

# explicit
afs.mount("s3://warehouse", "/warehouse")
afs.ls("/warehouse")

# path interface: a mount behaves like a path and supports the / operator
mnt = afs.mount("s3://warehouse", "/warehouse")
with (mnt / "data.gz").open("rb") as src, (mnt / "cleaned_data.gz").open("wb") as dst:
    shutil.copyfileobj(src, dst)  # copyfileobj expects file objects, not paths

# context manager: unmounts automatically on exit
with afs.mount("s3://warehouse") as mnt:
    mnt.ls("/")

Providers

Providers can support additional filesystems, beyond the ones provided by core fsspec, by shipping a filesystem module that exposes a get_fs(conn_id: str) → AbstractFileSystem interface.
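
A provider's filesystem module might look roughly like the following sketch (the module path in the comment and the use of fsspec's LocalFileSystem as a stand-in backend are illustrative assumptions):

# hypothetical location: airflow/providers/example/fs/filesystem.py
from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem


def get_fs(conn_id: str) -> AbstractFileSystem:
    # Look up the Airflow connection identified by conn_id and return a
    # configured filesystem; LocalFileSystem stands in for a real
    # provider-specific implementation here.
    return LocalFileSystem()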

Which users are affected by the change?

No users are affected by the change: AFS is opt-in, and if you do not want to make use of it, the existing patterns still work.

What defines this AIP as "done"?

  • airflow.io is available
  • DAGs can work with AFS
  • Initial implementation supports the local filesystem, S3, GCS, and ADLS
  • Out of scope: XCom, DAG Processor