Status
Motivation
For long-running tasks (e.g. jobs on cloud providers), operators and sensors often poll for task status and/or task outputs to determine the success or failure of a task. These task monitoring processes are often blocking operations that can incur various problems, including:
blocking wait operations that needlessly occupy a worker
limited concurrency on the LocalExecutor
wasted resources on distributed executors
db-sync operations for rescheduling
passing XCom task-ID data
To enable the use of non-blocking async options for hooks, sensors and operators, an async ecosystem is required, especially an async event loop (executor), a task scheduler, and associated asyncio libraries for db-connections, etc. Along with that, various ways to enhance existing blocking code with async options are required.
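One standard-library option for wrapping existing blocking code, assuming Python 3.9+, is `asyncio.to_thread`, which runs a blocking function in a worker thread so the event loop stays free. The following is a minimal sketch, not an Airflow API; `blocking_get_status` and `get_status_async` are hypothetical stand-ins for a synchronous hook call and its async wrapper:

```python
import asyncio
import time


def blocking_get_status(job_id):
    # Hypothetical stand-in for a synchronous hook call to a remote service.
    time.sleep(0.1)
    return f"{job_id}: SUCCESS"


async def get_status_async(job_id):
    # Run the blocking call in the default thread pool; the event loop
    # can schedule other coroutines while the thread waits.
    return await asyncio.to_thread(blocking_get_status, job_id)


async def main():
    # Three blocking calls overlap instead of running back to back.
    return await asyncio.gather(*(get_status_async(f"job-{i}") for i in range(3)))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This leaves the blocking hook untouched, at the cost of one thread per in-flight call; a natively async client library would avoid the threads entirely.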
One possibility to explore is to first add an option for an AsyncExecutor that can be used like the LocalExecutor. The goal of an initial POC is to enable a Sensor and/or an Operator to use async methods for blocking operations. For example, when a blocking process polls a remote service for status information (e.g. in a cloud operator), it might call `time.sleep` between polling periods. For this AIP to work, any of those `time.sleep` calls should have an option to be replaced with an `asyncio.sleep` call; maybe something like:
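```python
import asyncio
import os
import time


async def delay(pause, use_async=None):
    """Pause between polling periods, optionally without blocking the event loop."""
    if use_async is None:
        # Environment variables are strings, so parse the value; otherwise "False" would be truthy.
        use_async = os.environ.get("AIRFLOW_USE_ASYNC", "false").lower() in ("1", "true", "yes")
    if use_async:
        # Yields control to the event loop while waiting.
        await asyncio.sleep(pause)
    else:
        # Blocking call inside a coroutine: it stalls the whole event loop,
        # so there are probably better design patterns than this.
        time.sleep(pause)
```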
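To illustrate why polling loops are a good first target, the sketch below runs several sensor-style polls as coroutines on a single event loop. Each poll awaits `asyncio.sleep` between status checks, so total wall time is roughly that of the slowest poll rather than the sum of all polls. `FakeJob` and `poll_until_done` are hypothetical names for illustration only, not part of Airflow:

```python
import asyncio
import time


class FakeJob:
    """Hypothetical remote job that reports SUCCESS after a fixed number of checks."""

    def __init__(self, checks_until_done):
        self.remaining = checks_until_done

    def status(self):
        self.remaining -= 1
        return "SUCCESS" if self.remaining <= 0 else "RUNNING"


async def poll_until_done(job, poke_interval):
    # Sensor-style loop: check status, then yield to the event loop between pokes.
    pokes = 0
    while True:
        pokes += 1
        if job.status() == "SUCCESS":
            return pokes
        await asyncio.sleep(poke_interval)


async def main():
    jobs = [FakeJob(3) for _ in range(5)]
    start = time.monotonic()
    pokes = await asyncio.gather(*(poll_until_done(j, 0.05) for j in jobs))
    return pokes, time.monotonic() - start


if __name__ == "__main__":
    pokes, elapsed = asyncio.run(main())
    # Five jobs with two 0.05 s sleeps each would take about 0.5 s if run
    # sequentially; run concurrently they finish in roughly 0.1 s.
    print(pokes, round(elapsed, 2))
```

With blocking `time.sleep` calls, the same five polls would each need their own worker (or run one after another), which is the concurrency limit described in the Motivation.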
Considerations
Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?
What change do you propose to make?
- TBD - AIP WIP
What problem does it solve?
- TBD - AIP WIP
Why is it needed?
- TBD - AIP WIP
Are there any downsides to this change?
- TBD - AIP WIP
Which users are affected by the change?
- TBD - AIP WIP
How are users affected by the change? (e.g. DB upgrade required?)
- TBD - AIP WIP
Other considerations?
- TBD - AIP WIP
What defines this AIP as "done"?
- TBD - AIP WIP