Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: Typo in GCS scheme name

Status

Page properties


StateDraft
Discussion Thread
Vote Thread
Vote Result Thread
Progress Tacking (PR/GitHub Project/Issue Label)
Date Created

2024-01-02

Version Released
AuthorsTP Chung



Abstract

This document proposes formats to Dataset URIs of various schemes, so URIs of those schemes can have a standard semantic. The proposed URI formats are inspired by those of OpenLineage.

Motivation

With AIP-48 Data Dependency Management and Data Driven Scheduling, it is now possible for a DAG to be scheduled based on a task outlet describing a data resource. However, AIP-48 does not specify semantics of the URI and how two since Dataset is internally identified by a URI. This raises a few issues. First, DAG authors in the same organisation must internally agree on a format to use. It is more difficult for tools to interact with Datasets; since there is not a standard semantic to the URI, tools are left with either imposing a format, which may then conflict with a user’s internally agreed one, or being able to only receive limited information from the Dataset than it ideally could.

Airflow Connection has a URI format that can be used to represent a Connection object in a simple string form. However, the format specifies the connection by design, without distinguishing resources that can be fetched by a connection, even across different resource types. For example, a connection containing AWS credentials can be used to connect to AWS, but does not contain information on exact which service, or what resources inside a particular service.

The recent Airflow Object Storage (AIP-58) introduces new URI formats that can be used to directly reference an object (“file”) in a storage. While this fits directly with the use case described here, it does not specify the format used, but leaves it as implementation details of each storage. It also, by design, only covers file-like storages. This document would explicit specify in the same formats currently used by the Object Storage.

Considerations

This AIP is informational. Users may choose to not follow the proposed rules if they do not expect the URI to be semantically interpreted by Airflow (only being understood as a literal string), or do not intend to interchange the value with libraries, including but not limited to providers and plugins by third-party authors.

What change do you propose to make?

A URI format is defined for a resource that can logically trigger a downstream workflow a la Dataset events. A resource type can be a database (Postgres, MySQL, etc.), an object storage service (Amazon S3, Azure Object Storage, etc.), or the local filesystem.

When describing URI sections, the terminology used by Python’s urllib.parse.urlsplit function is used. Although a RFC 3986 is generally followed as the standard URI format, various URI parsers are known to slightly deviate in behaviour in edge cases. Python’s implementation is therefore used as the de facto reference since it is the easiest function for both Airflow and various provider and plugin packages to reach for, and also considered stable enough to be relied on. If a value cannot be parsed as a URI matching any of the known rules, it is treated as a simple literal string containing no semantics.

The URI formats should generally follow OpenLineage’s Dataset naming scheme. For file-like resources, this conveniently matches the URI format used by . Others may require slight adjustments so they look more familiar to Airflow users. Most significantly, we use slashes (/) to denote hierarchy in a database, similar to how Airflow Connection formats them, instead of dots (.) used by OpenLineage.

Certain sections in the URI should either normalised or dropped entirely, so that a resource can be canonically identified by one URI. Each section should be escaped (“quoted”) according to rules so a valid URI can be generated to represent the target resource.

In the long run, users are not expected to need to hand-craft the URIs, but use convenience functions or Database subclasses to create a Dataset with an appropriate URI. This document does not cover those APIs.

What problem does it solve?

With a standard URI format in place, features can be designed to rely on the interoperability. Most significantly, Dataset events can be emitted automatically by operators either developed in Airflow core, providers, or third parties, similar to how OpenLineage events are, based on operator arguments, without relying on the user to manually specify outlets. While this is already possible now, having documented URI formats means operators can be more confident in not stepping on other operators’ toes, and users more easily discover such features and schedule events around them.

A standard URI format also unblocks tools from implementing more efficient Dataset features based on semantics. For example, it would be possible for the scheduler to implement triggering on files matching a glob pattern, without needing to saturate the triggering events to account for each pattern listened on.

How are users affected by the change?

Users already using Datasets may need to review the values they currently use, and change any that conflict with the defined formats. This is expected to be a minor annoyance since the protocol (scheme) part is generally enough to distinguish between unrelated URI formats. The formats defined in this document are also quite straightfowrad and considered established enough that users using those services should already use mostly, if not entirely, compatible formats internally without a standard anyway.

Again, since this AIP is informational, users are still allowed to disregard the rules defined here, as long as they do not access any of the new features enabled by the standard. The new features should be strictly opt-in and only be enabled by explicit user requests (e.g. by setting a flag to True on the operator) for backward compatibility.

What defines this AIP as "done"?

The AIP should be considered done when a process for a service to add a new URI format is established. This is done by allowing services to record the format in the documentation, likely alongside with the corresponding operator and hook pages. Existing example DAGs on Datasets will be modified to use standard URI formats to demostrate the best practice, and additional examples will also be created to demostrate additional rules proposed in this document.


Specifications

NOTICE: This AIP will not be updated after acception to keep track of newly added rules used in the latest Airflow version. The Airflow documentation should describe the up-to-date URI formats, and be the canonical source instead.

