
Airflow Store (renamed from Airflow FS)

Status

State: In Progress
Discussion Thread:
Vote Thread: https://lists.apache.org/thread/wokt58k15g81cjnsytq9k1ofvspb4d5c
Created: 2023-10-07

Progress Tracking

Updated 2023-11-16: The AIP has been accepted as experimental in 2.8.

Summary

With Airflow Store (AS) we want to address the challenges of fragmented code, XCom limitations, and DAG processing complexity by providing a unified abstraction for accessing object storage systems. By doing so, we aim to enhance the overall user experience, promote code reusability, and simplify the development of Airflow workflows that interact with different storage backends.

Motivation

One common challenge faced by DAG authors is the diversity of object storage systems used to store data inputs, outputs, and intermediate results. Currently, Airflow lacks a unified and consistent way to access these various storage systems. This leads to an ever-growing number of Operators that do "SourceToDest" and vice versa. These Operators do not necessarily adhere to a common API standard, which places a cognitive burden on DAG authors.

...

XCom was created to store small pieces of data to share between tasks. These values are then stored in Airflow's database. We do see users wanting to share larger values (like Data Frames), and with the increasing use of TaskFlow it becomes more natural to do so as well. Databases are not well suited to storing such larger objects, and while users have been creating custom XCom backends that interact with object storage systems, Airflow can be improved to have first-class support for larger objects.

Inspiration

The "Airflow Store" proposal takes inspiration from successful projects like Databricks FS and Apache Iceberg, which provide unified access to distributed file systems and structured data tables, respectively. Airflow FS aims to build on these concepts and offer a similar level of abstraction for object storage systems, making it easier for Airflow users to interact with different storage backends consistently.

POSIX Behavior

One of the core principles of AS is to follow POSIX behavior as closely as possible. This ensures that users can leverage their familiarity with standard (Python) file operations, such as reading, writing, listing, and deleting files, even when working with various object storage systems. This consistency simplifies DAG development and reduces the learning curve for Airflow users.
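
To make the intent concrete, these are the kind of standard-library operations AS aims to mirror. The snippet below is plain pathlib against a local directory and uses no AS functionality; the paths are purely illustrative.

from pathlib import Path

base = Path("/tmp/warehouse")
base.mkdir(parents=True, exist_ok=True)

target = base / "data.txt"
target.write_text("hello")         # write
print(target.read_text())          # read
print(list(base.iterdir()))        # list
target.unlink()                    # delete

The goal is that the same mental model (and largely the same calls) applies when the path points at an object store instead of a local disk.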

Considerations

What change do you propose to make?

One of the primary considerations for the adoption of AS is its integration into the core Airflow package, hosted under the airflow.io namespace. This ensures availability across DAG processing, XCom, authoring and Providers. To achieve this, AS will be implemented on top of the well-established fsspec library, a BSD-licensed filesystem specification library. AS aims to provide a standard and unified approach to working with various object storage systems through the concept of mounts and mount points. This unified approach will eliminate the need for users to grapple with cross-filesystem issues, such as copying data between different cloud storage providers, and removes the need for specialised Operators that deal with this.
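
As a rough illustration of the abstraction AS builds on: fsspec already exposes different storage backends behind a single interface. The snippet below is a minimal sketch using fsspec directly; the local filesystem stands in for a remote object store, and the paths are illustrative.

import fsspec

# The same calls work for "s3", "gcs", "abfs", etc., given the matching
# fsspec implementation and credentials.
fs = fsspec.filesystem("file")
fs.makedirs("/tmp/warehouse", exist_ok=True)

with fsspec.open("/tmp/warehouse/data.txt", "w") as f:
    f.write("hello")

print(fs.ls("/tmp/warehouse"))

AS layers Airflow-specific concerns on top of this, most notably resolving credentials from Airflow connections and attaching filesystems to mount points.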


Why is it needed?

  • To reduce the number of basic (and not overly well maintained) Operators that do SourceToDest and vice versa operations. These can be replaced by a single FileTransferOperator (see the sketch after this list)
  • To have a unified interface to file operations in TaskFlow and traditional Operators
  • To allow DAG processing to use arbitrary locations (object storage)
  • To allow XCom to have a standardised way of dealing with larger data volumes
  • To simplify DAG CI/CD
  • To streamline pre-DAG to DAG workflows (e.g. notebooks to DAG)
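
The sketch below, referenced in the first bullet, only indicates the shape a single FileTransferOperator could take on top of ObjectStoragePath. The class name, argument names and the streaming copy are assumptions for illustration, not part of this proposal's API; a real implementation would prefer a backend-native copy when source and destination share a filesystem.

from airflow.io.store.path import ObjectStoragePath
from airflow.models import BaseOperator


class FileTransferOperator(BaseOperator):
    """Copy a single object between any two supported storage backends."""

    def __init__(self, *, src, dst, source_conn_id=None, dest_conn_id=None, **kwargs):
        super().__init__(**kwargs)
        self.src = ObjectStoragePath(src, conn_id=source_conn_id)
        self.dst = ObjectStoragePath(dst, conn_id=dest_conn_id)

    def execute(self, context):
        # Stream the bytes through the worker; assumes ObjectStoragePath
        # exposes a pathlib-style open().
        with self.src.open("rb") as src, self.dst.open("wb") as dst:
            dst.write(src.read())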

Further considerations

While nothing prevents users (and even XCom and DAG processing) from making use of fsspec directly, there are several benefits that are hard to get when rolling your own. Obvious pros of having native fsspec support are simplified authorization (with Airflow connections) and having a single FileTransferOperator instead of n^2 source-to-dest ones. Next to that, when doing same-filesystem operations AS will use the most native way of doing so, which would otherwise require rigor on the end of the user to achieve the equivalent. If bug fixes are required there is only one place where we would need to fix them.

A con is the introduction of fsspec as a dependency that is managed by a different community. That community is orders of magnitude smaller than our own, so there is a certain risk to the speed at which bug fixes in fsspec can be delivered. That said, fsspec has had consistent releases since its inception in 2019, and other high-profile projects like Apache Arrow, Apache Iceberg and Pandas also depend on it. In case the project gets abandoned there is the possibility to switch to smart_open (https://pypi.org/project/smart-open/), which has similar goals but not the same strictness of interface.

Proposal

Mounting filesystems

Global vs Task Scope

We follow the traditional route of mounting a filesystem (volume) onto a mount point, as within Unix systems. This means that all files accessible to Airflow (as in, Airflow understands how to access them; existing implementations will not be affected) are arranged in one big tree rooted at '/'. These files can be spread out over different filesystems. The mount method serves to attach a filesystem to that tree. In Airflow we consider two scopes: Global and Task. Filesystems attached in the Global scope are available everywhere, but if they are attached within the Task scope they are not available outside that scope.

Example 1: Global vs Task

ObjectStoragePath

ObjectStoragePath is the main API users will work with. It has a pathlib.Path-like interface with some extensions. It accepts two extra optional parameters: conn_id: str and store: ObjectStore (see below). The connection to the underlying store is initialised lazily, so that ObjectStoragePath instances can be instantiated at the global scope. ObjectStoragePath can be serialised.

from airflow.io.store.path import ObjectStoragePath

base = ObjectStoragePath("s3://warehouse", conn_id="aws_warehouse")  # global scope

@task
def task():
    data = base / "data.gz"  # task scope
    data.stat()
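
Building on the example above, the following sketch shows how the pathlib-like interface is intended to be used for everyday reads, writes and listings. It assumes ObjectStoragePath mirrors pathlib/fsspec naming (open, iterdir, unlink); the bucket and connection id are illustrative.

from airflow.io.store.path import ObjectStoragePath

base = ObjectStoragePath("s3://warehouse", conn_id="aws_warehouse")

# write and read an object with the familiar file API
with (base / "raw" / "data.csv").open("wb") as f:
    f.write(b"a,b,c\n1,2,3\n")

with (base / "raw" / "data.csv").open("rb") as f:
    print(f.read())

# list and delete, as with pathlib.Path
for obj in (base / "raw").iterdir():
    print(obj)

(base / "raw" / "data.csv").unlink()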

Mount points and auto mount points

...

ObjectStore

ObjectStore manages the filesystem or object storage. In general, users would not need to deal with its interface except when using a custom scheme or protocol which is not registered through one of the providers. 

import airflow.io.fs as afs
# explicit mount point
afs.mount("file:///data", "/data")
afs.ls("/data")
# implicit mount point
warehouse = afs.mount("s3://warehouse")
warehouse.ls("/") # lists files relative to the root of the warehouse filesystem
warehouse2 = afs.mount("s3://warehouse")
assert warehouse.fsid == warehouse2.fsid

...

We support three access patterns: 1) by explicit mount point, 2) by using the Path interface, and 3) via a context manager.

# explicit
afs.mount("s3://warehouse", "/warehouse")
afs.ls("/warehouse")
# path interface
mnt = afs.mount("s3://warehouse", "/warehouse")
shutil.copyfileobj(mnt / "data.gz", mnt / "cleaned_data.gz")
# context manager
with afs.mount("s3://warehouse") as mnt:
    mnt.ls("/")

# registering a custom scheme / filesystem
from airflow.io.store import attach
from airflow.io.store.path import ObjectStoragePath

store = attach(protocol="custom", fs=CustomFileSystem())

@task
def task():
    o = ObjectStoragePath("custom://bla")
    # or
    o = ObjectStoragePath("bla", store=store)

Providers

Providers can support additional filesystems, beyond the ones provided by core fsspec, by providing a filesystem module that has a get_fs(conn_id: str) → AbstractFileSystem interface.
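
A minimal sketch of what such a provider-supplied filesystem module could look like, assuming the get_fs(conn_id) contract described above. LocalFileSystem stands in for a real provider implementation, and the connection handling is omitted.

from fsspec import AbstractFileSystem
from fsspec.implementations.local import LocalFileSystem


def get_fs(conn_id: str) -> AbstractFileSystem:
    # A real provider would look up the Airflow connection identified by
    # conn_id and use its credentials to configure the returned filesystem.
    return LocalFileSystem()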

Implementation

Which users are affected by the change?

No users are affected by the change. If you do not want to make use of it, the old patterns still work.

What defines this AIP as "done"?

  • airflow.io is available
    • The AS interface should become one of the standard "citizens" of the Provider Package features.

      Existing provider packages must still be supported, and previous provider packages not knowing about AS should also be usable with newer Airflow versions

    • filesystems, e.g.:
      - airflow.providers.microsoft.azure.fs.adls
  • DAGs can work with AS
  • Initial implementation supports LocalFileSystem, S3, GCS, ADLS
    • These implementations must be hosted within the respective provider packages.

      • LocalFileSystem → okay to put this in core with the interface definitions
      • S3 → AWS
      • GCS → Google
      • ADLS → Azure
    • An exception could be made for generic filesystems that don't have a real provider (say, for example, a ZipFileSystem)
  • Out of scope: XCom, DAG Processor

...