Motivation

The core motivator of this proposal is the ability to efficiently perform setup and teardown activities in a DAG. One simple example is provisioning an external resource for a DAG run, like an EMR cluster: create a cluster at the start of the DagRun, before the first task, and easily and cleanly tear it down after the last task has finished, even in case of failures – Airflow itself would benefit from this ability in our "system"/integration tests!

This setup and teardown feature also plays a role when performing data processing inside Airflow. So far, data handling with Airflow tasks has been relatively simplistic. Prior to AIP-48, Airflow has depended on a feature called XCom for passing data between tasks. This data is persisted in the Airflow metadatabase in the course of DAG execution and was originally intended for passing small data elements between tasks within the same DAG. As part of the Airflow 1.10.12 release, this feature was extended to support custom XCom data handlers to enable persistence of XCom data in external data stores such as S3 and Google Cloud Storage, thereby enabling large data sets to be passed between tasks using XCom as the core abstraction. Further, as part of the Airflow 2.0 release, users can now pass data between tasks within a DAG natively through the TaskFlow API.

However, the XCom data is never cleaned up by Airflow. As this data grows larger, system performance can degrade and costs can increase over time. The current mitigation is to use maintenance DAGs or “airflow db clean” to clean up older XCom data, but this can be problematic when working with external data stored using custom XCom backends.

As the need to share large data sets across tasks and across DAGs grows (using XCom today, and hopefully soon Datasets as envisioned in AIP-48), this problem will only get worse. This cleanup should be handled cleanly by Airflow itself, and adding the capability to build this is the core motivation for this proposal.

What's the TL;DR?

Add new syntax for marking tasks as setup/teardown such that:

  • Teardown tasks still run even if the upstream tasks have otherwise failed
  • A failing teardown task does not always result in the DagRun being marked as failed (the failure mode is up to the DAG author)
  • Clearing a dependent task automatically clears the related setup/teardown tasks

Setup and Teardown Task Semantics

Scope and Execution order

Setup and teardown tasks can appear at the dag level, or inside any task group.

Setup tasks will run "outside in" — at the DAG level first and then for a task group. Teardown tasks run the opposite, inside out.

A task group's setup tasks will run before any tasks in that task group execute, and they must complete before the normal tasks are eligible for scheduling.

If a setup task fails, no further normal or setup tasks will run; once all currently executing tasks are finished, the teardown stack will be executed (running tasks will not be terminated). If a setup task fails, the teardown for the same scope will not run, but teardown tasks from outer scopes will still run (i.e. the DAG-level teardown will run even if the setup of a task group fails). Essentially, it is the responsibility of a setup task to clean up after itself, with try/except, if it does not complete.

Teardown tasks run even when tasks fail

Unlike a normal task, which examines the state of its upstream tasks, a teardown task will always run as long as its setup task (if there is one) completed successfully.

The current way of dealing with this sort of task is to have a task with trigger_rule="all_done" (see the sketch after this list), but that has a few drawbacks compared to this proposal:

  • The DAG graph is visually very confusing

    Because each and every task needs a dependency to the teardown task, there are a lot of edges in the graph, and as a result it is much harder to work out what the "actual" flow of tasks is.
  • The concept of a teardown task is much more explicit and lets users be more declarative in what they want to achieve.

    This is part of the theme of our recent AIPs: giving users native ways in Airflow to express what they want and removing the need for hacks and workarounds.

  • No need to be careful about setting dependencies for complex tasks.

    This is especially useful when coupled with the ability to add teardown tasks to Task Groups and lets users control the lifetime of resources much more easily.

  • (By default) Doesn't affect the resolution of the DagRun. This is possible to achieve with an all_done task (by examining the state of the upstream tasks and working out whether to fail or succeed), but it requires every user to implement it themselves.
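
For comparison, here is a minimal sketch of that workaround using today's Airflow APIs (the task names are illustrative):

Code Block
languagepy
from airflow import DAG
from airflow.decorators import task
from airflow.utils.trigger_rule import TriggerRule


with DAG(dag_id='test_manual_teardown'):
    @task
    def create_cluster():
        ...

    @task
    def load():
        ...

    @task
    def summarize():
        ...

    # "all_done" makes this run regardless of upstream state, but it has to be
    # wired downstream of every leaf task by hand, and a failure here still
    # fails the DagRun unless extra logic is added.
    @task(trigger_rule=TriggerRule.ALL_DONE)
    def delete_cluster():
        ...

    create_cluster() >> load() >> summarize() >> delete_cluster()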

Teardown failure doesn't have to result in DagRun failure

There are many flavours of teardown task where the teardown failing is acceptable and the DagRun should still be marked as success, but having the teardown task still clearly marked as failure is good for operations. For example, if you fail to tear down an EMR cluster when you already have an external process that terminates idle clusters periodically, then it makes sense to not fail the DagRun as the data processing (i.e. the most important part of the DagRun) was successful.

To this end teardown tasks will also accept an `on_failure_fail_dagrun` boolean argument that controls if a failure in the teardown task results in the DagRun being marked as failed (`on_failure_fail_dagrun=True`) or if the teardown task result doesn't change the DagRun state (`False`, the default) – that is when this is `False`, the final state of the DagRun will be computed as if the teardown task simply didn't exist.
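
For illustration, a minimal sketch with the proposed decorator (the function names here are hypothetical):

Code Block
languagepy
from airflow import teardown


# A failure here should fail the whole DagRun
@teardown(on_failure_fail_dagrun=True)
def drop_staging_tables():
    ...


# Default (on_failure_fail_dagrun=False): a failure is recorded on the task,
# but the final DagRun state is computed as if this task didn't exist
@teardown
def delete_cluster():
    ...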

Feature: Clearing automatically clears setup & teardown too

When a task in a DAG with setup or teardown tasks is cleared, the "in scope" setup and teardown tasks will also be cleared. (This is one key feature that cannot easily be achieved in Airflow today.)

At most one setup and one teardown task per scope

In this initial version we limit it such that only a single setup and a single teardown task are allowed at any given level. (As an implementation detail, we should consider storing these as lists so that we can easily support multiple in a future release.)

It is an error for setup and teardown tasks to have dependencies. (This is to allow us to work out the semantics of multiple setup/teardown tasks at a future point, and to reserve dependencies for that if we want it.)

DAG Authoring syntax

Use with Taskflow API

Code Block
languagepy
from airflow import DAG, task, setup, teardown
 
 
with DAG(dag_id='test'):
    @setup
    def create_cluster():
        ...
        return cluster_id
 
    @task
    def load(ti):
        # Pull the cluster id returned by the setup task
        cluster_id = ti.xcom_pull(task_ids="create_cluster")
 
    @task
    def summarize():
        ...
 
    @teardown(on_failure_fail_dagrun=False)
    def teardown_cluster(ti):
        cluster_id = ti.xcom_pull(task_ids="create_cluster")
        ...
 
    create_cluster()
    load() >> summarize()
    teardown_cluster()

The important thing to note here is that there is no explicit dependency between the setup/teardown tasks and the normal tasks.

Use with classic operator

These two decorators will be written such that they will also work for classic operators:

Code Block
languagepy
from airflow import DAG, setup_task
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator
 
with DAG(...):
    job_flow_creator = setup_task(EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
    ))

As a style point, setup(MyOperator()) didn't feel right to us conceptually, so we propose re-exporting `setup` and `teardown` as `setup_task` and `teardown_task` from the airflow module (lazily, like everything else there).
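
A teardown for a classic operator would look much the same; for example, using the Amazon provider's EmrTerminateJobFlowOperator (the task id and XCom template here are illustrative):

Code Block
languagepy
from airflow import DAG, teardown_task
from airflow.providers.amazon.aws.operators.emr import EmrTerminateJobFlowOperator
 
with DAG(...):
    job_flow_remover = teardown_task(EmrTerminateJobFlowOperator(
        task_id='remove_job_flow',
        # Pull the job flow id created by the setup task via a template, so no
        # explicit dependency edge is introduced
        job_flow_id="{{ ti.xcom_pull(task_ids='create_job_flow') }}",
    ))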

Use with TaskGroups

To add a setup/teardown task to a task group, instantiate it within an active TG scope:

Code Block
languagepy
with TaskGroup("my_tg"):

    @setup
    def my_tg_setup():
        ...


    my_tg_setup()
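
Putting it together, a fuller sketch combining DAG-level and task-group-level setup/teardown, to illustrate the "outside in"/"inside out" ordering described above (all task names are illustrative):

Code Block
languagepy
from airflow import DAG, task, setup, teardown
from airflow.utils.task_group import TaskGroup


with DAG(dag_id='nested_scopes'):
    @setup
    def dag_setup():        # runs first
        ...

    @teardown
    def dag_teardown():     # runs last, even if tg_setup fails
        ...

    with TaskGroup("my_tg"):
        @setup
        def tg_setup():     # runs after dag_setup, before work
            ...

        @teardown
        def tg_teardown():  # runs after work, before dag_teardown
            ...

        @task
        def work():
            ...

        tg_setup()
        tg_teardown()
        work()

    dag_setup()
    dag_teardown()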

Use with any decorated operator, Python is automatic

The `@setup` and `@teardown` decorators will work against a plain function (which is automatically converted to a `@task.python` task) and against an already decorated function, to allow setup tasks to run in Docker etc.

All of these are valid:

Code Block
languagepy
@setup
@task
def already_decorated():
    ...


@setup
def automatic_python_task():
    ...


@setup
@task.docker
def docker_setup():
    ...


Future Work

  • Automatically convert dag-level callback functions to a teardown task (It means logs for them are now visible in the UI!)

    This would be nice, but there are some semantics about setup/teardown on failure that make this hard (as this would easily lead to multiple teardown tasks at the DAG level)

  • Allow more than one setup/teardown at each scope

    (Related to the above point)

    In order for this to "behave" sensibly, this would need both the possibility of dependencies between setup tasks and the ability to "pair" setup and teardown tasks. For example, if you have:

    setup1, setup2, teardown2, teardown1

    and setup2 fails, the "ideal" would be that teardown1 runs but that teardown2 doesn't.

Implementation Approaches Considered

Broadly we considered two implementation approaches which can be described as:

  1. Automatically add dependencies and specialize the UI (so the scheduler and task dependency code doesn't need to be changed); or
  2. Specialize the Scheduler (meaning task dependency code handles setup/teardown but the UI doesn't need special treatment of the graph)

Our preferred approach, if it is feasible, is to not have to update the task dependency code, as that is one of the most complex and performance-critical parts of the Airflow codebase.

The way we thought about achieving this would be to add a post-process step in DAG parsing (before it is serialized to the DB) that would automatically add the setup task upstream of every task. Essentially, running something like `setup >> [*tasks]`. 
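
For example, that post-processing step might look something like the following sketch (the function name and wiring are hypothetical, purely to illustrate the idea):

Code Block
languagepy
# Hypothetical post-process step during DAG parsing (approach 1). Not the
# actual implementation, just an illustration of "setup >> [*tasks]".
def add_implicit_dependencies(dag, setup_task, teardown_task):
    for t in dag.tasks:
        if t is setup_task or t is teardown_task:
            continue
        # Every normal task depends on the setup task...
        setup_task >> t
        # ...and the teardown task depends on every normal task
        t >> teardown_task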

However, where this really falls down is trigger rules. Take this example (modified slightly from earlier in this document):

Code Block
languagepy
from airflow import DAG, setup, teardown, task
from airflow.utils.trigger_rule import TriggerRule


with DAG(dag_id='test'):
    @setup
    def create_cluster():
        ...


    @task
    def this_might_fail():
        ...


    @task(trigger_rule=TriggerRule.ALL_FAILED)
    def fallback():
        ...


    @teardown
    def delete_cluster():
        ...


    create_cluster()
    this_might_fail() >> fallback()
    delete_cluster()

If we assume the intent of the DAG author is for fallback() to recover from the upstream task failing, then adding implicit dependencies makes it harder for authors to reason about when that task will actually run: should it only run when a "normal" task fails, or should trigger rules apply to setup tasks too?

We take the view that, since no explicit dependency was declared by the DAG author, the trigger rule should not apply to setup tasks. As a result the dependency checker will need to know about setup tasks, ruling out approach 1.