Summary

Solutions that address the known issues in the existing Task-Level SLA feature result in overloading the logic on the scheduler. This is inevitable because of the number of individual tasks we have to evaluate the SLA on, as well as the different states the individual tasks can be in - whether that be finished, unfinished, or not yet scheduled. Given this clear downside, keeping the current definition of task-level callbacks may not be strictly necessary, especially with tools like DateTimeTriggers that can substitute the function of task-level SLA callbacks. 

On the other hand, I believe that SLAs defined at the DAG level will be extremely useful as a 'catch-all' alert in case anything goes wrong in a DAG. In addition, SLAs defined at the DAG level will be incredibly lightweight to detect and execute callbacks for.

'Time-limit' Task-Level SLAs, whose measurement is contained within a task (as opposed to in the scheduler or the DagFileProcessor), are also a viable alternative that is highly scalable.

Hence, I'd like to propose that SLAs be refactored to these two implementations, and that the existing Task-level SLA feature that keeps track of the time difference between the dag and a task be deprecated.

Motivation

We often need to guarantee that a job is finished by a certain deadline. This concept is loosely defined as an ‘SLA’. In the scope of scheduled jobs, an SLA is usually defined relative to the expected start time of the job. While the current SLA feature correctly evaluates a missed SLA according to this definition, it is more of a misfeature because of the problems listed below.

The original discussion was carried out on the detailed Google Doc.

Problems in the Current State

  1. Sla_miss is evaluated only if the task_id has ever been in SUCCESS or SKIPPED state. This means that if the task has not succeeded in the first place, due to it being blocked, or is still running, or is yet to be scheduled, the sla_miss_callback is not executed. This is a detrimental misfeature of the existing SLA implementation, as users often want to use SLA for delay detection, to enable them to investigate any possible issues in their workflow pre-emptively. If a dag is ever changed to rename the task_ids or add new task_ids, the first runs of the new task_ids will never have their SLAs evaluated. 
  2. SLA is defined as a timedelta relative to the dagrun.data_interval_end based on the timetable of the scheduled job. Some DAGs are triggered externally, either manually or via a Dataset, meaning they don’t have a fixed schedule or start time, which prevents the evaluation of an SLA miss.
  3. Sla_miss_callback is an attribute of the DAG, but the SLAs are defined as attributes of the individual tasks. This is counterintuitive, and does not allow users to run callbacks that are customized at the task level if deadlines are missed.
  4. The current implementation triggers the parsing of dag files when an sla_miss_callback is sent to the DagFileProcessor. This means that if an SLA is defined within a dag, we may end up parsing the dag file more frequently than the frequency defined by min_file_process_interval, because the sla_miss_callbacks are sent to the DagFileProcessor every time dag_run.update_state is called. This is especially bad for sla_miss_callbacks compared to the other dag-level callbacks, because sla_miss_callback is requested every time we examine and attempt to schedule the dagrun, even if there are no state changes within the dag.
  5. Overloading the DagFileProcessor with both responsibilities means that delays in parsing and loading DAGs and delays in detecting and recording SLA misses for a DAG affect each other. These are both time-sensitive processes with tunable parameters for parsing timeouts. Overloading the responsibility of the DagFileProcessor is hence not simply confusing for users; it also makes the DagFileProcessor difficult to fine-tune for the needs of their cluster.


Desired Behavior: Detected ASAP at data_interval_end + SLA

State Diagram and Challenges in Solving Above Problems for Existing Task-level SLAs

In a single-threaded process, managing the above sequence of actions is really simple. But in a distributed set of processes, we need to describe the above flow with a state diagram, so that the detection of a missed SLA and the execution of an SLA callback can be managed by the processes or parts of the scheduler loop that are best equipped to handle them. This is what the current architecture seeks to do, regrettably along with all of the issues outlined above. The current workflow of detecting and alerting on SLA misses is complicated and stretches across multiple parts of the scheduler: the scheduler loop constantly sends poorly defined callbacks to the DagFileProcessor, and the DagFileProcessorProcess constantly creates and updates records in the SlaMiss table. Even so, we can describe this workflow as the set of state changes below (hereafter referred to as SLA state).


One important thing to note here is that although this is a simple acyclic and directed set of states, it is a new lineage of states that co-exists in parallel with the current set of states in Apache Airflow. Since a task can have any combination of SLA state and core Airflow state, we would need to evaluate whether the SLA state needs updating for tasks in every state, instead of just checking tasks in unfinished states. This is costly because it incurs much more work on the scheduler (or another process) and on the metadata database.

One way to get around this would be to limit the evaluation of SLA state to just unfinished tasks. But this would mean that we would need to make the evaluation and update of the SLA state a first-class component of the task’s state change diagram, and make a strong guarantee that the SLA state of a finished task as defined by an executor/scheduler is up-to-date and final. This would require us to evaluate the state within the executor or the scheduler and overload the _process_executor_events call in the scheduler.

In addition, users may want to track the SLA of tasks that have yet to be scheduled because they are blocked by delays in upstream tasks. Evaluating SLAs for task instances that have yet to be scheduled would complicate the logic even more (this is explored in depth in the Google Doc Addendum: Idea 1). This is exactly why the SLA evaluation logic was moved out of the scheduler and into the DagFileProcessorProcess, which is also why the method signature for sla_miss_callback currently does not have access to the DagRun context.

Different Types of SLAs

Breaking down SLAs in the following categories helped the community identify the different technical challenges involved in addressing each problem:

  1. DAG SLAs
  2. Time-limit Task-level SLAs (SLAs that are measured and contained within the task itself, as an absolute value since the beginning of the task)
  3. Task-level SLA measured from DAG-run scheduled start time

The existing implementation of SLAs is an incomplete implementation of (3). As we have already explored in detail, attempting to resolve the problems in the existing task-level SLAs comes at a cost of incurring much more work on core Airflow infrastructure.

On the other hand, I believe that SLAs of types (1) and (2) can be managed in a scalable and reliable manner, as they do not have to constantly evaluate the relationship between a dag and all of its tasks. Instead, they can rely on existing running processes to keep track of SLAs and execute the desired callbacks.

Considerations

What change do you propose to make?

There are three parts to this proposal.

Part 1: First proposal is that we implement a DAG-level SLA feature, that is simpler to implement and manage, similar to the existing DAG-level on_success_callback and on_failure_callback.

Part 2: Second proposal is that we mark the existing Task-level SLA misfeature for deprecation, and target the deprecation for Airflow 3.0.

Part 3: Finally, I propose that we implement a new Task-level SLA feature, that is contained within the bounds of the task's lifetime, and is measured within the task itself.

Added:

DAG.sla: None | timedelta

DAG.on_sla_miss_callback: None | DagStateChangeCallback | list[DagStateChangeCallback]

Dagrun.sla_missed: None | bool

BaseOperator.on_sla_miss_callback: None | TaskStateChangeCallback | list[TaskStateChangeCallback]

Updated:

BaseOperator.sla

Marked for Deprecation (3.0.0):

DAG.sla_miss_callback

SlaMiss

SlaMiss UI tab

Evaluation Logic added to

Dagrun.update_state

Dag.handle_callback

BaseOperator._execute_task
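
The in-task measurement proposed in Part 3 could look roughly like the sketch below. It uses only the Python standard library and does not touch any Airflow internals: run_with_sla, execute and on_sla_miss are hypothetical names chosen for illustration, and the real change would live inside BaseOperator._execute_task.

```python
import threading
import time
from datetime import timedelta
from typing import Any, Callable


def run_with_sla(
    execute: Callable[[], Any],
    sla: timedelta,
    on_sla_miss: Callable[[], None],
) -> Any:
    """Run `execute`; call `on_sla_miss` once if it is still running after `sla`."""
    # The timer lives in the task's own process, so neither the scheduler
    # nor the DagFileProcessor has to poll for the miss.
    timer = threading.Timer(sla.total_seconds(), on_sla_miss)
    timer.daemon = True
    timer.start()
    try:
        return execute()
    finally:
        # Finishing (or failing) before the limit cancels the pending callback.
        timer.cancel()


# Hypothetical usage: a slow task misses its limit, a fast one does not.
misses = []
run_with_sla(lambda: time.sleep(0.3), timedelta(seconds=0.1), lambda: misses.append("slow task"))
run_with_sla(lambda: None, timedelta(seconds=5), lambda: misses.append("fast task"))
```

Only the first call records a miss. Because the clock starts when the task starts and stops when the process ends, the measurement is independent of the dagrun's schedule.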

What problem does it solve?

Using a DAG-level SLA feature and a Task-level SLA that is measured within the task itself solves all five of the problems listed under the Problems in the Current State section.

  1. The SLA miss will be detected as close as possible to when it happens, and could actually be used to pre-emptively detect delays in jobs, which is a very attractive feature increment for teams that manage pipelines with sensitive deadlines.
  2. For Dag level SLAs, we will define the SLA from the dagrun.start_date for any dagrun that is not a DagRunType.BACKFILL type. This means that we will be able to extend the SLA feature to DagRunType.MANUAL and DagRunType.DATASET_TRIGGERED dagruns.
  3. For Task level SLAs, we will define the SLA from the actual start time of the task. 
  4. sla and on_sla_miss_callback will be DAG- or task-level attributes, matching the level at which the SLA is measured and the callback is executed. Hence, the refactored feature will feel much more consistent for new users looking to adopt it.
  5. Enabling the SLA feature will no longer overload the DagFileProcessor and the metadata database with sla_miss_callback requests and queries.
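
To illustrate why the DAG-level check is lightweight, here is a minimal sketch of the comparison it needs. RunType is a hypothetical stand-in for Airflow's DagRunType, reduced to the run types named above, and dag_sla_missed is an illustrative helper rather than an existing Airflow function. The return value mirrors the proposed Dagrun.sla_missed: None | bool.

```python
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Optional


class RunType(str, Enum):
    # Hypothetical stand-in for Airflow's DagRunType.
    BACKFILL = "backfill"
    SCHEDULED = "scheduled"
    MANUAL = "manual"
    DATASET_TRIGGERED = "dataset_triggered"


def dag_sla_missed(
    run_type: RunType,
    start_date: datetime,
    sla: Optional[timedelta],
    now: datetime,
) -> Optional[bool]:
    """Return None when no SLA applies, otherwise whether the deadline has passed."""
    # Backfill runs are excluded; every other run type is measured
    # from the dagrun's actual start_date.
    if sla is None or run_type is RunType.BACKFILL:
        return None
    return now > start_date + sla


# Hypothetical usage: a manual run that started two hours ago with a 1-hour SLA.
start = datetime(2023, 1, 1, tzinfo=timezone.utc)
manual_missed = dag_sla_missed(RunType.MANUAL, start, timedelta(hours=1), start + timedelta(hours=2))
```

The check needs only one timestamp comparison per dagrun, with no per-task queries, which is what would make it cheap to run from Dagrun.update_state.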

Are there any downsides to this change?

The following types of SLAs are out of scope for the new DAG and Task SLA features.

  1. New Task level SLA cannot be used within Deferrable Operators
  2. SLAs of task relative to the scheduled start time of the DAG

A workaround for this is to use DateTimeTriggers and create helper tasks that monitor specific tasks and execute custom callbacks if those tasks have not completed within the defined SLA. These helper tasks could easily be customized to meet the other diverse SLA measuring requirements that some users in the community have been requesting to date.

Below is an example of how these helper tasks could be implemented with the use of Deferrable Operators and Triggers.
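
A minimal sketch of the pattern, assuming plain asyncio in place of Airflow's triggerer: the helper waits until the deadline (where a real helper task would defer on a DateTimeTrigger), then reads the state of the task it is watching and runs a callback if that task is not finished. sla_watch, get_state and on_sla_miss are hypothetical names, and the state strings are simplified placeholders.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Callable


async def sla_watch(
    deadline: datetime,
    get_state: Callable[[], str],
    on_sla_miss: Callable[[str], None],
) -> bool:
    """Wait until `deadline`, then report a miss if the watched task is unfinished."""
    # Stand-in for deferring on a DateTimeTrigger: nothing runs until the deadline.
    delay = (deadline - datetime.now(timezone.utc)).total_seconds()
    await asyncio.sleep(max(delay, 0))
    state = get_state()
    # Any state other than a finished-OK state counts as a miss, including
    # tasks that are still running or were never scheduled.
    if state not in ("success", "skipped"):
        on_sla_miss(state)
        return True
    return False


# Hypothetical usage: the watched task is still running at the deadline.
missed_states = []
deadline = datetime.now(timezone.utc) + timedelta(seconds=0.05)
was_missed = asyncio.run(sla_watch(deadline, lambda: "running", missed_states.append))
```

Unlike the existing mechanism, the check fires at the deadline regardless of whether the watched task has ever succeeded.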



Which users are affected by the change?

Users who rely on SLAs measured relative to the dagrun's scheduled start time will have to find a different workaround (one of which is suggested above).

How are users affected by the change? (e.g. DB upgrade required?)

A DB upgrade will be required, as the sla_miss table will be deprecated, and an sla_missed boolean attribute must be added to Dagrun's data model.

Other considerations?

A detailed summary of all other considerations that have been explored can be found on the Google Doc.


What defines this AIP as "done"?

This AIP is "done" when the proposed DAG-level SLA feature and the new, scalable Task-level SLA feature are implemented, and the existing Task-level SlaMiss feature has been marked for deprecation.


7 Comments

  1. About removing task level SLA and suggestion of using SlaTask operator.

    What if the task works for 2 months, will we get sla for the task within 1 hour (as expected by sla definition)?

    1. That will depend strictly on how you define your dag and your tasks and your SlaTask operator. If your SlaTask operator is set as a Deferrable Operator that checks the status of the task it is watching in an hour, it will always evaluate in an hour.


      To my understanding, the problem with the existing task-level Sla mechanism is that this will not evaluate until the task succeeds because of this SQL filter on the dag_processing/processor.

      .filter(or_(TI.state == State.SUCCESS, TI.state == State.SKIPPED))

  2. > Part 2: Second proposal is that we mark the existing Task-level SLA misfeature for deprecation, and target the deprecation for Airflow 3.0.

    What does it cost us to carry that feature for that long? I'm not sure how close if ever 3.0 is to being released. I wonder if we should develop deprecation strategies for minor releases.

  3. Unrelated to above, but someone brought up the concern regarding the possible confusion between DAG.on_sla_miss_callback and the deprecated DAG.sla_miss_callback due to their very similar names.


    Do we maybe want to introduce the following terminologies in lieu of sla and sla_miss_callback?

    sla → expected_runtime

    sla_miss_callback → long_run_callback


    1. I'd suggest against introducing new terminology. It is already hard to get familiar with all the terms and users have come to identify "sla miss", so we should continue with that. 

  4. Maybe throwing the cat among the pigeons, I am wondering if SLA mechanism should work on Assets instead and not on a DAG Level. And yes I am quite aware we don't have a true notion of Assets currently in Airflow.

    1. In my opinion, no. I think honing in too much on Assets will take away from Airflow's ability to be a very good general purpose orchestrator rather than being just an exceptional ETL orchestrator.