Status
Motivation
Airflow currently has no explicit way to declare messages passed between tasks in a DAG. XComs are available, but they are hidden inside each operator's execution function. This can cause confusion when working on ETL pipelines that need to send messages between tasks. PythonOperator is a case in point: it could be used for simple message processing, but rarely is, because it cannot easily accept messages from previous tasks as arguments. The messages must be pulled explicitly, which creates a lot of bootstrapping code.
This AIP proposes a Functional API that makes it possible to declare message passing explicitly while declaring task dependencies implicitly. It also introduces a simple decorator for declaring PythonOperators.
Considerations
What change do you propose to make?
List of changes proposed by component.
XComArg
- An explicit way to pass results between operators.
- The object is a reference to an XCom value that has not been created yet and will need to be resolved in the future.
- It can easily generate new XComArgs that point to the same operator but use a different key.
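The three properties above can be illustrated with a minimal sketch. This is not the proposed implementation: the `task_id` field, the `__getitem__` shortcut, the `resolve` method and the dictionary used as an XCom store are all assumptions made for illustration, and the sketch deliberately avoids importing Airflow.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class XComArg:
    """Reference to an XCom value that a task will produce later (toy version)."""

    task_id: str               # assumed: identify the producing operator by task id
    key: str = "return_value"  # XCom key to read

    def __getitem__(self, key: str) -> "XComArg":
        # New reference to the same producing task, but under a different key.
        return XComArg(self.task_id, key)

    def resolve(self, xcom_store: dict) -> object:
        # Hypothetical lookup once the upstream task has pushed its values.
        return xcom_store[(self.task_id, self.key)]


result = XComArg("extract")   # points at the default key
rows = result["row_count"]    # same task, different key

# A plain dict stands in for the XCom backend in this sketch.
store = {("extract", "return_value"): [1, 2, 3], ("extract", "row_count"): 3}
```

Nothing is looked up when the reference is created; `resolve` only reads the store when it is called, which mirrors the idea that the value "will need to be resolved in the future".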
Add __call__ in BaseOperator
Add a functional interface that replaces class attributes at execution time. This function should also be in charge of:
- Checking whether any of the attributes are XComArgs and resolving them.
- Replacing class attributes with the resolved args when executing on a worker.
- Adding a dependency on each operator from which the current operator needs an XCom.
- Returning an XComArg that points to the 'return_value' key by default.
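A rough sketch of these responsibilities, using toy classes rather than Airflow's real BaseOperator. The names `ToyOperator`, `attrs`, `upstream` and `resolve_attrs`, and the dictionary standing in for the XCom backend, are hypothetical and exist only to make the flow concrete.

```python
class XComArg:
    """Placeholder for a value that an upstream operator will push later."""

    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key


class ToyOperator:
    """Stand-in for BaseOperator; only models the proposed __call__ behaviour."""

    def __init__(self, task_id, **attrs):
        self.task_id = task_id
        self.attrs = attrs      # stand-in for class attributes
        self.upstream = set()   # task ids this operator depends on

    def __call__(self, **kwargs):
        for name, value in kwargs.items():
            if isinstance(value, XComArg):
                # Passing an XComArg implicitly declares a dependency.
                self.upstream.add(value.operator.task_id)
            self.attrs[name] = value
        # By default the result points at the 'return_value' key.
        return XComArg(self)

    def resolve_attrs(self, xcom_store):
        # On the worker: swap every XComArg for the value it refers to.
        return {
            name: xcom_store[(v.operator.task_id, v.key)] if isinstance(v, XComArg) else v
            for name, v in self.attrs.items()
        }


extract = ToyOperator("extract")
load = ToyOperator("load", table="events")

extracted = extract()           # XComArg for extract's return value
loaded = load(rows=extracted)   # records the load -> extract dependency

store = {("extract", "return_value"): [1, 2]}  # dict standing in for XCom
resolved = load.resolve_attrs(store)
```

In this sketch the dependency is recorded when the DAG file is parsed, while the actual value is only substituted when `resolve_attrs` runs, which is the split between declaration time and execution time that the list describes.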
PythonFunctionalOperator
- Make it easier to set op_args and op_kwargs from __call__ on a PythonOperator, effectively enabling function-like operations based on XCom values.
- Should allow copies under an alias so that the same function can be reused within a DAG.
Python function decorator
- Enable easy transformation of Python functions into PythonFunctionalOperators through a decorator that takes a function and converts it into a PythonFunctionalOperator.
- Should also allow arguments to be passed to the PythonOperator from the decorator itself.
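One possible shape for the operator and decorator is sketched below with toy classes. The decorator name `task`, the `copy` and `execute` methods, and the dictionary used as an XCom store are assumptions for illustration; the AIP itself does not fix these names, and the sketch does not use Airflow's real classes.

```python
class XComArg:
    """Placeholder for a value that an upstream task will push later."""

    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key


class PythonFunctionalOperator:
    """Toy operator wrapping a plain Python function (hypothetical shape)."""

    def __init__(self, python_callable, task_id=None, **operator_kwargs):
        self.python_callable = python_callable
        self.task_id = task_id or python_callable.__name__
        self.operator_kwargs = operator_kwargs
        self.op_args, self.op_kwargs = (), {}
        self.upstream = set()

    def __call__(self, *args, **kwargs):
        # Calling the operator sets op_args/op_kwargs and records dependencies.
        self.op_args, self.op_kwargs = args, kwargs
        for value in (*args, *kwargs.values()):
            if isinstance(value, XComArg):
                self.upstream.add(value.operator.task_id)
        return XComArg(self)

    def copy(self, task_id):
        # Alias copy so the same function can appear twice in one DAG.
        return PythonFunctionalOperator(self.python_callable, task_id, **self.operator_kwargs)

    def execute(self, xcom_store):
        def resolve(value):
            if isinstance(value, XComArg):
                return xcom_store[(value.operator.task_id, value.key)]
            return value

        args = [resolve(v) for v in self.op_args]
        kwargs = {k: resolve(v) for k, v in self.op_kwargs.items()}
        result = self.python_callable(*args, **kwargs)
        xcom_store[(self.task_id, "return_value")] = result  # push the result
        return result


def task(python_callable=None, **operator_kwargs):
    """Usable bare (@task) or with operator arguments (@task(retries=2))."""
    def wrap(fn):
        return PythonFunctionalOperator(fn, **operator_kwargs)
    return wrap(python_callable) if python_callable is not None else wrap


@task
def extract():
    return [1, 2, 3]


@task(retries=2)
def total(values):
    return sum(values)


total(extract())   # declares that total depends on extract
store = {}         # dict standing in for the XCom backend
extract.execute(store)
answer = total.execute(store)
second_total = total.copy("second_total")
```

Here `retries` is simply stored on the toy operator to show arguments flowing from the decorator; in a real implementation such arguments would be forwarded to the underlying operator.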
DAG example
See: https://github.com/casassg/corrent/tree/master/dags
import json |
What problem does it solve?
- Explicit message passing in Airflow DAGs
Why is it needed?
- Extends Airflow DAG API to allow explicit message passing between tasks
- Improves developer experience for simple usages of PythonOperator
- Introduces a new abstraction for message passing, preparing Airflow to better support data pipeline use cases
Are there any downsides to this change?
- Agreeing on the right abstraction is key, while keeping in mind future features that could be added to Airflow to build on this API. The main difficulty will be agreeing on a single implementation for this proposal.
- Implementation-wise, it may require extending the webserver to load the resolved XComs specified as dependencies, for observability.
- Potentially, this could lead to more users/customers using XCom to pass around large pieces of data.
- XCom uses pickling for serialization/deserialization, which may be unstable for certain data passed around.
Which users are affected by the change?
Most users will be able to explicitly declare message passing in their DAGs using a Python functional approach. No existing DAGs will need to change.
How are users affected by the change? (e.g. DB upgrade required?)
It's an incremental and optional update. Users will not be affected by the change.
Other considerations?
This proposal comes from inspecting the APIs of newer DSL frameworks such as Dagster, Prefect, Metaflow and Databand.
What defines this AIP as "done"?
- The AIP has been voted to move forward, with agreement on the right API abstraction.
- The __call__ interface is added to BaseOperator and the XComArg class is added.
- PythonFunctionalOperator and the Python function decorator are merged.
- Stretch goal: the webserver loads resolved XComArgs into the UI when inspecting a task instance (both inlets and outlets).