You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 9 Next »


Status

StateDraft
Discussion Thread[Discuss] Airflow sensor optimization
JIRA

Unable to render Jira issues macro, execution error.

Created

$action.dateFormatter.formatGivenString("yyyy-MM-dd", $content.getCreationDate())

-- Motivation

Sensors are a special kind of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

Airflow sensors are inefficient. The reason is that in the current design, we spawn a separate worker process for each sensor. In the case where there are a large number of sensor tasks that need to run within certain time limits, we have to allocate a lot of resources to have enough workers for the sensor tasks.

In Airbnb existing sensor tasks account for 70% of concurrent airflow tasks and the number keeps growing. Usually they are long running so we have to use ~140 boxes to run these tasks at peak hours. However sensor tasks are light weight tasks that poke every 3 minutes and idle most of the time. Since we are having more and more sensors, the increased number of tasks also increased DB load by generating heartbeats.

-- What is Smart Sensor

The Smart Sensor is a newly implemented airflow operator which consolidates and dedup multiple(usually hundreds) sensor tasks into one unified task.

Design: After the worker picked up a sensor task, it persisted all information required to poke to the airflow DB sensor_instance table and then exit. The worker slot will be free up after a short period. The smart sensor operator will take over the sensor poking job for multiple(hundreds of) sensors. The workflow is shown in the following diagram.

With smart sensor enabled. Large numbers of sensor tasks that are smart sensor compatible will be consolidated into a few smart sensor operators. 


Details of Issues that need to be addressed and the analysis.

Schema change

Solution: Add sensor_instance table 

  • The load query from the smart sensor operator will be working on the sensor table so that it reduces the risk of locking the task_instance table.
  • Give some isolation for sensor (Based on task_instance state changing strategy it is hard to have full isolation unless we have a well defined API )
  • Duplicate information with task_instance: 
    • It will need all the ti keys (dag_id, task_id, execution_date, try_number) if we propose to create a log entry for each sensor. Compared with doing a join table query each time I would rather duplicate these columns.
  • Consistency and sync issue:
    • Keep most of the behavior of the task_instance table. A new state "sensing" should be  added. A ti will get into either “sensing” or “running” state after being picked up by a worker based on if it can use a smart sensor.
    • Mirror state in sensor table from task_instance table when a record created (initial state in “sensing”). Smart sensor service should only poke for 'sensing' state in sensor table
    • Smart sensor service only visits the task_instance table when a sensor is considered in a final state “success”/”failed”. Smart sensor checks the current state of ti in the task_instance table and only writes back when the ti state is 'sensing'. If the ti state was in another state, it should mirror the state in the sensor table without changing it. 

Persist poke information to DB

The purpose is to avoid parsing all big DAG files from the smart sensor operator. Most sensors have simple and limited arguments, if we can retrieve these from DB without parsing DAG at runtime, one process should be easily handle hundreds of poking jobs.

Solution:

  • Whitelist the fields that need to be persist. Persisting all attributes for task instance objects is very difficult and contains redundant fields. For those sensors that want to be supported by smart sensor. We use poke_context_fields in the sensor class to identify fields that this sensor will need for poking
  • For handling execution settings such as timeout, execution_timeout and email on failure of tasks, we add execution_context_fields to base sensor operator. 

Shards and Deduplicate

To handle a large number of sensor jobs, multiple smart sensor tasks need to be spinned up. The sharding logic in smart sensor is in the following diagram.

We add columns in sensor_instance:

  1. hashcode = hash(poke_context) 
  2. Shardcode = hashcode % shard_code_upper_limit

Each smart sensor task set shard_min and shard_max when it was created. The task will handle all sensors with their shardcode satisfying shard_min <= shardcode < shard_max. 

  • Sensors with same poke_context are considered duplicated sensors in smart sensor
  • In our native DAG implementation, we can guarantee:
    • The smart_sensor_group DAGs will cover all sensor tasks in the cluster -- each sensor task, if it is smart_sensor_compatible, will be picked up by one DAG in smart_sensor_group
    • Duplicated sensors will be loaded by the same smart sensor task.

Implementation:

  • DB schema and new ORM class
    • Add sensor_instance table 
    • Add corresponding class SensorInstance to register sensor tasks and persist poke arguments in DB
  • Add ‘sensing’ state to airflow task instance states. Update UI for the new state as well.
  • Add is_smart_sensor_compatible() to BaseOperator with default value false. 
  • BaseSensorOperator
    • Overwrite is_smart_sensor_compatible()
    • Add register_in_sensor_service()
    • Add function to get the poke context and execution context for sensor jobs
  • In taskinstance.py, 
    • add checking after pre_execute for smart sensor compatibility. 
    • Register qualified sensor jobs(which will set the task instance state to ‘sensing’ state) and exit running tasks without poking it. 
    • Change email_alert to make it callable from both traditional task instance and smart sensor operator
  • New Operator: SmartSensorOpeartor keeps querying DB from sensor jobs in ‘sensing’ state and poke them one by one. 
  • Smart sensor task control:
    • Use native DAGs with a smart sensor operator to spin up and rotate the smart sensor jobs for the airflow cluster. 
    • Turn smart sensor mode on/off by configuring use_smart_sensor configuration
    • Provide flexible smart sensor support by configurable “shards” and “sensors_enabled”
  • Simulate task instance logs that executed by smart sensor

Considerations

  1. Approaches need to update the scheduler behavior and Airflow DB schema.
  2. Handle timeout: Save the timeout and execution_timeout in exec_dict column.
    1. When a timeout was detected, set the single sensor task to failed or up_for_retry and expect scheduler set it to smart_pending as retry
    2. Calculate the total seconds of final failed duration and keep the task in smart sensor state until it failed/success.  (min(timeout, execution_timeout) * (retries + 1))
  3. Smart sensor DAG handle. Should it be manually or in source code.
  4. Special logic for smart sensor health check.
  • No labels