Discussion Thread: [Discuss] Airflow sensor optimization
In Release: 2.0.0

Thanks Fokko for sharing the reschedule sensor PR by Seelmann (…3596/files). It did a great job.

-- Motivation

In our experience with Airflow, resource utilization is very low. We need a lot of worker machines to support all the concurrently running tasks, yet many machines run the maximum number of tasks while being almost idle. Those machines are running sensor tasks. We also found other operators with similar behavior, such as the SubDagOperator and the Airflow Spark submit client.

Why can these tasks cause such problems?

I reopened this AIP after viewing the sensor rescheduling PR. Since reschedule mode does not reduce the number of worker processes for sensors, the batch sensor idea can supplement it and should work well alongside reschedule mode.

Low efficiency in Airflow Sensor Implementation:

Sensors are a special kind of operator that keeps running until a certain criterion is met. The criterion can be a file landing in HDFS or S3, a partition appearing in Hive, the success of some external task, or even a specific time of day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.
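To make the cost of this loop concrete, here is a minimal sketch of a sensor's poke loop. `SketchSensor` and `PartitionLandsAfter` are hypothetical, simplified stand-ins and not Airflow's BaseSensorOperator. The clock and sleep functions are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the criterion is still unmet after `timeout` seconds."""


class SketchSensor:
    """Hypothetical, simplified version of a sensor's poke loop."""

    def __init__(self, poke_interval: float = 180.0, timeout: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep):
        self.poke_interval = poke_interval
        self.timeout = timeout
        self.clock = clock
        self.sleep = sleep

    def poke(self) -> bool:
        """Check the criterion once (file landed, partition exists, ...)."""
        raise NotImplementedError

    def execute(self) -> int:
        """Poke until poke() returns True; return how many pokes were needed."""
        started = self.clock()
        pokes = 0
        while True:
            pokes += 1
            if self.poke():
                return pokes
            if self.clock() - started > self.timeout:
                raise SensorTimeout(f"criterion not met within {self.timeout}s")
            # The worker slot stays occupied during this sleep. This idle time
            # is what makes one process per sensor so wasteful.
            self.sleep(self.poke_interval)


class PartitionLandsAfter(SketchSensor):
    """Toy sensor whose 'partition' appears on the n-th poke."""

    def __init__(self, n: int, **kwargs):
        super().__init__(**kwargs)
        self.n = n
        self.calls = 0

    def poke(self) -> bool:
        self.calls += 1
        return self.calls >= self.n
```

A sensor whose partition lands on the third poke, with a 180 s interval, holds a worker for 360 s but does real work for only three very short `poke()` calls.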

Sensor tasks are inefficient because, in the current design, we spawn a separate worker process for each partition sensor. This worker may last a long time, until the target partition is available. When many sensor tasks need to run within certain time limits, we have to allocate a lot of resources to have enough workers for them.


We propose two approaches that could address these issues: batch-sensor and smart-sensor.


The basic idea of batch-sensor is to batch-process sensor tasks to save resources. While running, a batch-sensor takes N partition sensor requests as input and pokes those N partitions periodically. If the batch-sensor finds that the criterion of a sensor task is met, it updates the database for that task.

To do this, we need to create a sensor base class called ‘batchable’ and make all sensors inherit from it. We also need to change the scheduler's behavior for batchable sensor tasks: the scheduler will find as many batchable sensor tasks as possible and run them in a batch.
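The following sketch shows one batch-sensor round. It assumes a hypothetical `Batchable` base class whose subclasses implement `poke()`. A plain dict stands in for the database that the batch-sensor updates. `FileLandedSensor` and `batch_poke_round` are illustrative names only.

```python
from typing import Dict, List


class Batchable:
    """Hypothetical base class: a sensor that can be poked as part of a batch."""

    def __init__(self, task_key: str):
        self.task_key = task_key

    def poke(self) -> bool:
        raise NotImplementedError


class FileLandedSensor(Batchable):
    """Toy sensor: succeeds once its path appears in a shared set of landed files."""

    def __init__(self, task_key: str, path: str, landed: set):
        super().__init__(task_key)
        self.path = path
        self.landed = landed

    def poke(self) -> bool:
        return self.path in self.landed


def batch_poke_round(pending: List[Batchable], states: Dict[str, str]) -> List[Batchable]:
    """Poke every pending sensor once, record successes, return those still pending.

    `states` stands in for the task_instance table in the metadata DB.
    """
    still_pending = []
    for sensor in pending:
        if sensor.poke():
            states[sensor.task_key] = "success"
        else:
            still_pending.append(sensor)
    return still_pending
```

One process calls `batch_poke_round` on every poke interval until the pending list is empty, replacing N long-lived worker processes.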


Smart-sensor is an improvement on top of batch-sensor.

The idea of smart-sensor is that the smart-sensor worker runs like a service. It periodically queries the task-instance table to find sensor tasks, pokes the metastore, and updates the task instance table when it detects that a certain partition or file has been created.


There are several issues that need to be resolved for our smart sensor.

Persist sensor info in the DB and skip DAG file parsing before the worker process runs

The current Airflow implementation needs to parse the DAG Python file before running a task. Parsing multiple Python files in a smart sensor would be inefficient and would overload it. Sensor tasks need relatively “lightweight” execution information: a small number of properties with a simple structure (mostly built-in types rather than functions or objects). We therefore propose to skip the parsing for smart sensor. The easiest way is to persist the task instance information in the Airflow metaDB.


It will be hard to dump the whole task instance object dictionary. And we do not really need that much information. 

We whitelist the attributes the smart sensor needs and dump only their values to the DB. Initially we add two sets to the base sensor class: “persist_fields” and “exec_fields”.

“persist_fields”: dumped to the airflow.task_instance column “attr_dict”

Saves the attribute names needed to accomplish a sensor poking job. For example:

  1. The “NamedHivePartitionSensor” defines its persist_fields as ('partition_names', 'metastore_conn_id', 'hook') since these properties are enough for its poke function.
  2. The HivePartitionSensor is slightly different and uses ('schema', 'table', 'partition', 'metastore_conn_id') as its persist_fields.

If two tasks have the same value for every field in persist_fields, they are poking the same item and hold duplicate poking jobs in the sensor.

The persist_fields can help us deduplicate sensor tasks. More broadly, if we could list persist_fields for all operators, they could help deduplicate all Airflow tasks.
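The following sketch shows how the whitelisted persist_fields could be dumped to attr_dict and then used to find duplicate sensor tasks. JSON with sorted keys is an assumption, because the proposal does not fix a serialization format. The `HivePartitionSensorStub` class and both helpers are also illustrative.

```python
import json
from collections import defaultdict
from typing import Dict, List


class HivePartitionSensorStub:
    """Toy stand-in carrying only the attributes named in persist_fields."""

    persist_fields = ("schema", "table", "partition", "metastore_conn_id")

    def __init__(self, task_id: str, schema: str, table: str, partition: str,
                 metastore_conn_id: str = "metastore_default"):
        self.task_id = task_id
        self.schema = schema
        self.table = table
        self.partition = partition
        self.metastore_conn_id = metastore_conn_id


def dump_attr_dict(task) -> str:
    """Serialize only the whitelisted fields; sort_keys makes equal dicts equal strings."""
    return json.dumps({f: getattr(task, f) for f in task.persist_fields}, sort_keys=True)


def group_duplicates(tasks) -> Dict[str, List[str]]:
    """Map each distinct attr_dict to the task_ids that would poke the same item."""
    groups = defaultdict(list)
    for task in tasks:
        groups[dump_attr_dict(task)].append(task.task_id)
    return dict(groups)
```

Two tasks waiting on the same table and partition end up in one group. One poke per group is therefore enough.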

“exec_fields”: dumped to the airflow.task_instance column “exec_dict”

Saves the execution configuration, such as “poke_interval”, “timeout”, and “execution_timeout”.

Fields in this set do not contain information affecting the details of the poking job. They relate to how frequently we should poke, when the task should time out, how many timeouts count as a failure, etc. For now we only include the logic that a smart sensor can easily handle. This is a smart sensor “doable whitelist” and can be extended as more logic is “unlocked” by the smart sensor implementation.

When we initialize a task instance object, we dump the attribute values of these two sets and persist them in the Airflow metaDB. The smart sensor can then read all the required information about running sensor tasks from the DB without parsing any DAG files.

Airflow scheduler change

We do not want to break any existing scheduler logic. Smart sensor is a configurable mode and can easily fall back to the regular scheduler logic when it detects that a task is not suitable for smart sensor.


Scheduler process_file

Right before we set a task instance state to “scheduled”, add a smart sensor check that does the following:

  1. Check if Airflow is configured to use smart sensor.
  2. Check if the current task is suitable for running in smart sensor.

If both checks return “yes”, the task instance qualifies for smart sensor. The Airflow scheduler sets its state to “smart_pending” instead of “scheduled”, and this task instance will NOT BE QUEUED to the executor. It is expected to be picked up by a smart sensor task through a DB query. The smart sensor updates the state to a final state (“success” or “failed”), after which the task returns to the normal scheduler world.

If either check returns “no”, the Airflow cluster is not configured to use smart sensor or the task itself is out of smart sensor scope. In that case the scheduler schedules the task instance as if smart sensor did not exist.
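The following sketch reduces the two checks to one decision function. Two things are assumptions: "suitable for smart sensor" is approximated by membership of the operator's class name in a configured set, and `TaskInstanceStub` and `next_state` are hypothetical names. The state names come from this section.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceStub:
    """Toy task instance: only what the scheduler check needs."""
    task_id: str
    operator: str
    state: str = "none"


def next_state(ti: TaskInstanceStub, use_smart_sensor: bool, smart_sensor_classes: set) -> str:
    """Return the state the scheduler sets right before it would schedule the task.

    "smart_pending" means: do NOT queue to the executor; a smart sensor task
    picks the row up from the DB. Otherwise fall back to the regular logic.
    """
    if use_smart_sensor and ti.operator in smart_sensor_classes:
        return "smart_pending"
    return "scheduled"
```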

Include smart sensor DAG by configuration

We use a smart sensor DAG to kick off all smart sensor tasks. If Airflow is configured to use smart sensor, the DAG is included in the scheduler parsing paths, similar to how the example DAGs are included.

Smart sensor operator

Smart sensor logic

In each execute loop:

  • refresh_all_dict(): Select all tasks from the DB with state “smart_pending” or “smart_running” whose shardcode falls in this smart sensor's range.
  • For each task in the task dictionary to poke:
    • If a task with the same persist_fields has already been poked in this round:
      • If the poked task has a final state, nothing needs to be done.
      • If the poked task does not have a final state, handle timeout.
    • Else (not poked in this round):
      • Execute the sensor job.
      • For a success or failed state, mark the state in the Airflow DB for all tasks that:
        • Have the same persist_fields hashcode
        • Have a state in (“smart_pending”, “smart_running”)
      • Check and handle timeout.
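The following sketch runs one iteration of this loop. An in-memory list stands in for the task_instance table, which is an assumption; the real operator queries the DB. `SensorRow`, `execute_round` and the per-row `started_at`/`timeout` fields are hypothetical. Here a poke only returns True or False, so “failed” arises only from timeouts.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict, List

ACTIVE_STATES = ("smart_pending", "smart_running")


@dataclass
class SensorRow:
    """Toy stand-in for one task_instance row seen by a smart sensor."""
    key: str
    hashcode: int
    shardcode: int
    attr_dict: str  # JSON of the persist_fields values
    state: str = "smart_pending"
    started_at: float = 0.0
    timeout: float = 3600.0  # taken from exec_dict


def refresh_all_dict(table: List[SensorRow], shard_min: int, shard_max: int) -> List[SensorRow]:
    """Active rows whose shardcode falls in [shard_min, shard_max)."""
    return [r for r in table
            if r.state in ACTIVE_STATES and shard_min <= r.shardcode < shard_max]


def execute_round(table: List[SensorRow], shard_min: int, shard_max: int,
                  poke: Callable[[dict], bool], now: float) -> int:
    """One loop iteration; returns the number of real pokes after dedup."""
    poked: Dict[int, bool] = {}  # hashcode -> result of this round's poke
    for row in refresh_all_dict(table, shard_min, shard_max):
        if row.hashcode not in poked:
            # Not poked in this round: execute the sensor job once.
            poked[row.hashcode] = poke(json.loads(row.attr_dict))
            if poked[row.hashcode]:
                # Mark every active task with the same hashcode as succeeded.
                for other in table:
                    if other.hashcode == row.hashcode and other.state in ACTIVE_STATES:
                        other.state = "success"
        if row.state in ACTIVE_STATES:
            # No final state yet: check this task's own timeout.
            row.state = "failed" if now - row.started_at > row.timeout else "smart_running"
    return len(poked)
```

Timeouts are checked per row rather than per hashcode, because duplicate tasks may have started at different times.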


Smart sensor needs to handle the following issues:

  1. Collect multiple tasks that qualify for smart sensor.
  2. Do the work for all collected sensor tasks.
  3. Sensor task duplication.
  4. Sharding when multiple smart sensors are running.

Dedup and shard:

Attr_dict ⇒ hashcode ⇒ shardcode

Hashcode = hash(attr_dict)

Shardcode = Hashcode % (max_shard + 1)

The shardcode range and the number of smart sensor tasks can be configured in airflow.cfg.

Each smart sensor task has a shardcode range and only queries tasks whose shardcode is in this range. Duplicate sensor tasks have the same hashcode and the same shardcode, so they are handled by the same smart sensor task.

This is the general logic of base_smart_operator. Each sensor operator can have its own smart sensor operator that inherits the above logic. Smart sensor operators differ in their "persist_fields" and in the details of their "poke" function.
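One caveat applies when implementing `Hashcode = hash(attr_dict)` in Python. The built-in `hash()` of strings is randomized per process (see PYTHONHASHSEED), so two processes could compute different hashcodes for the same sensor and break both dedup and sharding. The following sketch gives a process-independent alternative. Hashing canonical JSON with MD5 and keeping 63 bits to fit the BigInteger hashcode column are our assumptions, not choices made by the proposal.

```python
import hashlib
import json


def compute_hashcode(attr_dict: dict) -> int:
    """Stable across processes: hash the canonical JSON form, not the dict object."""
    canonical = json.dumps(attr_dict, sort_keys=True).encode("utf-8")
    # Keep 63 bits so the value fits a signed 64-bit BigInteger column.
    return int(hashlib.md5(canonical).hexdigest(), 16) & ((1 << 63) - 1)


def compute_shardcode(hashcode: int, max_shard: int) -> int:
    """Shardcode = Hashcode % (max_shard + 1)."""
    return hashcode % (max_shard + 1)
```

Key order does not matter: `{"schema": ..., "table": ...}` and `{"table": ..., "schema": ...}` produce the same hashcode.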

Schema change:

Task_instance table: (add 4 columns and 2 indexes)

from alembic import op
import sqlalchemy as sa


def upgrade():
    op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))
    op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))
    op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), nullable=True))
    op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), nullable=True))
    op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
    op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)

Executing a sensor task is very straightforward. Sensors call the “poke” function to check the criterion at a time interval, usually every 3 minutes. They succeed when their “poke” functions return true and fail when they time out. A “poke” executes very quickly; most of the time, sensors are waiting for the next “poke”.

SubDags are used to define repeating patterns in a DAG and make complicated DAG structures cleaner and more readable. The SubDagOperator implementation follows a similar pattern to sensors: it creates the dagrun for a subdag in the pre_execute function and keeps “poking” the dagrun status in the execute function.

Other operators, such as SparkSubmitOperator, behave the same way: the Spark client in Airflow submits the job and polls until completion. After some initialization work, all these tasks fall into a lightweight and possibly long-running state, which is inefficient.

From the previous analysis we can summarize issues for these tasks falling into the same “long running, lightweight” pattern:

  1. The resource utilization is very low. Worker processes for these tasks are idle 99% of the time.
  2. These tasks are often long running, and they make up a big part of the concurrently running tasks in a large-scale cluster. At Airbnb, more than 70% of running tasks are sensors.
  3. Every task in Airflow emits heartbeats. Before heartbeats were migrated to Redis, they accounted for a large share of the requests to the Airflow metaDB. A large number of long-running tasks increases the metaDB load and makes the situation worse when something goes wrong.
  4. There are a lot of duplicate sensor tasks; more than 40% of sensor jobs are duplicates. This is because many downstream DAGs wait for the same partitions from a few important upstream DAGs.

At Airbnb, sensor tasks account for 70% of concurrent Airflow tasks, and the number keeps growing. They are usually long running, so we have to use a large number of worker slots to run them at peak hours. However, sensor tasks are lightweight: they poke every 3 minutes and are idle most of the time. As the number of sensors grows, the heartbeats they generate also increase the DB load.

-- What is Smart Sensor

The smart sensor is a service that greatly reduces Airflow's infrastructure cost by consolidating some of Airflow's long-running, lightweight tasks.


Instead of using one process for each task, our main idea to improve the efficiency of these long running tasks is to use centralized processes to execute those tasks in batches.  

To do that, we run a task in two steps. First, we serialize the task information into the database. Second, we use a few centralized processes to execute the serialized tasks in batches.

This way, we only need a handful of running processes. At Airbnb, we need just 50-80 running processes, compared with more than 20 thousand before.

The good news is that heartbeats decrease as the number of running processes decreases, and the database load is reduced correspondingly.

Another problem with the existing approach is that there are many duplicate tasks. In smart sensor, we also implemented a way to detect duplicate tasks and avoid running duplicate jobs in the same poking cycle.

We handle duplicate sensors as follows. First, we create a hash for each job from the job's metadata and use it as the job's signature: if two jobs have the same hash, they are duplicates. We then save the job's hash in the database along with the job's other serialized info. The centralized process handles tasks with the same hash only once in each poking cycle.

Logic Flow

In both smart sensor and non-smart-sensor modes, a task starts with the same logic: it renders the template and runs the pre_execute function. After pre_execute runs, there is a new branch for smart sensor logic in which the task checks whether it is smart sensor compatible. A task supported by smart sensors tries to register itself with the smart sensor service. If the registration succeeds, the task exits with a “sensing” state and expects the centralized smart sensor tasks to handle the rest of the work. If the registration fails or the task is not supported by the smart sensor, it falls back to running the execute function.
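The following sketch shows the branch described above. `SensorTaskSketch` is illustrative, not Airflow's TaskInstance code, and so are its `register` callable and every state string except “sensing”. Any exception raised during registration is treated as a failed registration, so the task falls back to the regular execute path.

```python
from typing import Callable


class SensorTaskSketch:
    """Hypothetical task run path with the smart sensor branch after pre_execute."""

    def __init__(self, smart_sensor_compatible: bool, register: Callable[[], bool]):
        self.smart_sensor_compatible = smart_sensor_compatible
        self.register = register  # persists the task in the smart sensor service
        self.state = "queued"
        self.executed = False

    def pre_execute(self) -> None:
        pass  # template rendering and pre_execute are shared by both modes

    def execute(self) -> None:
        self.executed = True  # the regular poke loop inside the worker
        self.state = "success"

    def _try_register(self) -> bool:
        try:
            return self.register()
        except Exception:  # any registration error means: fall back
            return False

    def run(self) -> None:
        self.pre_execute()
        if self.smart_sensor_compatible and self._try_register():
            self.state = "sensing"  # centralized smart sensor takes over
            return
        self.execute()
```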

[Figure: smart sensor logic flow]

With smart sensor enabled, large numbers of smart-sensor-compatible sensor tasks are consolidated into a few smart sensor operators.

[Figure: sensor tasks consolidated into a few smart sensor operators]

Details of the issues that need to be addressed, and their analysis.


Change summary: 

  • Add a new mode called “smart sensor mode”. In smart sensor mode, a sensor does not hold a long-running process and poke periodically. Instead, it only stores its poke context in the sensor_instance table and then exits with a ‘sensing’ state.
  • When smart sensor mode is enabled, the system creates a special set of built-in smart sensor DAGs (named smart_sensor_group_shard_xxx). These DAGs contain SmartSensorOperator tasks and manage the smart sensor jobs for the Airflow cluster. A SmartSensorOperator task can fetch hundreds of ‘sensing’ instances from the sensor_instance table and poke on their behalf in batches. Users don’t need to change their existing DAGs.
  • Smart sensor mode currently supports NamedHivePartitionSensor and MetastorePartitionSensor, but it can easily be extended to support more sensor classes.
  • Smart sensor mode on/off, the list of smart-sensor-enabled classes, and the number of SmartSensorOperator tasks can be configured in the Airflow config.
  • Sensor logs from smart sensors are populated into each task instance's log in the UI.

Schema change

CREATE TABLE `sensor_instance` (
  `task_id` varchar(250) NOT NULL,
  `dag_id` varchar(250) NOT NULL,
  `execution_date` timestamp(6) NOT NULL,
  `state` varchar(20) DEFAULT NULL,
  `try_number` int(11) DEFAULT NULL,
  `start_date` timestamp(6) NULL DEFAULT NULL,
  `operator` varchar(1000) DEFAULT NULL,
  `op_classpath` varchar(1000) DEFAULT NULL,
  `hashcode` bigint(20) DEFAULT NULL,
  `shardcode` int(11) DEFAULT NULL,
  `poke_context` text DEFAULT NULL,
  `execution_context` text DEFAULT NULL,
  `updated_at` timestamp(6) NULL DEFAULT NULL  -- assumed column: required by the si_updated_at index
);

CREATE UNIQUE INDEX ti_primary_key
ON sensor_instance(dag_id, task_id, execution_date);

ALTER TABLE `sensor_instance`
ADD INDEX si_hashcode(hashcode),
ADD INDEX si_shardcode(shardcode),
ADD INDEX si_state_shard(state, shardcode);

ALTER TABLE `sensor_instance`
ADD INDEX si_updated_at(updated_at);


Add sensor_instance table 

  • Smart sensor tasks query the sensor_instance table instead of parsing DAG files to get task information. This avoids spending a long time parsing multiple DAGs at runtime.
  • What is in the sensor_instance table
    • The sensor_instance table duplicates most of the information in task_instance. It needs all the ti keys (dag_id, task_id, execution_date, try_number) if we create a log entry for each sensor. Instead of doing a join query each time, I would rather duplicate these columns.
    • The sensor_instance table has new columns:
      • `hashcode` bigint(20) DEFAULT NULL,

        `shardcode` int(11) DEFAULT NULL,

        `poke_context` text DEFAULT NULL,

        `execution_context` text DEFAULT NULL,

  • Why did we create a new table? 
    • It gives sensors some isolation. Given the task_instance state-changing strategy, full isolation is hard without a well-defined API.
    • Smart sensor tasks query the sensor_instance table, which reduces the risk of locking the task_instance table that most Airflow processes access.
  • Consistency and sync issues:
    • Keep most of the behavior of the task_instance table. A new state, "sensing", should be added. After being picked up by a worker, a ti enters either the “sensing” or the “running” state, depending on whether it can use a smart sensor.
    • When a record is created, mirror its state from the task_instance table into the sensor table (initial state “sensing”). The smart sensor service should only poke records in the 'sensing' state in the sensor table.
    • The smart sensor service only visits the task_instance table when a sensor reaches a final state, “success” or “failed”. It checks the current state of the ti in the task_instance table and only writes back when the ti state is 'sensing'. If the ti is in another state, the smart sensor mirrors that state into the sensor table without changing the ti.
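The following sketch shows the write-back rule in the last bullet. Plain dicts keyed by the ti keys stand in for the task_instance and sensor_instance tables, and `write_back` is a hypothetical helper.

```python
from typing import Dict, Tuple

TiKey = Tuple[str, str, str, int]  # (dag_id, task_id, execution_date, try_number)


def write_back(ti_key: TiKey, final_state: str,
               task_instance: Dict[TiKey, str], sensor_instance: Dict[TiKey, str]) -> str:
    """Apply a smart sensor's final state without clobbering other state changes.

    Only a ti still in 'sensing' is updated; otherwise the sensor row mirrors
    whatever state the ti already has (e.g. a user marked it failed).
    """
    current = task_instance[ti_key]
    if current == "sensing":
        task_instance[ti_key] = final_state
        sensor_instance[ti_key] = final_state
    else:
        sensor_instance[ti_key] = current
    return sensor_instance[ti_key]
```

Against a real database, the check and the update should be one conditional statement (an UPDATE ... WHERE state = 'sensing'). Otherwise a concurrent state change could slip in between the read and the write.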

Persist poke information to DB

The purpose is to avoid parsing large DAG files in the smart sensor operator. Most sensors have simple and limited arguments. If we can retrieve these from the DB without parsing DAGs at runtime, one process should easily handle hundreds of poking jobs.


  • Whitelist the fields that need to be persisted. Persisting all attributes of task instance objects is very difficult and includes redundant fields. Each sensor class that wants smart sensor support uses poke_context_fields to identify the fields it needs for poking.
  • To handle execution settings such as timeout, execution_timeout, and email on failure, we add execution_context_fields to the base sensor operator.

Shards and Deduplicate

To handle a large number of sensor jobs, multiple smart sensor tasks need to be spun up. The sharding logic in smart sensor is shown in the following diagram.

[Figure: smart sensor sharding logic]

We add columns in sensor_instance:

  1. hashcode = hash(poke_context) 
  2. Shardcode = hashcode % shard_code_upper_limit

Each smart sensor task sets shard_min and shard_max when it is created. It then handles all sensors whose shardcode satisfies shard_min <= shardcode < shard_max.

  • Sensors with same poke_context are considered duplicated sensors in smart sensor
  • In our native DAG implementation, we can guarantee:
    • The smart_sensor_group DAGs will cover all sensor tasks in the cluster -- each sensor task, if it is smart_sensor_compatible, will be picked up by one DAG in smart_sensor_group
    • Duplicated sensors will be loaded by the same smart sensor task.
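The following sketch splits shard_code_upper_limit into `shards` contiguous [shard_min, shard_max) ranges that together cover every shardcode exactly once. The even-split rule is an assumption and may not match how the smart_sensor_group DAGs actually divide the range. With shard_code_upper_limit = 10000 and shards = 5, each range holds 2000 codes.

```python
from typing import List, Tuple


def shard_ranges(shard_code_upper_limit: int, shards: int) -> List[Tuple[int, int]]:
    """Split [0, shard_code_upper_limit) into `shards` contiguous [shard_min, shard_max) ranges."""
    base, extra = divmod(shard_code_upper_limit, shards)
    ranges, shard_min = [], 0
    for i in range(shards):
        # The first `extra` ranges take one more code so sizes differ by at most 1.
        shard_max = shard_min + base + (1 if i < extra else 0)
        ranges.append((shard_min, shard_max))
        shard_min = shard_max
    return ranges


def owner(shardcode: int, ranges: List[Tuple[int, int]]) -> int:
    """Index of the smart sensor task whose range contains shardcode."""
    for i, (shard_min, shard_max) in enumerate(ranges):
        if shard_min <= shardcode < shard_max:
            return i
    raise ValueError(f"shardcode {shardcode} is not covered by any range")
```

Duplicates share a hashcode, so they also share a shardcode and therefore an owner. This gives the second guarantee listed above.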

Sensor logs

One smart sensor task executes hundreds of sensors' work, and with the default Airflow log setup it would write all their logs to a single file. To preserve users' log access, a smart sensor creates a separate log file for each sensor it handles. It configures the log handler to write that file to the same path where a regular sensor's log would be.
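The following sketch shows per-sensor log redirection with the standard `logging` module. The `dag_id/task_id/execution_date/try_number.log` layout is an assumption modeled loosely on Airflow's default log filename layout. In practice, the path should come from the cluster's configured log settings, and `sensor_logger` is a hypothetical helper.

```python
import logging
import os


def sensor_logger(base_dir: str, dag_id: str, task_id: str,
                  execution_date: str, try_number: int) -> logging.Logger:
    """Return a logger writing to the path a regular sensor's log would use (assumed layout)."""
    path = os.path.join(base_dir, dag_id, task_id, execution_date, f"{try_number}.log")
    os.makedirs(os.path.dirname(path), exist_ok=True)
    logger = logging.getLogger(f"smart_sensor.{dag_id}.{task_id}.{execution_date}.{try_number}")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep per-sensor lines out of the shared smart sensor log
    if not logger.handlers:  # reuse the handler on later poking rounds
        handler = logging.FileHandler(path)
        handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s"))
        logger.addHandler(handler)
    return logger
```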

Support different sensor operators

The sensor classpath is persisted in the DB at sensor registration and is used to import the sensor class in the smart sensor. The smart sensor then uses this class to create a cached sensor task object whose “poke()” function is reused.
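The following sketch loads the sensor class from the persisted op_classpath with `importlib` and caches one object per (classpath, poke_context). Building the object as `cls(**poke_context)` assumes that the poke_context keys map one-to-one onto constructor arguments, which is consistent with poke_context_fields holding the key names used to initialize a sensor object. `import_class` and `cached_sensor` are hypothetical helpers.

```python
import importlib
import json
from typing import Dict, Tuple


def import_class(classpath: str) -> type:
    """'package.module.ClassName' -> the class object."""
    module_name, _, class_name = classpath.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


_sensor_cache: Dict[Tuple[str, str], object] = {}


def cached_sensor(op_classpath: str, poke_context: dict):
    """Build each distinct sensor once; later rounds reuse the same object's poke()."""
    key = (op_classpath, json.dumps(poke_context, sort_keys=True))
    if key not in _sensor_cache:
        _sensor_cache[key] = import_class(op_classpath)(**poke_context)
    return _sensor_cache[key]
```

The cache is keyed on the canonical JSON of poke_context. As a result, duplicate sensors share one object, which fits the dedup goal.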

Smart sensor tasks management

Setting the Airflow configuration use_smart_sensor = True includes the smart_sensor_group DAGs.

Implementation checklist

  •  DB schema and new ORM class
    •  Add sensor_instance table 
    •  Add corresponding class SensorInstance to register sensor tasks and persist poke arguments in DB
  •  Add ‘sensing’ state to airflow task instance states. Update UI for the new state as well.
  •  BaseOperator: Add is_smart_sensor_compatible() to BaseOperator, returning False by default.
  •  BaseSensorOperator
    •  Override is_smart_sensor_compatible()
    •  Add register_in_sensor_service()
    •  Add function to get the poke context and execution context for sensor jobs
  •  In
    •  Add checking after pre_execute for smart sensor compatibility. 
    •  Register qualified sensor jobs (which sets the task instance state to ‘sensing’) and exit running tasks without poking.
    •  Change email_alert to make it callable from both a traditional task instance and the smart sensor operator.
  •  New Operator: SmartSensorOperator keeps querying the DB for sensor jobs in ‘sensing’ state and pokes them one by one.
  •  Smart sensor task control
    •  Use native DAGs with a smart sensor operator to spin up and rotate the smart sensor jobs for the airflow cluster. 
    •  Turn smart sensor mode on/off by configuring use_smart_sensor configuration
    •  Provide flexible smart sensor support by configurable “shards” and “sensors_enabled”
  •  Simulate task instance logs for tasks executed by smart sensor

How to enable/disable smart sensor

Enabling/Disabling the smart sensor is a system level config which is transparent to the individual users. An example of smart sensor enabled cluster config is as follows:

[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor

The "use_smart_sensor" config indicates whether the smart sensor is enabled. The "shard_code_upper_limit" config is the upper limit used to compute shardcode (shardcode = hashcode % shard_code_upper_limit). The "shards" config indicates the number of concurrently running smart sensor jobs for the Airflow cluster. The "sensors_enabled" config is a list of sensor class names that will use the smart sensor. Users keep using the same class names (e.g. HivePartitionSensor) in their DAGs and do not control whether smart sensors are used, unless they explicitly exclude their tasks.
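The following sketch reads these options with Python's `configparser`. It assumes they live under a [smart_sensor] section, the section named for sensors_enabled in "How to support new operators". Airflow itself reads airflow.cfg through its own configuration module, so this standalone parser and both helper functions are illustrative only.

```python
import configparser

SAMPLE_CFG = """
[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
shards = 5
sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
"""


def load_smart_sensor_settings(text: str) -> dict:
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["smart_sensor"]
    enabled = section.get("sensors_enabled", fallback="")
    return {
        "use_smart_sensor": section.getboolean("use_smart_sensor", fallback=False),
        "shard_code_upper_limit": section.getint("shard_code_upper_limit"),
        "shards": section.getint("shards"),
        # Comma-separated class names; strip spaces around each entry.
        "sensors_enabled": {name.strip() for name in enabled.split(",") if name.strip()},
    }


def uses_smart_sensor(settings: dict, operator_class_name: str) -> bool:
    """Users keep their class names; the cluster config alone decides."""
    return settings["use_smart_sensor"] and operator_class_name in settings["sensors_enabled"]
```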

  1. Enabling/disabling the smart sensor service is a system level configuration change. It is transparent to the individual users.
  2. Existing DAGs don't need to be changed for enabling/disabling the smart sensor.
  3. Rotating centralized smart sensor tasks will not cause any user’s sensor task failure.

How to support new operators

  1. Define "poke_context_fields" as a class attribute in the operator. "poke_context_fields" includes all key names used to initialize a sensor object.
  2. In airflow.cfg, add the new operator class name to [smart_sensor] sensors_enabled. Supported sensor class names are comma separated.

-- Impact after deployment

Cost saving: we were able to terminate about 80% of the EC2 instances in Airflow clusters that were designated for sensor tasks. The worker slots used for sensor tasks went down by 85%.

[Figure: cost saving]

System improvement: the number of parallel sensor tasks is reduced by 85%, and the number of concurrent Airflow tasks is reduced by more than 60%. The reduction in tasks greatly helps stabilize the Airflow cluster by reducing the Airflow metaDB load, mostly from heartbeat operations. Before deployment, sensor tasks accounted for more than 70% of all running tasks.

[Figure: system improvement]

Reduced requests to the Hive metastore database: because the smart sensor consolidates duplicate sensor tasks, fewer requests reach the Hive metastore. As shown below, the gray dotted line shows the original number of requests to the Hive metastore database and the blue solid line shows the number of requests after we deployed the smart sensor.

[Figure: requests to the Hive metastore database before (gray dotted line) and after (blue solid line) smart sensor deployment]


  1. To make the smart sensor extensible to all sensor operators, we may need to replace the poke context serialize/deserialize functions with a more general solution.
  2. Callbacks are not supported in this first version.
  3. The smart sensor pokes all sensors in sync mode. Supporting a customized poke_interval is future work.


  1. These approaches require changes to the scheduler behavior and the Airflow DB schema.
  2. Handle timeout: save the timeout and execution_timeout in the exec_dict column.
    1. When a timeout is detected, set that single sensor task to failed or up_for_retry and expect the scheduler to set it to smart_pending on retry.
    2. Calculate the total duration in seconds until the final failure, min(timeout, execution_timeout) * (retries + 1), and keep the task in the smart sensor state until it fails or succeeds.
  3. Smart sensor DAG handling: should the DAG be created manually or in source code?
  4. Special logic for smart sensor health check.
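The total-duration rule, min(timeout, execution_timeout) * (retries + 1), can be written out as follows. This sketch makes two assumptions: timeout is in seconds and execution_timeout is a `timedelta`, as on Airflow operators, and a missing execution_timeout means "no extra limit". `final_fail_seconds` is a hypothetical helper.

```python
from datetime import timedelta
from typing import Optional


def final_fail_seconds(timeout: float, execution_timeout: Optional[timedelta], retries: int) -> float:
    """min(timeout, execution_timeout) * (retries + 1), in seconds.

    timeout: the sensor's timeout in seconds.
    execution_timeout: a timedelta, or None (treated here as unlimited).
    """
    per_try = timeout
    if execution_timeout is not None:
        per_try = min(timeout, execution_timeout.total_seconds())
    return per_try * (retries + 1)
```

For example, a sensor with timeout = 3600, execution_timeout = 30 minutes and retries = 2 stays in the smart sensor state for at most 1800 * 3 = 5400 seconds before it finally fails.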