
(renamed from Airflow FS)

Status

State: Draft

Discussion Thread:

Created: 2023-10-07

Progress Tracking:

Summary

With Airflow Store (AS) we want to address the challenges of fragmented code, XCom limitations, and DAG processing complexity by providing a unified abstraction for accessing object storage systems. By doing so, we aim to enhance the overall user experience, promote code reusability, and simplify the development of Airflow workflows that interact with different storage backends.

Motivation

One common challenge faced by DAG authors is the diversity of object storage systems used to store data inputs, outputs, and intermediate results. Currently, Airflow lacks a unified and consistent way to access these various storage systems. This leads to a combinatorial number of Operators that do "SourceToDest" and vice versa. These Operators do not necessarily adhere to a common API standard, which places a cognitive burden on DAG authors.

At the same time, DAG processing is entirely bound to a filesystem mounted locally to the DAG processor. When running in containers, a local filesystem is often ephemeral, and admins deploy workarounds like git-sync or redeployments to deal with this. A preference would be to use object storage, but with Airflow relying heavily on filesystem semantics this is currently not an option.

XCom was created to store small pieces of data to share between tasks. These values are stored in Airflow's database. We do see users wanting to share larger values (like Data Frames), and with the increasing use of TaskFlow it becomes more natural to do so as well. Databases are not very suitable for storing such larger objects, and while users have been creating custom XCom backends that interact with object storage systems, Airflow can be improved to have first-class support for larger objects.

Inspiration

The "Airflow Store" proposal takes inspiration from successful projects like Databricks FS and Apache Iceberg, which provide unified access to distributed file systems and structured data tables, respectively. Airflow FS aims to build on these concepts and offer a similar level of abstraction for object storage systems, making it easier for Airflow users to interact with different storage backends consistently.

Posix Behavior

One of the core principles of AFS is to follow POSIX behavior as closely as possible. This ensures that users can leverage their familiarity with standard (Python) file operations, such as reading, writing, listing, and deleting files, even when working with various object storage systems. This consistency simplifies DAG development and reduces the learning curve for Airflow users.
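As a rough illustration of what this POSIX-like behavior could look like in practice, the sketch below reads, writes, lists, and deletes files on a mounted object store. It assumes afs.open, afs.ls, and afs.rm exist with the semantics of their POSIX counterparts; this is not yet a settled API.

import airflow.io.fs as afs

# sketch only: assumes afs exposes open/ls/rm with POSIX-like semantics
afs.mount("s3://warehouse", "/warehouse", conn_id="aws_warehouse")

# read and write like ordinary Python file handling
with afs.open("/warehouse/raw/data.csv", "rb") as src:
    payload = src.read()
with afs.open("/warehouse/clean/data.csv", "wb") as dst:
    dst.write(payload)

afs.ls("/warehouse/clean")          # list, like os.listdir
afs.rm("/warehouse/raw/data.csv")   # delete, like os.remove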

Considerations

What change do you propose to make?

One of the primary considerations for the adoption of AS is its integration into the core Airflow package, exposed under the airflow.io namespace. This ensures availability across DAG processing, XCom, authoring, and Providers. To achieve this, AS will be implemented on top of the well-established fsspec library, a BSD-licensed filesystem specification library. AFS aims to provide a standard and unified approach to working with various object storage systems through the concept of mounts and mount points. This unified approach eliminates the need for users to grapple with cross-filesystem issues, such as copying data between different cloud storage providers, and removes the need for specialised Operators that deal with this.
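To make the cross-filesystem point concrete, a copy between two cloud providers could then be expressed as in the sketch below; afs.copy is an assumed helper, not a settled part of the API, and the connection ids are illustrative.

import airflow.io.fs as afs

# sketch only: afs.copy and the connection ids below are assumptions
afs.mount("s3://landing", "/landing", conn_id="aws_default")
afs.mount("gs://archive", "/archive", conn_id="google_cloud_default")

# one call copies across providers; AFS would pick the most efficient
# strategy available instead of requiring a dedicated S3-to-GCS Operator
afs.copy("/landing/events/2023-10-07.json", "/archive/events/2023-10-07.json")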

Why is it needed?

  • To reduce the number of basic (and not always well maintained) Operators that do SourceToDest and vice versa operations. These can be replaced by a single FileTransferOperator (see the sketch after this list)
  • To have a unified interface to file operations in TaskFlow and traditional Operators
  • To allow DAG processing to use arbitrary locations (object storage)
  • To give XCom a standardised way of dealing with larger data volumes
  • To simplify DAG CI/CD
  • To streamline pre-DAG to DAG workflows (e.g. notebooks to DAG)
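
As a sketch of the first point, a single AFS-backed transfer operator might be used roughly as follows; the import path and the src/dst/conn_id parameters are illustrative assumptions, not a finalised interface.

from datetime import datetime

from airflow.decorators import dag

# hypothetical import path and signature, shown for illustration only
from airflow.operators.filetransfer import FileTransferOperator


@dag(start_date=datetime(2023, 10, 7), schedule=None)
def transfer_example():
    # one operator replaces the n^2 "SourceToDest" Operators
    FileTransferOperator(
        task_id="s3_to_gcs",
        src="s3://landing/events/2023-10-07.json",
        src_conn_id="aws_default",
        dst="gs://archive/events/2023-10-07.json",
        dst_conn_id="google_cloud_default",
    )


transfer_example()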

Further considerations

While nothing prevents users, or even XCom and DAG processing, from making use of fsspec directly, there are several benefits that are hard to get when rolling your own. Obvious pros of native fsspec support are simplified authorization (with Airflow connections) and a single FileTransferOperator instead of n^2 source-to-dest ones. Next to that, when doing same-filesystem operations AFS will use the most native way of doing so, which would otherwise require rigor on the end of the user to achieve the equivalent. If bug fixes are required, there is only one place where we would need to fix them.

A con is the introduction of fsspec as part of our dependencies, which is managed by a different community. That community is orders of magnitude smaller than our own, so there is a certain risk to the speed at which bug fixes in fsspec can be delivered. However, fsspec has had consistent releases since its inception in 2019, and other high-profile projects like Apache Arrow, Apache Iceberg, and Pandas also depend on it. In case the project gets abandoned, there is the possibility to switch to smart_open (https://pypi.org/project/smart-open/), which has similar goals but not the strictness of interface.

Proposal

Mounting filesystems

Global vs Task Scope

We follow the traditional route of mounting a filesystem (volume) to a mount point, as within Unix systems. This means that all files accessible to Airflow (as in, Airflow understands how to access them; existing implementations will not be affected) are arranged in one big tree rooted at '/'. These files can be spread out over different filesystems. The mount method serves to attach a filesystem to this big tree. In Airflow we consider two scopes: Global and Task. Filesystems attached in the Global scope are available everywhere, but filesystems attached within the Task scope are not available outside of that scope.

Example 1: Global vs Task

import airflow.io.fs as afs
from airflow.decorators import task

# global scope: available to all tasks
afs.mount("s3://warehouse", "/warehouse", conn_id="aws_warehouse")

@task
def my_task():
    # task scope: only available within this task
    afs.mount("gs://data", "/data")
    afs.ls("/data")

Mount points and auto mount points


Filesystems can be mounted with an explicit mount point or an implicit mount point in order to support several patterns. Implicit mount points generate references that are usable across instances, meaning that a mount point created with the same underlying filesystem and connection id will generate the same implicit mount point over time.

import airflow.io.fs as afs

# explicit mount point
afs.mount("file:///data", "/data")
afs.ls("/data)

# implicit mount point
warehouse = afs.mount("s3://warehouse")
warehouse.ls("/") # lists files relative to the root of the warehouse filesystem
warehouse2 = afs.mount("s3://warehouse")
assert warehouse.fsid == warehouse2.fsid

Access patterns

We support three access patterns: 1) by explicit mount point, 2) by using the Path interface, and 3) via a context manager.

import shutil

import airflow.io.fs as afs

# explicit mount point
afs.mount("s3://warehouse", "/warehouse")
afs.ls("/warehouse")

# path interface
mnt = afs.mount("s3://warehouse", "/warehouse")
# copyfileobj expects file objects, so open the paths first
with (mnt / "data.gz").open("rb") as src, (mnt / "cleaned_data.gz").open("wb") as dst:
    shutil.copyfileobj(src, dst)

# context manager
with afs.mount("s3://warehouse") as mnt:
    mnt.ls("/")

Providers

Providers can support additional filesystems, beyond the ones provided by core fsspec, by providing a filesystem module that exposes a get_fs(conn_id: str) → AbstractFileSystem interface.
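A provider-side filesystem module could then look roughly like the sketch below; the module path and the way the Airflow connection is translated into credentials are assumptions for illustration.

# e.g. airflow/providers/amazon/aws/fs/s3.py (illustrative path)
from fsspec import AbstractFileSystem
from s3fs import S3FileSystem

from airflow.hooks.base import BaseHook


def get_fs(conn_id: str) -> AbstractFileSystem:
    # resolve the Airflow connection into credentials for the fsspec filesystem
    conn = BaseHook.get_connection(conn_id)
    return S3FileSystem(key=conn.login, secret=conn.password)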

Which users are affected by the change?

No users are affected by the change. If you do not want to make use of it, the old patterns still work.

What defines this AIP as "done"?

  • airflow.io is available
    • The AFS interface should become one of the standard "citizens" of the Provider Package features.

      Existing provider packages must still be supported, and previous provider packages that do not know about AFS should also be usable with newer Airflow versions

    • filesystems live in provider modules, for example:
      - airflow.providers.microsoft.azure.fs.adls
  • DAGs can work with AFS
  • Initial implementation supports LocalFilesystem, S3, GCS, ADLS
    • These implementations must be hosted within the respective provider packages.

      • LocalFileSystem → okay to put this in core with the interface definitions
      • S3 → AWS
      • GCS → Google
      • ADLS → Azure
    • An exception could be made for generic filesystems that don't have a real provider (for example a ZipFileSystem)
  • Out of scope: XCom, DAG Processor

