
Status

State: Draft
Discussion Thread: [Discuss] Airflow sensor optimization
JIRA: AIRFLOW-3964

Motivation

Thanks to Fokko for sharing the reschedule sensor PR by Seelmann (https://github.com/apache/airflow/pull/3596/files). It did a great job.

I am reopening this AIP after reviewing the sensor rescheduling PR. Since reschedule mode does not reduce the number of worker processes consumed by sensors, the batch sensor idea can serve as a supplement for this purpose and should work well with reschedule mode.

Low efficiency in Airflow Sensor Implementation:


Sensors are a special kind of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

Sensor tasks are inefficient because, in the current design, we spawn a separate worker process for each partition sensor. This worker might run for a long time, until the target partition is available. When many sensor tasks need to run within certain time limits, we have to allocate a lot of resources to have enough workers for them.

Idea

We propose two approaches that could address this issue: batch-sensor and smart-sensor.

Batch-sensor

The basic idea of batch-sensor is to batch process sensor tasks to save resources. While running, a batch-sensor takes N partition sensor requests as input and pokes those N partitions periodically. If the batch-sensor finds that the criterion of some sensor task is met, it updates the database for that sensor task.

To do this, we need to create a sensor base class called ‘batchable’ and make all sensors inherit from it. We also need to change the scheduler's behavior for batchable sensor tasks: the scheduler will find as many batchable sensor tasks as possible and run them in one batch, as sketched below.
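A minimal sketch of the batchable idea under these assumptions; the names BatchableSensorMixin and poke_batch are illustrative, not existing Airflow APIs.

# Hypothetical sketch only; class and function names are illustrative.
class BatchableSensorMixin:
    """Marker base class: sensors inheriting from it can be poked in a batch."""
    batchable = True


def poke_batch(sensor_tasks, context):
    """Poke N batchable sensor tasks from a single worker process.

    sensor_tasks is assumed to be a list of (task_instance, sensor_operator)
    pairs that the scheduler grouped into one batch. Returns the task
    instances whose criterion is met, so their state can be updated in the
    Airflow metaDB.
    """
    finished = []
    for ti, sensor in sensor_tasks:
        if sensor.poke(context):  # one poke call per partition sensor
            finished.append(ti)
    return finished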

Smart-sensor

Smart-sensor is an improvement on top of batch-sensor.

The idea of smart-sensor is that its worker runs like a service: it periodically queries the task_instance table to find sensor tasks, pokes the metastore, and updates the task_instance table when it detects that a certain partition or file has been created.

Design

There are several issues that need to be resolved for our smart sensor.

Persist sensor info in the DB and skip file parsing before the worker process runs

The current Airflow implementation needs to parse the DAG Python file before running a task. Parsing multiple Python files inside one smart sensor would be inefficient and add load. Sensor tasks only need relatively “light weight” execution information: a small number of properties with simple structure (mostly built-in types rather than functions or objects). We therefore propose to skip the parsing for smart sensor. The easiest way is to persist the required task instance information in the Airflow metaDB.

Solution:

It would be hard to dump the whole task instance object dictionary, and we do not really need that much information.

Whitelist the attributes that the smart sensor needs and dump only those values to the DB. Initially we define two sets on the base sensor class: “persist_fields” and “exec_fields”.

“persist_fields” (dumped to the airflow.task_instance column “attr_dict”) saves the attribute names that are needed to accomplish a sensor’s poking job. For example:

  1. The “NamedHivePartitionSensor” defines its persist_fields as ('partition_names', 'metastore_conn_id', 'hook'), since these properties are enough for its poking function.
  2. The HivePartitionSensor is slightly different and uses persist_fields ('schema', 'table', 'partition', 'metastore_conn_id').

If two tasks have the same property value for every field in persist_fields, they are poking the same item and are holding duplicate poking jobs in their sensors.

The persist_fields can help us deduplicate sensor tasks. More broadly, if we can list persist_fields for all operators, it can help to dedup all Airflow tasks.

“exec_fields” (dumped to the airflow.task_instance column “exec_dict”) saves the execution configuration, such as “poke_interval”, “timeout”, and “execution_timeout”.

Fields in this set do not affect the details of the poking job. They describe how frequently we should poke, when the task should time out, after how many timeouts it should fail, etc. For now we only include logic that we can easily handle in a smart sensor. This is a smart sensor “doable whitelist” and can be extended as more logic is “unlocked” by the smart sensor implementation.

When we initialize a task instance object, we dump the attribute values of these two sets and persist them in the Airflow metaDB. The smart sensor can then read the DB to get all the information required to run sensor tasks, without parsing any DAG files.
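A minimal sketch of this dump, assuming the persist_fields and exec_fields sets described above; the helper name dump_sensor_fields and the JSON encoding are assumptions, not part of existing Airflow code.

# Illustrative sketch: serialize the whitelisted attributes for the
# task_instance columns attr_dict / exec_dict.
import json


def dump_sensor_fields(sensor):
    """Collect the whitelisted attribute values of a sensor instance."""
    attr_dict = {name: getattr(sensor, name, None) for name in sensor.persist_fields}
    exec_dict = {name: getattr(sensor, name, None) for name in sensor.exec_fields}
    # default=str keeps non-JSON-native values (e.g. timedelta) serializable.
    return json.dumps(attr_dict, default=str), json.dumps(exec_dict, default=str)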

Airflow scheduler change

We do not want to break any existing logic in the scheduler. The smart sensor is a configurable mode and can easily fall back to the regular scheduler logic when it detects that a case is not suitable for smart sensor.

Solution

Scheduler process_file

Right before we set a task instance's state to “scheduled”, add a smart sensor check that does the following:

  1. Check if Airflow is configured to use smart sensor.
  2. Check if the current task is suitable for smart sensor.

If both checks return “yes”, the task instance is qualified for smart sensor. The Airflow scheduler sets its state to “smart_pending” instead of “scheduled”, and this task instance will NOT BE QUEUED to the executor. It is expected to be picked up by a smart sensor task via a DB query. The smart sensor will update the state to a final state (“success” or “failed”) or to “up_for_retry”, at which point it returns to the normal scheduler world.

If either of the above checks answers “no” (the current Airflow cluster is not configured to use smart sensor, or the task itself is out of smart sensor scope), the scheduler will schedule the task instance as if smart sensor did not exist.
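A sketch of this routing decision under the assumptions above; the function name and parameters are illustrative, and “smart_pending” is the new state proposed by this AIP.

# Sketch of the proposed check in the scheduler, right before queueing.
SMART_PENDING = "smart_pending"  # new state proposed by this AIP


def route_task_instance(use_smart_sensor, task_is_smart_compatible):
    """Decide whether a task instance is handed to smart sensor or scheduled normally.

    use_smart_sensor reflects the airflow.cfg switch; task_is_smart_compatible
    is the per-task eligibility check (both are assumptions of this sketch).
    """
    if use_smart_sensor and task_is_smart_compatible:
        # Qualified: do NOT queue to the executor; a smart sensor picks the
        # task up from the DB and moves it to a final state or up_for_retry.
        return SMART_PENDING
    # Fall back: schedule as if smart sensor did not exist.
    return "scheduled"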

Include smart sensor DAG by configuration

We use a smart sensor DAG to kick off all smart sensor tasks. If Airflow is configured to use smart sensor, this DAG will be included in the scheduler parsing paths, implemented similarly to the example DAGs.
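A rough sketch of what such a DAG could look like, assuming the smart sensor operator described in the next section and a shardcode range per task; the import path, dag_id, and parameters are illustrative assumptions.

# Hypothetical smart sensor DAG, included in the scheduler parsing paths when
# smart sensor is enabled (similar to how example DAGs are shipped).
from datetime import datetime

from airflow import DAG
# The operator and import path below are assumed for this sketch.
from airflow.sensors.smart_sensor_operator import SmartSensorOperator

with DAG(
    dag_id="smart_sensor_group_shard_0",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@once",
    catchup=False,
) as dag:
    SmartSensorOperator(
        task_id="smart_sensor_shard_0",
        shard_min=0,  # this task only handles shardcodes in [shard_min, shard_max]
        shard_max=4,
    )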

Smart sensor operator

Smart sensor logic

In each execute loop (see the sketch after this list):

  • refresh_all_dict(): select all tasks from the DB whose state is “smart_pending” or “smart_running” and whose shardcode falls in this smart sensor's range.
  • For all tasks in the task dictionary to poke:
    • If a task with the same persist_fields has already been poked in this round:
      • If the poked task reached a final state, nothing needs to be done.
      • If the poked task does not have a final state, handle timeout.
    • Else (not poked in this round):
      • Execute the sensor job.
      • On a success or failed state, mark the state in the Airflow DB for all tasks that
        • have the same persist_fields hashcode, and
        • have state in (“smart_pending”, “smart_running”).
      • Check and handle timeout.
Issues

The smart sensor needs to handle the following issues:

  1. Get multiple tasks qualified for smart sensor.
  2. Do the work for all collected sensor tasks.
  3. Sensor task duplication.
  4. Sharding when there are multiple smart sensors running.

Dedup and shard:

attr_dict ⇒ hashcode ⇒ shardcode

hashcode = hash(attr_dict)

shardcode = hashcode % (max_shard + 1)

The range of shardcode and the number of smart sensor tasks can be configured in airflow.cfg.

Each smart sensor task has a shardcode range and only queries tasks whose shardcode falls in this range. Duplicate sensor tasks will have the same hashcode and the same shardcode, so they will be handled by the same smart sensor task.
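A small sketch of this computation, assuming attr_dict is serialized as JSON; a deterministic hash (MD5 here) is assumed instead of Python's built-in hash() so that duplicate tasks map to the same hashcode across processes. The function names are illustrative.

# Illustrative only: derive hashcode and shardcode from attr_dict.
import hashlib
import json


def compute_hash_and_shard(attr_dict, max_shard):
    """Hash the whitelisted attributes and fold the hash into a shardcode."""
    # Sorted keys so equal attr_dicts always serialize (and hash) identically.
    serialized = json.dumps(attr_dict, sort_keys=True, default=str)
    digest = hashlib.md5(serialized.encode("utf-8")).hexdigest()
    hashcode = int(digest, 16) % (2 ** 63)  # fits the BigInteger column
    shardcode = hashcode % (max_shard + 1)
    return hashcode, shardcode


def in_shard_range(shardcode, shard_min, shard_max):
    """A smart sensor task only queries tasks whose shardcode is in its range."""
    return shard_min <= shardcode <= shard_max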

This is the general logic for the base smart sensor operator. Each sensor operator can have its own smart sensor operator that inherits the above logic. The differences between smart sensor operators come from their “persist_fields” and the details of their “poke” functions.

Schema change:

task_instance table (add 4 columns and 2 indexes):

op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))
op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))
op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), nullable=True))
op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), nullable=True))
op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)

Remaining Issues

Considerations

  1. Both approaches need to update the scheduler behavior and the Airflow DB schema.
  2. Handle timeout: save timeout and execution_timeout in the exec_dict column.
    1. When a timeout is detected, set the single sensor task to failed or up_for_retry and expect the scheduler to set it back to smart_pending as a retry.
    2. Calculate the total seconds until the task finally fails and keep it in a smart sensor state until it fails or succeeds: min(timeout, execution_timeout) * (retries + 1). See the worked example after this list.
  3. Smart sensor DAG handling: should it be created manually or shipped in source code?
  4. Special logic for smart sensor health check.
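A quick worked example of that bound, with made-up values:

# Worked example of min(timeout, execution_timeout) * (retries + 1).
timeout = 24 * 60 * 60            # 1 day
execution_timeout = 12 * 60 * 60  # 12 hours
retries = 2

total_failed_duration = min(timeout, execution_timeout) * (retries + 1)
print(total_failed_duration)      # 129600 seconds, i.e. 36 hours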
