Status

State: Draft
Discussion Thread
JIRA

https://issues.apache.org/jira/browse/AIRFLOW-6395

Motivation

For long-running tasks (e.g. jobs on cloud providers), operators and sensors often poll for task status and/or task outputs to determine the success or failure of a task.  These task monitoring processes are often blocking operations that can cause various problems, including:

  • blocking wait operations that needlessly occupy a worker

    • limited concurrency on local executor

    • wasted resources on distributed executors

  • db-sync operations for rescheduling

  • passing XCom task-ID data

To enable non-blocking async options for hooks, sensors and operators, an async ecosystem is required, in particular an async event loop (executor), a task scheduler, and associated asyncio libraries for db-connections etc.  In addition, various ways to enhance existing blocking code with async options are required.

One possibility to explore is to first add an option for an AsyncExecutor that can be used like the LocalExecutor.  The goal of an initial POC is to enable a Sensor and/or an Operator to use async methods for blocking operations.  For example, when a blocking process is polling for status information from a remote service (cloud operator), the process might invoke a `time.sleep` call between polling periods.  For this AIP to work, any of those `time.sleep` calls should have an option to be replaced with an `asyncio.sleep` call; maybe something like:

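import asyncio
import os
import time

async def delay(pause, use_async=None):
    if use_async is None:
        # os.environ values are strings, so parse the flag explicitly (accepted spellings are an assumption)
        flag = os.environ.get('AIRFLOW_USE_ASYNC', '')
        use_async = flag.lower() in ('1', 'true', 'yes')
    if use_async:
        # non-blocking: yields control to the event loop while waiting
        await asyncio.sleep(pause)
    else:
        # blocking function in async function, probably better design patterns than this
        time.sleep(pause)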
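
To illustrate why a non-blocking sleep matters for polling, here is a minimal sketch that polls several simulated remote jobs concurrently in a single thread. It is not Airflow code: `make_fake_status`, `poll_until_done`, the `RUNNING`/`SUCCEEDED` states, and the poke interval are all hypothetical stand-ins for a real cloud status call.

```python
import asyncio
import time


def make_fake_status(polls_needed):
    """Return a hypothetical status function that reports SUCCEEDED after N polls."""
    calls = {"n": 0}

    def status():
        calls["n"] += 1
        return "SUCCEEDED" if calls["n"] >= polls_needed else "RUNNING"

    return status


async def poll_until_done(get_status, poke_interval):
    """Poll until the job leaves RUNNING, yielding to the event loop between polls."""
    while True:
        state = get_status()
        if state != "RUNNING":
            return state
        # Other coroutines can run while this one waits.
        await asyncio.sleep(poke_interval)


async def main():
    jobs = [make_fake_status(3) for _ in range(5)]
    # All five pollers share one thread and one event loop.
    return await asyncio.gather(*(poll_until_done(job, 0.05) for job in jobs))


start = time.monotonic()
results = asyncio.run(main())
elapsed = time.monotonic() - start
print(results, f"{elapsed:.2f}s")
```

Because the waits overlap, the total time should be close to that of a single job rather than the sum over all jobs; with `time.sleep` in place of `asyncio.sleep` the pollers would run one after another.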

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout etc? 

What change do you propose to make?

For higher concurrency in some areas of interaction with cloud providers, e.g. AWS services, using asyncio patterns might improve performance, especially for blocking operations such as polling for task status on external systems.

  • Consider https://github.com/aio-libs/aiobotocore as an alternative or supplement to botocore/boto3
    • be wary of compatibility and maintenance issues between the async version and the regular versions
    • be wary of API throttle limits (use async semaphores to limit concurrency?)
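
The semaphore idea mentioned above could look roughly like the following sketch, which uses `asyncio.Semaphore` to cap the number of in-flight requests. The limit of 2, the `throttled_call` helper, and the `asyncio.sleep` standing in for a remote API request are assumptions for illustration only; a real limit would depend on the provider's throttling rules.

```python
import asyncio

MAX_CONCURRENT_CALLS = 2  # assumed limit, not a documented provider value


async def throttled_call(semaphore, stats, job_id):
    """Run one simulated API call while holding a semaphore slot."""
    async with semaphore:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # stand-in for a remote API request
        stats["active"] -= 1
    return job_id


async def main():
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_CALLS)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(throttled_call(semaphore, stats, i) for i in range(6))
    )
    return results, stats["peak"]


results, peak = asyncio.run(main())
print(results, peak)
```

All six coroutines are scheduled at once, but at most `MAX_CONCURRENT_CALLS` of them are inside the guarded section at any time.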

Consider options for async event loops, in addition to the asyncio module, e.g.

What problem does it solve?

  • High concurrency with coroutines
    • optimal use of single cores
    • advantages of coroutines:
      • in-memory state for hook/sensor/operator (less db-overheads?)
      • explicit breakpoints for blocking operations

Why is it needed?

  • An async ecosystem
    • async event loop
    • async compatible libraries

Are there any downsides to this change?

  • Creating an easy solution for backward compatible systems
    • Easily running blocking code in an AsyncExecutor
      • adding warnings when async is an option?
    • Refactoring base classes to enable async options?
      • ensuring async is not the default behavior?
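
One possible pathway for running existing blocking code under an async event loop is to hand it off to a thread pool with `loop.run_in_executor`, so the loop itself is never stalled. The sketch below assumes a hypothetical legacy function `blocking_poke` and a hypothetical helper `run_blocking`; it is not a proposed AsyncExecutor design.

```python
import asyncio
import functools
import threading
import time


def blocking_poke(job_id):
    """Hypothetical legacy blocking call, e.g. a synchronous SDK request."""
    time.sleep(0.05)
    ran_in_main_thread = threading.current_thread() is threading.main_thread()
    return job_id, ran_in_main_thread


async def run_blocking(func, *args):
    """Run a blocking callable in the loop's default thread pool executor."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args))


async def main():
    # The event loop stays responsive while worker threads do the blocking work.
    return await asyncio.gather(*(run_blocking(blocking_poke, i) for i in range(3)))


results = asyncio.run(main())
print(results)
```

This keeps async opt-in: unchanged synchronous code still runs, at the cost of one worker thread per concurrent blocking call.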

Which users are affected by the change?

  • Mostly backend execution code and configuration
  • Need to consider impacts on UI status indicators
    • may need async compatible db drivers

How are users affected by the change? (e.g. DB upgrade required?)

  • Added configuration options
  • Added documentation for an AsyncExecutor
  • Compatibility with synchronous code
    • A pathway to enable async systems

Other considerations?

  • Debugging
  • CI test suites with/without async

What defines this AIP as "done"?

  • TBD - AIP WIP

