Motivation
This is part of AIP-1, which aims to run Airflow in a multi-tenant way. The way to achieve that is to split the Airflow components into "trusted" and "untrusted" ones, which makes it possible to put security boundaries between them. "Untrusted" components could then be executed in DBIsolation mode, which disables direct database access and allows it only through the Airflow Database API (separate AIP).
DagProcessor is currently part of the Scheduler. Because it works with user code, it is considered "untrusted", while SchedulerJob is "trusted". With the changes below it will be possible to start DagProcessor and SchedulerJob as independent processes running on different hosts.
Proposal
Move find_zombies to Scheduler Job
Finding zombies is currently done in the DAG file processing loop. However, it has more in common with the Scheduler Job than with processing files.
I propose to move the code of the _find_zombies method to the SchedulerJob class and run it there on a configured interval (next to adopt_or_reset_orphaned_tasks or emit_metrics).
The interval is set by a new configuration option, [scheduler]zombie_detection_interval, with a default of 30 seconds. Currently zombies are checked every 10 seconds, which seems too often.
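The following is a minimal sketch of zombie detection running on its own interval in the scheduler loop. It assumes that a zombie is a task instance still marked running whose last heartbeat is older than a threshold. TaskInstanceStub, find_zombies, IntervalRunner and the 300-second threshold are hypothetical illustrations, not Airflow's implementation. Only the 30-second default comes from this proposal. The clock is passed in explicitly so the loop can be driven deterministically.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

ZOMBIE_DETECTION_INTERVAL = 30  # seconds; proposed default of [scheduler]zombie_detection_interval
ZOMBIE_THRESHOLD = timedelta(seconds=300)  # hypothetical heartbeat threshold


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a task instance row."""
    task_id: str
    state: str
    last_heartbeat: datetime


def find_zombies(tis, now):
    """Return running task instances whose heartbeat is older than the threshold."""
    return [ti for ti in tis if ti.state == "running" and now - ti.last_heartbeat > ZOMBIE_THRESHOLD]


class IntervalRunner:
    """Hypothetical helper: runs each registered callable once its interval has elapsed."""

    def __init__(self):
        self._jobs = []  # entries: [interval_seconds, func, next_run]

    def register(self, interval, func, start):
        self._jobs.append([interval, func, start + interval])

    def tick(self, now):
        """Run every due job and return the names of the jobs that ran."""
        ran = []
        for job in self._jobs:
            interval, func, next_run = job
            if now >= next_run:
                func()
                job[2] = now + interval  # schedule the next run
                ran.append(func.__name__)
        return ran
```

In the proposal, the scheduler would register the zombie check alongside adopt_or_reset_orphaned_tasks and emit_metrics. The DAG file processing loop would then no longer need to know about zombies at all.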
Introduce new configuration option
I propose to add a new configuration option, [scheduler]standalone_dag_processor, defaulting to False.
When this option is True, the SchedulerJob doesn't start the DagFileProcessorAgent on startup. It's up to the user to run the DAG processor in an independent process.
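The following sketch shows how scheduler startup could branch on the new option. Airflow reads its settings through its own configuration object. Here the standard-library configparser is used as a stand-in, and start_dag_processor_agent is a hypothetical placeholder for starting the DagFileProcessorAgent.

```python
import configparser

CONFIG_TEXT = """
[scheduler]
standalone_dag_processor = True
zombie_detection_interval = 30
"""


def load_config(text):
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser


def should_start_dag_processor_agent(parser):
    # Fall back to False when the option is absent, matching the proposed default.
    standalone = parser.getboolean("scheduler", "standalone_dag_processor", fallback=False)
    return not standalone


def scheduler_startup(parser, start_dag_processor_agent):
    """Start the embedded agent only when the DAG processor is not standalone."""
    if should_start_dag_processor_agent(parser):
        start_dag_processor_agent()
        return "embedded"
    return "standalone"  # the user runs `airflow dag-processor` separately
```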
Introduce new Airflow CLI command
The DAG processor can be executed in standalone mode with the airflow dag-processor command, which accepts the following inputs:
- --subdir/-S - directory from which to parse DAGs (a subfolder of $AIRFLOW_DAG_FOLDER, if present)
- --num-runs/-n - number of runs to execute before exiting; 0 means run indefinitely
- --parsing-processes/-P - number of processes used by the Dag Processor; overrides the [scheduler]parsing_processes option
- --dag-id-regex/-R - when set, only DAGs whose dag_id matches the regex are accepted
On startup it executes the DagFileProcessorManager process.
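The following sketch shows the argument surface of the proposed command using argparse. The option names come from the list above. The defaults are assumptions for illustration: 0 for --num-runs, and None for --parsing-processes, meaning "use [scheduler]parsing_processes". The function names are also assumptions, and none of this is Airflow's actual CLI code.

```python
import argparse
import re


def build_parser():
    parser = argparse.ArgumentParser(prog="airflow dag-processor")
    parser.add_argument("-S", "--subdir", default=None,
                        help="directory from which to parse DAGs")
    parser.add_argument("-n", "--num-runs", type=int, default=0,
                        help="number of runs to execute before exiting; 0 means run indefinitely")
    parser.add_argument("-P", "--parsing-processes", type=int, default=None,
                        help="overrides [scheduler]parsing_processes when set")
    # re.compile as the type validates the pattern at parse time.
    parser.add_argument("-R", "--dag-id-regex", type=re.compile, default=None,
                        help="only DAGs whose dag_id matches this regex are accepted")
    return parser


def effective_parsing_processes(args, configured):
    """The CLI value wins over the configured [scheduler]parsing_processes."""
    return args.parsing_processes if args.parsing_processes is not None else configured
```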
Move callbacks queue to database
Callbacks allow you to execute additional functions on dag/task success or failure. Currently they are triggered from both Scheduler Job (e.g. DagRun timeout) and Dag Processing (Zombie detection).
Callbacks are executed by the Dag Processor, which makes sense because their code is defined in DAG files (untrusted). However, to allow the Scheduler Job to trigger a callback when DagProcessor runs as a standalone process, the information about the callback must be passed to DagProcessor.
I propose to introduce a new table, CallbackRequest, for storing the callbacks, with the following fields:
- id (int) - uniquely identifies the callback
- created_at (datetime) - time when the callback was added
- priority_weight (int) - higher values = higher priority
- dag_loc (string) - path to the file that contains the callback code
- message (string) - additional message that can be used for logging
- callback_type (enum) - TaskCallback, DagCallback or SlaCallback
- callback_data (JSON) - serialized metadata of the callback
- dag_directory (string) - information about the DAG directory (required for "multiple DAG directories", described below)
callback_data makes it possible to construct the callback object: TaskCallbackRequest, DagCallbackRequest or SlaCallbackRequest.
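The following sketch shows a CallbackRequest row and how a typed request object can be rebuilt from its callback_data JSON. The *Stub dataclasses are hypothetical, simplified stand-ins; the real TaskCallbackRequest, DagCallbackRequest and SlaCallbackRequest classes carry more fields. The keys stored in callback_data here (full_filepath, msg, dag_id/task_id) and the helper names are also assumptions.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from enum import Enum


class CallbackType(Enum):
    TASK = "TaskCallback"
    DAG = "DagCallback"
    SLA = "SlaCallback"


# Hypothetical, simplified stand-ins for the real callback request classes.
@dataclass
class TaskCallbackRequestStub:
    full_filepath: str
    msg: str
    task_id: str


@dataclass
class DagCallbackRequestStub:
    full_filepath: str
    msg: str
    dag_id: str


@dataclass
class SlaCallbackRequestStub:
    full_filepath: str
    msg: str
    dag_id: str


REQUEST_CLASSES = {
    CallbackType.TASK: TaskCallbackRequestStub,
    CallbackType.DAG: DagCallbackRequestStub,
    CallbackType.SLA: SlaCallbackRequestStub,
}


@dataclass
class CallbackRequestRow:
    """One row of the proposed CallbackRequest table."""
    id: int
    created_at: datetime
    priority_weight: int
    dag_loc: str
    message: str
    callback_type: CallbackType
    callback_data: str  # JSON text
    dag_directory: str


def to_row(row_id, request, callback_type, priority_weight, dag_directory):
    """Serialize a request object into a table row."""
    return CallbackRequestRow(
        id=row_id,
        created_at=datetime.now(timezone.utc),
        priority_weight=priority_weight,
        dag_loc=request.full_filepath,
        message=request.msg,
        callback_type=callback_type,
        callback_data=json.dumps(asdict(request)),
        dag_directory=dag_directory,
    )


def to_request(row):
    """Rebuild the typed request object from callback_type and callback_data."""
    cls = REQUEST_CLASSES[row.callback_type]
    return cls(**json.loads(row.callback_data))
```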
Instead of sending callbacks to the queue, the Scheduler process (via the Executor) adds them to the database.
During its execution loop, the Dag Processor fetches up to max_callback_per_loop=XX callbacks (higher priority_weight first) and deletes them from the database. It then builds the proper CallbackRequest objects, adds them to the "_callback_to_execute" list and processes them the same way as it does now. Each Dag Processor only executes callbacks from the directory it reads DAGs from (based on the dag_directory column).
Airflow still uses the queue in [scheduler]standalone_dag_processor=False mode.
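The following sketch shows the database handoff, using the standard-library sqlite3 module as a stand-in for the Airflow metadata database. The table is reduced to the columns the query needs. The function names and the select-then-delete structure are assumptions, and the per-loop limit is passed in because its value is left open above. On PostgreSQL or MySQL 8, a real implementation would also lock the selected rows (for example SELECT ... FOR UPDATE SKIP LOCKED) so that two processors never take the same callback.

```python
import json
import sqlite3

SCHEMA = """
CREATE TABLE callback_request (
    id INTEGER PRIMARY KEY,
    created_at TEXT NOT NULL,
    priority_weight INTEGER NOT NULL,
    callback_type TEXT NOT NULL,
    callback_data TEXT NOT NULL,
    dag_directory TEXT NOT NULL
)
"""


def add_callback(conn, created_at, priority_weight, callback_type, callback_data, dag_directory):
    """Scheduler side: store the callback instead of pushing it onto the in-memory queue."""
    with conn:  # commit on success, roll back on error
        conn.execute(
            "INSERT INTO callback_request "
            "(created_at, priority_weight, callback_type, callback_data, dag_directory) "
            "VALUES (?, ?, ?, ?, ?)",
            (created_at, priority_weight, callback_type, json.dumps(callback_data), dag_directory),
        )


def fetch_callbacks(conn, dag_directory, max_callbacks_per_loop):
    """Processor side: take up to N callbacks for this directory, highest priority first, then delete them."""
    with conn:  # commit the deletes on success, roll back on error
        rows = conn.execute(
            "SELECT id, callback_type, callback_data FROM callback_request "
            "WHERE dag_directory = ? ORDER BY priority_weight DESC, created_at ASC LIMIT ?",
            (dag_directory, max_callbacks_per_loop),
        ).fetchall()
        conn.executemany("DELETE FROM callback_request WHERE id = ?", [(row[0],) for row in rows])
    return [(callback_type, json.loads(data)) for _, callback_type, data in rows]
```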
Code for handling the internal queue of callbacks (currently in DagFileProcessorManager._callback_to_execute) is moved to airflow/callbacks/. It consists of BaseCallbackSink and its two implementations: PipeCallbackSink, which sends callbacks to DagProcessor using the currently existing Pipe, and DatabaseCallbackSink, which adds callbacks to the database.
DagProcessor in its loop calls an additional method, fetch_callbacks, which fetches callbacks from the database and adds them to the internal queue. It does this in [scheduler]standalone_dag_processor=True mode and is a no-op in standalone_dag_processor=False mode.
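The following sketch shows the sink abstraction described above. The three sink class names come from this proposal. The send method name, the use of a plain list in place of the CallbackRequest table, and the DagProcessorQueue helper are assumptions made to keep the example self-contained.

```python
from abc import ABC, abstractmethod
from multiprocessing import Pipe


class BaseCallbackSink(ABC):
    """Where the scheduler side sends callbacks it wants executed."""

    @abstractmethod
    def send(self, callback):
        ...


class PipeCallbackSink(BaseCallbackSink):
    """Legacy mode: push callbacks through the existing pipe to the embedded DAG processor."""

    def __init__(self, connection):
        self._connection = connection

    def send(self, callback):
        self._connection.send(callback)


class DatabaseCallbackSink(BaseCallbackSink):
    """Standalone mode: persist callbacks (a list stands in for the callback table)."""

    def __init__(self, table):
        self._table = table

    def send(self, callback):
        self._table.append(callback)


class DagProcessorQueue:
    """Hypothetical processor side: internal queue filled by fetch_callbacks."""

    def __init__(self, standalone, table):
        self.standalone = standalone
        self._table = table
        self.callbacks_to_execute = []

    def fetch_callbacks(self):
        if not self.standalone:
            return  # no-op: in legacy mode callbacks arrive over the pipe
        self.callbacks_to_execute.extend(self._table)
        self._table.clear()  # fetched callbacks are removed from storage
```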
BaseExecutor extension
To make callback handling more extensible and future-proof, new callbacks are sent for execution (in both legacy and standalone DAG processor mode) using a new BaseExecutor method, send_callback_to_execute. BaseExecutor implements this method by calling a BaseCallbackSink. Depending on [scheduler]standalone_dag_processor, the sink is either a PipeCallbackSink (False) or a DatabaseCallbackSink (True).
This allows sub-Executors to override this behavior.
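The following sketch shows the executor hook, assuming the sink is chosen once from [scheduler]standalone_dag_processor. BaseExecutorSketch and RecordingExecutor are hypothetical names, and the sinks record into lists instead of using a real pipe or database. The key point is that the default implementation only delegates to a sink, so a sub-Executor can override send_callback_to_execute without touching the scheduler.

```python
from abc import ABC, abstractmethod


class BaseCallbackSink(ABC):
    @abstractmethod
    def send(self, callback):
        ...


class PipeCallbackSink(BaseCallbackSink):
    def __init__(self):
        self.sent = []  # stands in for the pipe to the embedded processor

    def send(self, callback):
        self.sent.append(("pipe", callback))


class DatabaseCallbackSink(BaseCallbackSink):
    def __init__(self):
        self.sent = []  # stands in for the CallbackRequest table

    def send(self, callback):
        self.sent.append(("db", callback))


class BaseExecutorSketch:
    def __init__(self, standalone_dag_processor):
        # Chosen once, according to [scheduler]standalone_dag_processor.
        self.callback_sink = DatabaseCallbackSink() if standalone_dag_processor else PipeCallbackSink()

    def send_callback_to_execute(self, callback):
        """Default behaviour: delegate to the configured sink."""
        self.callback_sink.send(callback)


class RecordingExecutor(BaseExecutorSketch):
    """A sub-Executor that overrides the default routing."""

    def __init__(self, standalone_dag_processor):
        super().__init__(standalone_dag_processor)
        self.routed = []

    def send_callback_to_execute(self, callback):
        self.routed.append(callback)  # bypasses the sink entirely
```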
Alternative approach: Running callbacks in workers
Callbacks could also be executed by workers. They may be considered part of "task execution", and it feels natural to execute task-related callbacks there. However, there are some problems with this approach, especially when using KubernetesExecutor:
- Performance - creating a new worker only to execute a callback (which is usually small and fast) may be too slow
- Resources - there is no need to spin up a new Pod when DagProcessor is able to handle the callback
- Simplicity - the code for handling callbacks is already in DagProcessor, so fewer changes are required
- Also, DagFileProcessor is a new entity that has not been considered before, so it might feel unnatural to tie callback execution to it. However, the processor already executes user code. From the security point of view, the same security “perimeter” needs to be maintained for both anyway, and the performance implications favor DagFileProcessor.
Add support for multiple dag directories
This change allows multiple DAG directories. Each Dag Processor component parses files from a different directory, passed with the --subdir/-S parameter. This allows, for example, multiple teams to share the same Airflow installation by putting DAGs in different directories with different sets of permissions.
To make this possible, I propose to extend the SerializedDagModel and DagModel tables with a new column:
- dag_directory (string)
It contains the information about the DAG directory. During DAG file parsing, the processor sets this field to the value of --subdir.
When removing deleted DAGs, only DAGs whose dag_directory equals the DAG processor's --subdir value are removed or marked as inactive.
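The following sketch shows the scoped deactivation rule: a processor only deactivates DAGs recorded under its own dag_directory whose files it no longer sees. DagRow and deactivate_deleted_dags are hypothetical. In Airflow, the equivalent change to DagModel/SerializedDagModel would be a database update filtered on dag_directory.

```python
from dataclasses import dataclass


@dataclass
class DagRow:
    """Hypothetical stand-in for a DagModel row."""
    dag_id: str
    fileloc: str
    dag_directory: str
    is_active: bool = True


def deactivate_deleted_dags(rows, subdir, alive_filelocs):
    """Mark inactive only DAGs from this processor's --subdir whose file was not seen in this parse."""
    deactivated = []
    for row in rows:
        if row.dag_directory != subdir:
            continue  # owned by another DAG processor: leave it alone
        if row.is_active and row.fileloc not in alive_filelocs:
            row.is_active = False
            deactivated.append(row.dag_id)
    return deactivated
```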
Cleanup
Additional cleanup of DagModel/SerializedDagModel is required, because DagProcessor may be stopped or restarted with different settings, leaving abandoned DAGs in the database. They are cleaned up by SchedulerJob on a configured interval (every 60 seconds), which marks as inactive all DAGs that were not updated by DagProcessor within the last hour.
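The following sketch shows the cleanup rule. It assumes each DAG row carries a last_parsed_time timestamp written by the DAG processor; that field name is an assumption for illustration. The one-hour threshold comes from the text above. The function is meant to be called from the scheduler's 60-second interval loop.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

STALE_AFTER = timedelta(hours=1)


@dataclass
class DagRow:
    """Hypothetical stand-in for a DagModel row."""
    dag_id: str
    last_parsed_time: datetime
    is_active: bool = True


def deactivate_stale_dags(rows, now):
    """Mark inactive every active DAG not refreshed by any DAG processor within STALE_AFTER."""
    stale = []
    for row in rows:
        if row.is_active and now - row.last_parsed_time > STALE_AFTER:
            row.is_active = False
            stale.append(row.dag_id)
    return stale
```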
dag-id-regex and DagPolicy
There is also a problem when two different DAG files in different directories define the same dag_id. As a solution, I propose the following approaches, depending on user configuration:
- Default (no configuration from the user): first-come-first-served - a parsing error is raised if a DAG with this id already exists (from a different directory), asking the DAG owner to rename it
- An additional parameter of the airflow dag-processor command, --dag-id-regex, which accepts only DAGs whose id matches the provided regular expression. For example, --dag-id-regex="^dags1_.*" accepts only DAG ids starting with "dags1_"; any DAG that does not match is rejected
- Airflow DagPolicy - users will be able to define a policy that checks whether the DAG id meets their conditions, e.g.:
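```python
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.dag import DAG

def dag_policy(dag: DAG):
    # FileProcessorManager.get_current_subdir() is the new method proposed in this AIP.
    subdir = FileProcessorManager.get_current_subdir()
    if not dag.dag_id.startswith(f"{subdir}."):
        raise AirflowClusterPolicyViolation(
            f"DAG's id {dag.dag_id} from file path: {dag.fileloc} does not start with '{subdir}.' "
            f"even though it comes from the {subdir} subdirectory of the DAG folder."
        )
```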
Here FileProcessorManager.get_current_subdir is a new method that returns the current --subdir parameter of the Dag Processor.
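The following sketch covers the two non-policy approaches: a --dag-id-regex filter applied to each parsed DAG id, and the default first-come-first-served check that rejects a dag_id already registered from another directory. The registry dict, DuplicateDagIdError and the function names are hypothetical. In Airflow, the registry would be the DagModel table and the error would surface as a parsing error. The filter uses re.search, so anchors such as ^ in the user's pattern are honoured.

```python
import re


class DuplicateDagIdError(Exception):
    """Raised when a dag_id is already owned by a different DAG directory."""


def accepted_by_regex(dag_id, dag_id_regex):
    """--dag-id-regex filter; None means no filtering."""
    return dag_id_regex is None or re.search(dag_id_regex, dag_id) is not None


def register_dag(registry, dag_id, dag_directory):
    """First-come-first-served: the first directory to register a dag_id keeps it."""
    owner = registry.setdefault(dag_id, dag_directory)
    if owner != dag_directory:
        raise DuplicateDagIdError(
            f"DAG id {dag_id!r} is already defined in {owner}; rename the DAG in {dag_directory}"
        )
```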
Deployment options
Below you can see the possible deployment options of Airflow installation after these changes are implemented.
- [scheduler]standalone_dag_processor=False (default)
The Dag Processor still runs as part of Scheduler. No changes for the end-user.
- [scheduler]standalone_dag_processor=True. Single Dag Processor
The Dag Processor runs as a standalone component. Users need to start the scheduler job (airflow scheduler) and the Dag Processor (airflow dag-processor) independently.
- [scheduler]standalone_dag_processor=True. Multiple Dag Processors
Multiple Dag Processors, each parsing DAGs from a different directory. Users need to start the scheduler job (airflow scheduler) and each Dag Processor (airflow dag-processor --subdir /dag-X). This allows setting up different DAG subdirectories for different teams, which are parsed independently and in isolation.
What problem does it solve?
It allows running multiple Dag Processors as independent components on separate hosts, parsing DAGs from distinct locations.
Why is it needed?
This is needed to run DagProcessor in "DBIsolation" mode (separate AIP), without direct access to the database while Scheduler Job still has this access.
Which users are affected by the change?
Only those that set [scheduler]standalone_dag_processor=True
How are users affected by the change? (e.g. DB upgrade required?)
A DB upgrade is required to use the new mode.
What defines this AIP as "done"?
DagProcessor and SchedulerJob are executed independently on different hosts, and it is possible to run multiple DagProcessors parsing DAGs from different directories.
...