A URI should be formatted as:

{protocol}://{netloc}{path}{query}
  • All parts are required unless explicitly specified.
  • The airflow protocol is reserved and should not used by any service.
  • Protocol values with a x- prefix are reserved for user-defined Dataset values. Airflow core and official providers should not define protocols in this namespace. Third-party libraries are also strongly discouraged from defining such protocols.
  • Unless specifically mentioned, the netloc part should be of form {host}[:{port}], where the port part, if missing, should be normalised into a value specified for each protocol. If a default port value is not specified, the port part is mandatory for the protocol.
  • Unless specifically mentioned, the netloc part MUST NOT contain auth information in the canonical URI form.
  • The path part starts from (including) the first / (forward slash). The final / in path is stripped, unless it is the only character present. This matches the behaviour of Python’s pathlib.
    • /foo/bar//foo/bar

    • //

  • Each path component (path separated by /) is percent-encoded if needed.
  • The canonical URI form should not contain a fragment (the part containing and after #).
  • Order of items in query (starting and including the ?) is insignificant, and should be normalised by ordering by the key’s Unicode code point value (ascending).

For example, each pair of URIs are equivalent semantically, with the latter being canonical.

s3://s3_default@some_bucket/order_data
s3://some_bucket/order_data

postgres://localhost/my_db/public/my_data
postgres://localhost:5432/my_db/public/my_data

service://token@location/path/to/data?y=2,3&x=1
service://location/path/to/data?x=1&y=2,3

Known Standard URI Formats

NOTICE: This AIP will not be updated after acception to keep track of newly added rules used in the latest Airflow version. The Airflow documentation should describe the up-to-date URI formats, and be the canonical source instead.

This section specifies how a Dataset URI should be written for various services.

Local file

  • Format: file://{host}{path} 
  • The host part can be empty, which is normalised into localhost.
  • The port part is not in the canonical form. It is ignored if present.

Amazon Simple Storage Service

  • Format: s3://{bucket_name}{path}
  • The netloc part denotes the bucket name, which is unique across an S3 partition. While it is technically possible for a bucket name to appear in multiple partitions (current three: standard, China, and US Gov), the use case of mixing partitions in an Airflow setup is considered too niche and therefore ignored. We may introduce additional protocol values in the future if the need arises.
  • The path part always starts with a slash (/), followed by the S3 object key name (which should never start with a slash). We include the leading slash in path to match the definition in RFC 3986, but the same format can be equivalently expressed as s3://{bucket_name}/{key_name}.

Google Cloud Storage

  • Format: gcs://{bucket_name}{path}
  • The gs protocol is an alias, normalised to gcs.
  • The netloc part denotes the bucket name, which is unique across all GCS.
  • The path part always starts with a slash (/), followed by the GCS object name (which should never start with a slash). We include the leading slash in path to match the definition in RFC 3986, but the same format can be equivalently expressed as s3gcs://{bucket_name}/{object_name}.

Postgres

  • Format: postgres://{host}[:{port}]/{database}/{schema}/{table}
  • The postgresql protocol is an alias, normalised to postgres.
  • Default port is 5432.
  • The schema value is required since it is not possible to infer the value statically without introspection.

MySQL

  • Format: mysql://{host}[:{port}]/{database}/{table}
  • Default port is 3306.
  • MariaDB also uses this form. The mariadb protocol is an alias, normalised to mysql.

Google BigQuery

  • Format: bigquery://{project_id}/{dataset}/{table}
  • The logical netloc part denotes the project ID, which is unique across all BigQuery projects according to Google documentation. Note that this is different from the “project name,” which is only unique to the user, or the “project number,” which although unique, is a serial number and not human-readable.

Trino

  • Format: trino://{host}[:{port}]/{catalog}/{schema}/{table}
  • Default port is 8080.

Implementation

Datasets are still created by the airflow.datasets.Dataset class. Core Airflow and each provider would register classes to handle protocols it understands, and functions to normalise URIs using them.

When a Dataset is created with a string that looks like a URI, it is parsed with with urlsplit, and Airflow checks if the protocol is registered, and call the corresponding function for turn the value into a canonical URI for comparison. The URI will continued to be stored as a raw string in both Dataset and DatasetModel.

The normalisation function takes a SplitResult and returns str

Quasi-pseudo implementation:

# The existing Dataset class.
class Dataset:
    def __init__(self, uri: str, extras):
        self.extra = extra  # Not related to this proposal.
        self.uri = _normalize_uri(uri)

def _normalize_uri(uri: str) -> str:
    try:
        parts = urllib.parse.urlsplit(uri)
    except ValueError:
      return uri  # Can't parse, treat as free-form text.
    try:
        normalizer = ProvidersManager().dataset_uri_handlers[parts.scheme]
    except KeyError:
        return uri  # No normalization logic, treat as free-form.
    try:
        return normalizer(parts)
    except Exception:
        log.exception("Failed to normalize %s", uri)
      return uri