...

One of the core principles of AFS is to follow POSIX behavior as closely as possible. This ensures that users can leverage their familiarity with standard (Python) file operations, such as reading, writing, listing, and deleting files, even when working with various object storage systems. This consistency simplifies DAG development and reduces the learning curve for Airflow users.
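
As a taste of what this looks like in practice, here is a minimal sketch using the ObjectStoragePath API introduced below; the method names mirror pathlib/fsspec conventions and are illustrative at this stage of the proposal rather than a final interface.

from airflow.io.store.path import ObjectStoragePath

# Illustrative sketch of POSIX-like usage; method names follow pathlib/fsspec
# conventions and are not a finalised interface.
path = ObjectStoragePath("s3://warehouse/raw/data.csv")

# write
with path.open("wb") as f:
    f.write(b"id,value\n1,42\n")

# read
with path.open("rb") as f:
    print(f.read())

# list the containing "directory" (prefix)
for child in path.parent.iterdir():
    print(child)

# delete
path.unlink()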

...

A con is that fsspec becomes part of our dependencies and is managed by a different community. That community is orders of magnitude smaller than our own, so there is a certain risk to the speed at which bug fixes in fsspec can be delivered. fsspec has had consistent releases since its inception in 2019, and other high-profile projects like Apache Arrow, Apache Iceberg and Pandas also depend on it. In case the project gets abandoned, there is the possibility to switch to smart_open (https://pypi.org/project/smart-open/), which has similar goals but not the same strictness of interface.

Proposal

Mounting filesystems

Global vs Task Scope

We follow the traditional route of mounting a filesystem (volume) to a mount point, as in Unix systems. This means that all files accessible to Airflow (as in: Airflow understands how to access them - existing implementations will not be affected) are arranged in one big tree rooted at '/'. These files can be spread out over different filesystems. The mount method serves to attach a filesystem to this tree. In Airflow we consider two scopes: Global and Task. Filesystems attached in the Global scope are available everywhere, while filesystems attached within the Task scope are not available outside that scope.

Example 1: Global vs Task

import airflow.io.fs as afs

afs.mount("s3://warehouse", "/warehouse", conn_id="aws_warehouse") # global scope

@task
def task():
    afs.mount("gs://data", "/data") # task scope
    afs.ls("/data")

ObjectStoragePath

ObjectStoragePath is the main API users will work with. It has a pathlib.Path-like interface with some extensions. It accepts two extra optional parameters: conn_id: str and store: ObjectStore (see below). Initialisation of the connection to the underlying store is lazy, so that ObjectStoragePath instances can be instantiated at the global scope. ObjectStoragePath can be serialised.

from airflow.io.store.path import ObjectStoragePath

base = ObjectStoragePath("s3://warehouse")

@task
def task():
    data = base / "data.gz"
    data.stat()
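
The path can also be bound to a specific Airflow connection through the optional conn_id parameter described above; the following is a small illustrative sketch reusing the aws_warehouse connection id from Example 1.

from airflow.io.store.path import ObjectStoragePath

# Illustrative: credentials are resolved through the "aws_warehouse" connection.
base = ObjectStoragePath("s3://warehouse", conn_id="aws_warehouse")

# Because initialisation is lazy and instances are serialisable, this object
# can safely be created at module (global) scope and used inside tasks.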

Mount points and auto mount points

...

import airflow.io.fs as afs

# explicit mount point
afs.mount("file:///data", "/data")
afs.ls("/data")

# implicit mount point
warehouse = afs.mount("s3://warehouse")
warehouse.ls("/") # lists files relative to the root of the warehouse filesystem
warehouse2 = afs.mount("s3://warehouse")
assert warehouse.fsid == warehouse2.fsid

...

We support three access patterns: 1) by explicit mount point, 2) by using the Path interface, and 3) as a context manager.

import shutil

# explicit
afs.mount("s3://warehouse", "/warehouse")
afs.ls("/warehouse")

# path interface
mnt = afs.mount("s3://warehouse", "/warehouse")
shutil.copyfileobj(mnt / "data.gz", mnt / "cleaned_data.gz")

# context manager
with afs.mount("s3://warehouse") as mnt:
    mnt.ls("/")

ObjectStore

ObjectStore manages the filesystem or object storage. In general, users do not need to deal with its interface except when using a custom scheme or protocol that is not registered through one of the providers.

from airflow.io.store import attach
from airflow.io.store.path import ObjectStoragePath

# CustomFileSystem stands in for a user-provided fsspec-compatible filesystem
store = attach(protocol="custom", fs=CustomFileSystem())

@task
def task():
    o = ObjectStoragePath("custom://bla")
    # or
    o = ObjectStoragePath("bla", store=store)

Providers

Providers can support additional filesystems, beyond the ones provided by core fsspec, by providing a filesystem module that exposes a get_fs(conn_id: str) → AbstractFileSystem interface.
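
As an illustration of that contract, the following is a minimal sketch of what such a provider filesystem module could look like for S3, assuming s3fs as the underlying fsspec implementation; the module path and the credential mapping are hypothetical and not the actual provider code.

# Hypothetical module, e.g. airflow/providers/amazon/aws/fs/s3.py (path is illustrative)
from fsspec import AbstractFileSystem
from s3fs import S3FileSystem

from airflow.hooks.base import BaseHook


def get_fs(conn_id: str) -> AbstractFileSystem:
    # Resolve credentials from the Airflow connection and return an
    # fsspec-compatible filesystem; this mapping is a sketch only.
    conn = BaseHook.get_connection(conn_id)
    return S3FileSystem(key=conn.login, secret=conn.password)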

...

What defines this AIP as "done"?

  • airflow.io is available
    • The AFS interface should become one of the standard "citizens" of the Provider Package features.

      Existing provider packages must still be supported, and previous provider packages that are not aware of AFS should also remain usable with newer Airflow versions.

    • filesystems:
      - airflow.providers.microsoft.azure.fs.adls
  • DAGs can work with AFS
  • Initial implementation supports LocalFileSystem, S3, GCS, ADLS
    • These implementations must be hosted within the respective provider packages.

      • LocalFileSystem → okay to put this in core with the interface definitions
      • S3 → AWS
      • GCS → Google
      • ADLS → Azure
    • An exception could be made for generic filesystems that don't have a real provider (say, for example, a ZipFileSystem)
  • Out of scope: XCom, DAG Processor

...