
Status

  • State: Completed
  • Discussion Thread: placeholder
  • JIRA:



Motivation

For some tasks, it is helpful to be able to persist state information to the metastore.

Below we discuss a few ways to enhance support for these kinds of tasks.


Job patterns benefitting from state persistence

Incremental processes

An incremental process can look like this:

  • at job start, query max modified date (call this "high watermark")
  • after data successfully transferred, store high watermark
  • at start of next run, pull data from last high watermark
  • repeat
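The steps above can be sketched in Python. The in-memory state_store dict is a hypothetical stand-in for whatever metastore mechanism ends up holding the watermark:

```python
# In-memory stand-in for the metastore (hypothetical; the real store would be
# XCom, a Variable, or one of the mechanisms proposed below).
state_store = {}

def run_incremental_load(task_key, source_rows):
    """One cycle of the high-watermark pattern described above."""
    # 1. At job start, read the last stored high watermark (None on first run).
    last_watermark = state_store.get(task_key)

    # 2. Pull only rows modified after the last watermark.
    new_rows = [r for r in source_rows
                if last_watermark is None or r["modified"] > last_watermark]

    # 3. After the data is successfully transferred, store the new watermark.
    if new_rows:
        state_store[task_key] = max(r["modified"] for r in new_rows)
    return new_rows
```

On the next run, the stored watermark is read back in step 1, so only rows modified since the previous run are pulled.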

Being able to store this information in the Airflow metastore would be convenient and can help resolve some of the challenges encountered with these kinds of jobs.  It is often suggested to ascertain state by inspecting the target, but sometimes this is simply not possible.

Remote async processes

Currently, if you have an async process, your task may idly occupy a worker slot while it simply waits for a remote async job to complete.  Being able to persist a job id to the metastore would support the creation of sensor-like operators that check whether the job is done and, once it is, process it.
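A rough sketch of that sensor-like flow, with a plain dict (job_store) standing in for the metastore and made-up submit/status callables standing in for the remote service:

```python
# Hypothetical persisted store for remote job ids; in the proposals below this
# would live in the metastore so it survives reschedules.
job_store = {}

def poke_remote_job(task_key, submit_job, get_status):
    """One 'poke' of a sensor-like operator for a remote async job."""
    job_id = job_store.get(task_key)
    if job_id is None:
        # First poke: kick off the remote job and persist its id so a later
        # poke (after the worker slot was freed) can find it again.
        job_id = submit_job()
        job_store[task_key] = job_id
    # Subsequent pokes only check status instead of resubmitting.
    return get_status(job_id) == "done"
```

Because the job id survives between pokes, the remote job is submitted exactly once no matter how many times the task is rescheduled.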

Problems with the existing approaches

Currently there are two ways to store state information in the metastore: Variables and XCom.

XCom

XCom can work as a mechanism of state persistence, but a couple of idiosyncrasies make it problematic.

Clearing at task start

XCom records are cleared at task start, even after failure or reschedule.  This means they cannot be used for the remote async job pattern.  They also cannot be used for incremental jobs where you want to keep track of progress in a large batched process and resume after a retry in the event of failure.

Execution date

The task instance attribute execution_date  is mildly problematic for incremental jobs because it can be out of order with the actual execution time.  This can happen when a user clicks "trigger dag" in the web UI (because that uses an execution date of now, while ordinarily a task will have an execution date of now-minus-one-schedule-interval).

For the incremental load pattern, generally speaking the problem will just be that for a couple of runs you pull more data than you need.  This could be remedied by making it possible to get the previous XCom by timestamp instead of execution date.
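A toy illustration of that remedy, assuming made-up row dicts with execution_date, timestamp, and value fields: the previous value is picked by write time, not execution date, so a manually triggered "now" run cannot reorder things.

```python
def previous_value_by_timestamp(rows):
    """Return the value of the most recently *written* row, ignoring
    execution_date ordering entirely."""
    return max(rows, key=lambda r: r["timestamp"])["value"]
```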

For the incremental case, if you want to modify the current state, execution date ordering means you have to work a little harder to modify the latest record (search for your task and sort by execution date descending).  And since you don't generally care about the historical watermarks, keeping them is a mildly wasteful use of space.

Execution date ordering is not a problem for the remote async job pattern.

Variables

Variables are a decent solution for incremental jobs, but they are a bit messy.  Because there is only one field to identify a variable, they are tougher to use and keep organized.  If you already use variables for arbitrary manual job configuration, then adding thousands of automatically created state-persistence variables could pollute your variable namespace and make it tougher to use.

Additionally, it does not make sense to use Variable for state persistence with built-in operators because there is no way to prevent namespace conflicts.

Possible solutions

Solution 1: add column "is_stateful" to XCom

Add a boolean column is_stateful  to XCom table.  XCom records with is_stateful=True would not be cleared at task start.   This would allow you to retrieve state information after failures or reschedules.
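A toy sketch of the changed clearing semantics (is_stateful is the proposed column, not something Airflow has today; rows are plain dicts here):

```python
def clear_xcoms_at_task_start(xcom_rows):
    """Return the rows that would survive the clear at task start:
    under this proposal, rows flagged is_stateful=True are kept."""
    return [row for row in xcom_rows if row.get("is_stateful")]
```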

Solution 2: add column "state_data" to task_instance table

Add string column state_data  to the task_instance  table.

Add TaskInstance methods like the following:

  • get_state (gets current value of state_data )
  • get_previous_state (gets state_data  for previous TI)
  • set_state (set state_data )
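A toy sketch of how those methods might hang together (the class and its attributes are stand-ins for illustration, not the real TaskInstance model):

```python
class TaskInstanceSketch:
    """Stand-in for a TaskInstance carrying the proposed state_data column."""

    def __init__(self, previous_ti=None):
        self.state_data = None          # proposed column on task_instance
        self.previous_ti = previous_ti  # the prior run's task instance, if any

    def get_state(self):
        """Get the current value of state_data."""
        return self.state_data

    def get_previous_state(self):
        """Get state_data from the previous task instance."""
        return self.previous_ti.state_data if self.previous_ti else None

    def set_state(self, value):
        """Set state_data."""
        self.state_data = value
```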

Solution 3: add TaskState model

Columns would be something like the following:

  • dag_id (primary key)
  • task_id (primary key)
  • value
  • timestamp

This is perfect for jobs such as incremental processes that don't care about execution date.  Keeping only the current state makes it easier to find and manually alter the current job state if desired.  And it fits the incremental pattern well conceptually: there is only one state at any given time.   However, this solution makes less sense for the remote async job pattern, because in that pattern you might have a different remote async job for each run of a task and you might want them to run concurrently.
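A toy sketch of the upsert behavior implied by that primary key, with a dict keyed on (dag_id, task_id) standing in for the table:

```python
from datetime import datetime, timezone

# One row per (dag_id, task_id), mirroring the proposed primary key.
task_state = {}

def set_task_state(dag_id, task_id, value):
    # Upsert: the single row is overwritten, so only the current state is kept.
    task_state[(dag_id, task_id)] = {
        "value": value,
        "timestamp": datetime.now(timezone.utc),
    }

def get_task_state(dag_id, task_id):
    row = task_state.get((dag_id, task_id))
    return row["value"] if row else None
```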

Solution 4: add column "namespace" to Variable

It is already possible to use Variable as a store of task state.  The problem is that it is a little messy because variables are named with just one field.   Simply adding a namespace column would make variables a bit more practical to use for state persistence.

However, while this would help end users create stateful operators for their own use, it would not be a practical solution for supporting built-in stateful operators, because we would not really be able to guarantee that the variable names we chose would not collide with those found on clusters in the wild.
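A toy sketch of the namespaced lookup (the function names and the namespace parameter are illustrative, not an existing API): the same key can coexist in different namespaces without colliding.

```python
# Variables keyed by (namespace, key) instead of key alone.
variables = {}

def set_variable(key, value, namespace=None):
    variables[(namespace, key)] = value

def get_variable(key, namespace=None):
    # namespace=None behaves like today's un-namespaced variables.
    return variables.get((namespace, key))
```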





