You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 25 Next »

Status

StateDraft
Discussion Threadapache-airflow.slack.com - sig-async-operators
JIRA

Unable to render Jira issues macro, execution error.

Motivation

For long-running tasks (e.g. jobs on cloud providers), operators and sensors often poll for task status and/or task outputs to determine the success or failure of a task.  These task monitoring processes are often blocking operations that can incur various problems, including:

  • blocking wait operations that needlessly occupy a worker

    • limited concurrency on local executor

    • wasted resources on distributed executors

  • db-sync operations for rescheduling

  • passing XCom task-ID data

To enable the use of various non-blocking async options for hooks, sensors and operators, an async ecosystem is required and especially an async event loop (executor), task scheduler, and associated asyncio libraries for db-connections etc.  Along with that, various ways to enhance existing blocking code with async options is required.

One possibility to explore is to first add an option for an AsyncExecutor that can be used like the LocalExecutor.  The goal of an initial POC is to enable a Sensor and/or an Operator to use async methods for blocking operations.  For example, when a blocking process is polling for status information from a remote service (cloud operator), the process might invoke a `time.sleep` call between polling periods.  For this AIP to work, any of those `time.sleep` calls should have an option to be replaced with an `asyncio.sleep` call; maybe something like:

async def delay(pause, use_async=None):
    if use_async is None:
        use_async = os.environ.get('AIRFLOW_USE_ASYNC', False)
    if use_async:
        await asyncio.sleep(pause)
    else:
        # blocking function in async function, probably better design patterns than this
        time.sleep(pause)

See also:

This AIP is different than the following, but they share similar goals for optimize concurrency and performance:

Considerations

Is there anything special to consider about this AIP? Downsides? Difficulty in implementation or rollout etc? 

What change do you propose to make?

For higher concurrency in some areas of interaction with cloud providers, e.g. AWS services, using asyncio patterns might improve performance, esp. for blocking operations like polling for task status on external systems.  

  • start with a small set of functionality to enable an asyncio event loop that can execute tasks with high concurrency for non-blocking operations
    • it should be a small, isolated way to configure and use an asyncio ecosystem in Airflow
    • it might require a special package-install option to add asyncio-compatible libs
    • it might require an example config file specific to asyncio settings
  • Consider https://github.com/aio-libs/aiobotocore as an alternative or supplement to botocore/boto3
    • be wary of compatibility and maintenance issues between the async version and the regular versions
    • be wary of API throttle limits (use async semaphores to limit concurrency?)

Consider options for async event loops, in addition to the asyncio module, e.g.

  • https://distributed.dask.org/en/latest/asynchronous.html
    • The DaskExecutor has an optional init param to use a Client instance, which could be instantiated with async behavior, but it might require additional async wrapping on it that can await the client. Hence, I need to understand how the Airflow scheduler spins up one or more executors to schedule a DAG run, and whether or not a DAG might have a property that indicates that it wants to use async, so that the scheduler can try to use an async executor.
    • The `DaskExecutor.execute_async` could use a distributed client with an async event loop enabled, rather than async futures that might spawn new processes.  It could manage an event loop for async execution, but it's not clear how it would work with tasks (hooks, sensors, operators) that are not specifically designed for asyncio behavior.

Consider options for async-db drivers, e.g.

What problem does it solve?

  • High concurrency with coroutines
    • optimal use of single cores
    • advantages of coroutines:
      • in-memory state for hook/sensor/operator (less db-overheads?)
      • explicit breakpoints for blocking operations

Why is it needed?

  • An async ecosystem
    • async event loop
    • async compatible libraries
  • async loop(s) obtained or started, as necessary, in various places:
    • airflow/jobs might need a new `AsyncJob`
    • anything that uses `ExecutorLoader.get_default_executor()` where it could return an async executor might need an async loop and whether it gets it from some global scope or from an executor is not entirely clear
      • `airflow.models.dag.DAG.run`
      • `airflow.models.dagbag.DagBag.executor` could be an async executor
      • `airflow.www.views.Airflow.run` could get an async executor
      • `airflow.cli.commands.task_command._run_task_by_executor` likewise
  • Clear documentation and examples of how to configure Airflow for optimal async executor(s)
    • including warnings about compatible db backends
    • include notes about how to configure various concurrency settings
      • note what concurrency settings are ignored and which ones are applied
    • in the first instance, the config settings could be very specific to an AsyncExecutor

Are there any downsides to this change?

  • Creating an easy solution for backward compatible systems
    • Easily running blocking code in an AsyncExecutor
      • adding warnings when async is an option?
    • Refactoring base classes to enable async options?
      • ensuring async is not the default behavior?
  • A potential breaking change is the deprecation/removal of `airflow.executors.base_executor.BaseExecutor.execute_async` because it should not be assumed that all executors can implement async tasks.  The addition of an `AsyncExecutor` may imply changes that introduce an additional base class that is not entirely compatible with the API already defined in the `BaseExecutor` and it should not be assumed that the existing base class should capture all of the possible base classes for executors.  If a new `AsyncExecutor` must inherit from the existing base class (or a modified base class), it could be that it simply overrides a common `execute` method with an async version of it (unless the application of `async def execute` significantly changes the inheritance properties of the method; note that the existing base class defines `execute_async` without any `async def` declaration). 
    • e.g. the `CeleryExecutor` has to throw an exception for `execute_async`

Which users are affected by the change?

  • Mostly backend execution code and configuration
  • Need to consider impacts on UI status indicators
    • may need async compatible db drivers

How are users affected by the change? (e.g. DB upgrade required?)

  • Added configuration options
  • Added documentation for an AsyncExecutor
  • Compatibility with synchronous code
    • A pathway to enable async systems

Other considerations?

  • Debugging
  • CI test suites with/without async

What defines this AIP as "done"?

  • When 1000's of AWS batch jobs can be launched and monitored using an AsyncExecutor on a single CPU core
    • the point is that an async executor should support high concurrency for long-running tasks with blocking operations
    • multiple cores might help with db-connection pooling for all the concurrent async-task-coroutines
  • When both async and sync executors have a common API with properties that allow things to handle them appropriately
    • async executors might need to be used in the context of an existing event loop (or create a new one)
    • sync executors should not be able to submit tasks (jobs) on any async event loop


  • No labels