
Status

State: Draft
Discussion Thread:
JIRA: https://issues.apache.org/jira/browse/AIRFLOW-6395

Motivation

For long-running tasks (e.g. jobs on cloud providers), operators and sensors often poll for task status and/or task outputs to determine whether a task succeeded or failed. These monitoring processes are often blocking operations, which can cause several problems, including:

  • blocking wait operations that needlessly occupy a worker

    • limited concurrency on the LocalExecutor

    • wasted resources on distributed executors

  • database sync operations when rescheduling

  • passing task-ID data through XCom

To enable non-blocking async options for hooks, sensors and operators, an async ecosystem is required: in particular an async event loop (executor), a task scheduler, and asyncio libraries for database connections and similar resources. In addition, ways to extend existing blocking code with async options are required.
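As a rough illustration of what an event loop buys here, the sketch below runs three sensor-style polling loops concurrently in a single process. The remote service is simulated: `fake_job_status` and `poll_until_done` are hypothetical names, not Airflow APIs, and the poke interval and timeout values are arbitrary assumptions.

```python
import asyncio
import time
from typing import List, Tuple


async def fake_job_status(job_id: str, finish_after: float, started: float) -> str:
    """Hypothetical stand-in for a remote status call (e.g. a cloud job API)."""
    await asyncio.sleep(0)  # pretend to do non-blocking network I/O
    return "SUCCESS" if time.monotonic() - started >= finish_after else "RUNNING"


async def poll_until_done(job_id: str, finish_after: float,
                          poke_interval: float = 0.05, timeout: float = 2.0) -> str:
    """Sensor-style loop: poll the job, then yield to the event loop between pokes."""
    started = time.monotonic()
    while True:
        status = await fake_job_status(job_id, finish_after, started)
        if status == "SUCCESS":
            return f"{job_id}:{status}"
        if time.monotonic() - started > timeout:
            raise TimeoutError(f"{job_id} did not finish within {timeout}s")
        # asyncio.sleep releases the loop, so the other pollers run in the meantime
        await asyncio.sleep(poke_interval)


async def main() -> Tuple[List[str], float]:
    start = time.monotonic()
    results = await asyncio.gather(
        poll_until_done("job-a", 0.2),
        poll_until_done("job-b", 0.2),
        poll_until_done("job-c", 0.2),
    )
    return list(results), time.monotonic() - start


if __name__ == "__main__":
    results, elapsed = asyncio.run(main())
    print(results, f"{elapsed:.2f}s")
```

Run one after another with `time.sleep` between pokes, the three pollers would take roughly three times as long, because each blocking sleep holds the thread (and therefore the worker) for its whole duration.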

One possibility to explore is to first add an option for an AsyncExecutor that can be used like the LocalExecutor. The goal of an initial POC is to enable a Sensor and/or an Operator to use async methods for blocking operations. For example, when a blocking process polls a remote service for status information (e.g. in a cloud operator), it might call `time.sleep` between polling periods. For this AIP to work, each of those `time.sleep` calls should have an option to be replaced with an `asyncio.sleep` call, perhaps something like:

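import asyncio
import os
import time

async def delay(pause, use_async=None):
    # Default to the AIRFLOW_USE_ASYNC environment variable; parse it explicitly,
    # because any non-empty string (even "False") would otherwise be truthy.
    if use_async is None:
        use_async = os.environ.get('AIRFLOW_USE_ASYNC', 'false').lower() in ('1', 'true', 'yes')
    if use_async:
        await asyncio.sleep(pause)  # yields control back to the event loop
    else:
        # blocking call inside a coroutine: this stalls the whole event loop;
        # there are probably better design patterns than this
        time.sleep(pause)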
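To make the AsyncExecutor idea a little more concrete, here is a minimal sketch of an executor that, like the LocalExecutor, accepts queued tasks and caps how many run at once, but runs them as coroutines on a single event loop instead of in separate processes. `SketchAsyncExecutor`, `queue_task` and `run_all` are hypothetical names chosen for illustration; the real Airflow executor interface (heartbeats, syncing task state with the metadata database) is deliberately left out.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Tuple

TaskFactory = Callable[[], Awaitable[object]]


class SketchAsyncExecutor:
    """Hypothetical executor: runs queued coroutine tasks on one event loop."""

    def __init__(self, parallelism: int = 2) -> None:
        self.parallelism = parallelism
        self.queued: List[Tuple[str, TaskFactory]] = []
        self.running = 0
        self.max_running_seen = 0

    def queue_task(self, key: str, factory: TaskFactory) -> None:
        # store a factory, so the coroutine is only created when the task runs
        self.queued.append((key, factory))

    async def _run_one(self, sem: asyncio.Semaphore, key: str,
                       factory: TaskFactory) -> Tuple[str, str]:
        async with sem:  # at most `parallelism` tasks run at the same time
            self.running += 1
            self.max_running_seen = max(self.max_running_seen, self.running)
            try:
                await factory()
                return key, "success"
            except Exception:
                return key, "failed"
            finally:
                self.running -= 1

    async def run_all(self) -> Dict[str, str]:
        sem = asyncio.Semaphore(self.parallelism)
        pending, self.queued = self.queued, []
        results = await asyncio.gather(
            *(self._run_one(sem, key, factory) for key, factory in pending))
        return dict(results)


if __name__ == "__main__":
    async def failing_task() -> None:
        raise RuntimeError("simulated remote job failure")

    executor = SketchAsyncExecutor(parallelism=2)
    for i in range(4):
        # each "task" is an async wait that holds no worker thread while sleeping
        executor.queue_task(f"sensor_{i}", lambda: asyncio.sleep(0.05))
    executor.queue_task("bad_task", failing_task)
    print(asyncio.run(executor.run_all()))
```

The semaphore plays the role of the LocalExecutor's parallelism limit. A real implementation would also need to report task state back to the metadata database, which is where the async database libraries mentioned above would come in.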

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout, etc.?

What change do you propose to make?

  • TBD - AIP WIP


What problem does it solve?

  • TBD - AIP WIP

Why is it needed?

  • TBD - AIP WIP


Are there any downsides to this change?

  • TBD - AIP WIP


Which users are affected by the change?

  • TBD - AIP WIP


How are users affected by the change? (e.g. DB upgrade required?)

  • TBD - AIP WIP


Other considerations?

  • TBD - AIP WIP


What defines this AIP as "done"?

  • TBD - AIP WIP

