Status
Motivation
Current state
Currently, system tests are not integrated into the CI process and are rarely executed, which makes them outdated and faulty. The purpose of testing is to preserve a high quality of the product, thus tests should be run regularly and should be easy to maintain. The old design uses additional dependencies like credential files and many unnecessary environment variables, which introduces hard-to-maintain complexity.
The motivation of this AIP
The new design of system tests doesn't change the tests themselves but redefines how they are run. Also, it includes CI integration which allows us to run system tests automatically.
Currently, there are many issues related to how Airflow Operators work (or do not work); with automated testing in place, we can reduce the number of bugs reported.
Assuming that the new design is approved and implemented, we benefit from:
- Assuring the high quality of providers
- Running operators regularly with user-oriented use cases
- Possibly a lower rate of new bugs being introduced
- Simpler way of running system tests
- Faster results from system test execution
- Easier maintenance of system tests
- Decreased entry threshold for writing new system tests
What change do you propose to make?
- Each test is self-contained, which means that all information required to run the test is stored within the test file. If some large chunk of additional data is required, it needs to be stored separately as a resource file and imported into the test file.
- The test file is basically a DAG file with tasks running the operators - some of them are the operators under test, and some set up and tear down the necessary resources. It may also contain tasks that verify the correctness of the executed operators. These test files will be deployed to the DAGs folder of Airflow and executed as regular DAGs.
For pytest support, at the end of each file there will be several lines that enable running the DAG with this tool. The presence of these lines will be checked automatically using a pre-commit hook and, if absent, they will be added automatically. To see an example, go to the Complete test example section.
- The new design doesn't require entering the breeze environment to run the tests. They can easily be run with pytest + DebugExecutor or even triggered from an IDE.
- All system tests are going to be stored inside the tests/system/providers/<provider_name> directory to keep them separate from other types of tests. As all the tests are actually DAGs, they can be executed in parallel by Airflow. Another possibility is to store these tests under the provider directories (tests/providers/<provider_name>/system/). Nevertheless, there is no need to store them under the "example_dags" directory because they will be displayed as examples in the documentation anyway.
The paths for the documentation generated from the code of the tests (example DAGs) need to be updated accordingly. Old tests need to be removed when transferred to the new design, because maintaining both would require a significant amount of time and provide very little value.
Note: In the old design, system tests were stored inside tests/providers/<provider_name>/<service_name>/operators and they pointed to the example DAGs located in airflow/providers/<provider_name>/<service_name>/example_dags.
- The number of environment variables needed to run the tests will be kept at a minimum. All data that needs to be unique across the Airflow instance running the tests should now use SYSTEM_TESTS_ENV_ID and DAG_ID as unique identifiers. In the previous implementation, the variables.env file was used to gather all unique values. More about it (with an example) at the end of the section "What problem does it solve?".
- With the new approach, no credential files are needed for most of the tests. All permissions to external services needed to execute the DAGs (tests) should be provided to the Airflow instance in advance. This is the responsibility of the Airflow setup process, not of the tests themselves.
Each provider should create instructions explaining how to prepare the environment to run the related system tests so that users can do it on their own. The provider should also prepare an environment for running those tests in the CI integration to enable running them regularly. Providing the environment also for local execution is recommended, so that users of Airflow can run the tests when updating system tests of a specific provider.
- CI integration can be built using GitHub CI or a provider-related solution (like Cloud Build for Google tests).
Using GitHub CI can potentially block precious resources for running other checks for quite a long time, while using provider-related systems requires some additional setup but would be executed independently of other checks.
No matter which solution is chosen, the tests can be set to trigger only when specific tests are edited, and only those tests will be executed.
More about the pros and cons of each solution can be found in Design details → How to run tests.
What problem does it solve?
Using DAG files as test files enables us to keep all code within a single file. We don't need to bother with the special dependencies listed above - we upload a DAG file with its assets (like data files) directly to Airflow and it runs. Since the Airflow executor is used to run the tests, they will be run in parallel (depending on the Airflow configuration). Having a single test file makes it easier to maintain system tests.
Setting up the breeze environment is not as easy as stated, and because running system tests in the current design requires running breeze, it can be hard and painful. Reducing the amount of configuration and being able to run tests directly on Airflow makes it easier for developers to write and run the tests and to maintain the CI integration. Also, without the pytest dependency requirement, we only need to rely on the Airflow environment, which should positively affect the stability of the tests.
Current tests perform a lot of reading from environment variables that need to be set before the tests are run. That means that a team running tests for a specific provider needs to maintain a file containing all environment variables that are considered unique. Now, all data and names of variables that require uniqueness can incorporate DAG_ID and optionally ENV_ID into their value to avoid the risk of collision. The ENV_ID needs to be generated before the DAGs are run, and its value needs to be long enough to minimize the possibility of collision (e.g. a 6-character-long string containing lowercase letters and numbers).
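A minimal sketch of how such an identifier could be generated and exported before the test run; the helper name and the use of setdefault are assumptions, not part of this proposal:

import os
import random
import string


def generate_env_id(length: int = 6) -> str:
    """Generate a short, random, lowercase alphanumeric identifier."""
    return "".join(random.choices(string.ascii_lowercase + string.digits, k=length))


# Export the value so that test DAGs can read it via SYSTEM_TESTS_ENV_ID.
os.environ.setdefault("SYSTEM_TESTS_ENV_ID", generate_env_id())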
Example of creating a new Google Cloud Storage bucket with this approach:
import os

ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"]
DAG_ID = "example_dag"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
Are there any downsides to this change?
Dropping pytest usage for system tests leaves us without a tool to aggregate results from the tests. This is going to be handled by custom scripts that collect information about DAG statuses, parse it and print a report.
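As an illustration only, such a script could query the Airflow REST API for the state of each test DAG's latest run; the host, credentials, DAG ID list and the use of basic auth below are assumptions, not a prescribed implementation:

import requests

AIRFLOW_API = "http://localhost:8080/api/v1"  # assumed Airflow webserver address
AUTH = ("admin", "admin")  # assumed basic-auth credentials
TEST_DAG_IDS = ["example_bigquery_operations"]  # assumed list of system test DAGs

for dag_id in TEST_DAG_IDS:
    # Fetch the runs of the DAG and report the state of the most recent one.
    response = requests.get(f"{AIRFLOW_API}/dags/{dag_id}/dagRuns", auth=AUTH)
    response.raise_for_status()
    runs = response.json()["dag_runs"]
    state = runs[-1]["state"] if runs else "no runs"
    print(f"{dag_id}: {state}")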
Some current tests may run functions that are not Airflow Operators. These tests need to be rewritten using operators and converted to DAGs, or moved to another directory so that they are considered another type of test (integration, unit, etc.).
Living documentation that is generated using the sphinx package and special comment markers in the code should be updated so that it is read from the new path.
All current tests need to be moved or refactored using the new design. This shouldn't be a big change because we have only 123 (as of 25th January 2022) system tests in the repository, a large portion of which relate to Google Operators (around 100). Every test that is outdated and no longer needed will be removed to avoid a situation where we need to maintain two designs of system tests at the same time.
Why is it needed?
System tests are not currently maintained and run. Debugging and fixing mostly non-working tests is a very time-consuming process. The lack of CI integration causes them to age and become outdated. With the new design of system tests, they are intended to be run regularly.
The documentation for Airflow Operators is generated from the source code of system tests, so non-working code produces non-working examples in the documentation, spreading errors and bad practices into the community. By running system tests on a daily basis, we ensure that the examples are up to date.
Maintaining system tests requires knowledge of breeze, pytest and the maintenance of special files like variables.env or credential files. By removing these requirements, we simplify the architecture and improve the management of the tests.
Which users are affected by the change?
The change mostly affects developers of providers that currently (as of 25th January 2022) have Airflow system tests and potentially future developers that will create new system tests.
The list of providers affected by the change with related products and number of tests:
- Google (Cloud) - 88 tests
- Amazon (AWS) - 15 tests
- Apache (Beam) - 6 tests
- Microsoft (Azure) - 4 tests
- CNCF (Kubernetes) - 1 test
- Snowflake - 1 test
- Postgres - 1 test
- Asana - 1 test
- HTTP - 1 test
- Airflow (Built-in Operators) - 5 tests
Most of the tests (around 70%) operate over Google Cloud products. The total number of tests is 123.
It will also affect Airflow users who use Airflow providers and will improve their experience, because automated runs of system tests will help assure the high quality of providers.
What defines this AIP as "done"?
The ability to run system tests with the new design regularly as part of the CI process on GitHub, with a report that is easy to understand and helps maintainers find the place where a problem (if any) occurs. Old system tests should be moved and refactored, or deleted (if not applicable or deprecated).
Design details
Process of migration in detail
Create a new file with the name of the service you are going to migrate in tests/system/providers/<provider_name>/<service_name>/test_file.py. Remember to prefix the filename with test_*.
Setup and teardown methods
Check whether the system test you are going to migrate requires any additional configuration to run it. A good place to start is where the pytest test is triggered (tests/providers/<provider_name>/…/test_*_system.py); look for any actions executed inside the setUp or tearDown methods.
Try to rewrite those actions as tasks using other available Airflow Operators, or just use PythonOperator or BashOperator, as in the sketch below.
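A minimal, hedged sketch of such a rewrite - the old setUp body, the hook call and the bucket name below are illustrative assumptions, not taken from any existing test:

from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator

BUCKET_NAME = "bucket_example_dag_abc123"  # hypothetical; normally built from DAG_ID and ENV_ID

# Old design (illustrative): a pytest setUp created the bucket imperatively,
# e.g. by calling a GCS hook's create_bucket method before the DAG was triggered.
# New design: the same action becomes a regular task declared inside the
# `with DAG(...)` block of the test file.
create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)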
If you're creating any resources during the test, remember to remove them at the end (by creating a teardown task) if they're not needed anymore.
If a teardown task (or tasks) has been defined, remember to add the trigger_rule="all_done" parameter to the operator call. This will make sure that the task always runs, even if the upstream fails.
Define global variables:
Define the DAG name at the top of the file as the DAG_ID global variable.
Add an ENV_ID variable at the top of the file that is read from the SYSTEM_TESTS_ENV_ID environment variable: os.environ["SYSTEM_TESTS_ENV_ID"]
Define any other commonly used variables (paths to files, data, etc.) used in the tasks at the top of the file. Include the DAG_ID and ENV_ID variables in the values of these variables to decrease the possibility of a conflict when running multiple tests in parallel. Example: DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
Change the Airflow Executor to DebugExecutor by placing this line at the top of the file: os.environ['AIRFLOW__CORE__EXECUTOR'] = 'DebugExecutor'
Make sure to include these parameters in the DAG call:
- schedule_interval="@once" - tells the scheduler to schedule the task only once,
- start_date=datetime(2021, 1, 1) - makes sure that the DAG can be executed right away,
- catchup=False - prevents executing the DAG multiple times to fill the gap between start_date and today,
- tags=["example", "something"] - adds tags to quickly search for the DAG.
Put the task order after the task declarations in the DAG. Preferably define it line by line and add comments to explicitly show which tasks are setup/teardown and which are the test body (operators that are actually tested). Example:
(
    # TEST SETUP
    create_dataset
    >> create_table
    # TEST BODY
    >> copy_selected_data
    # TEST TEARDOWN
    >> delete_dataset
)
Try to keep the tasks in the DAG body defined in the order of execution.
At the bottom of the file, add the code that will enable the test to be run with pytest:
def test_run():
    from airflow.utils.state import State

    dag.clear(dag_run_state=State.NONE)
    dag.run()


if __name__ == "__main__":
    test_run()
Remember that these tests are also treated as examples for the community, so keep them clean, concise and easy to understand.
Update the comment tags that tell the documentation script where to start and end reading the operator code that will be included as an example in the official documentation. Look for something like this: # [START howto_operator_bigquery_create_table] or this: # [END howto_operator_bigquery_create_table]
Then update the path to the test file inside the RST file after the .. exampleinclude:: directive that relates to the corresponding example.
If the test needs any additional resources, put them into a resources directory (create it if it doesn't exist) close to the test files.
Once you verify that the test in the new design works, remove the old test and example DAG. Possible locations to check:
- tests/providers/<provider_name>/<service_name>/test_<smth>_system.py
- airflow/providers/<provider_name>/<service_name>/example_dags/example_<service>.py
Congratulations! Your test is ready to be executed countless times!
Complete test example
""" Example Airflow DAG for Google BigQuery service local file upload and external table creation. """ import os from datetime import datetime from airflow import models from airflow.providers.google.cloud.operators.bigquery import ( BigQueryCreateEmptyDatasetOperator, BigQueryCreateExternalTableOperator, ) from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator os.environ['AIRFLOW__CORE__EXECUTOR'] = 'DebugExecutor' ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"] DAG_ID = "example_bigquery_operations" DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" DATA_SAMPLE_GCS_OBJECT_NAME = "bigquery/us-states/us-states.csv" with models.DAG( DAG_ID, schedule_interval="@once", start_date=datetime(2021, 1, 1), catchup=False, tags=["example", "bigquery"], ) as dag: create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME) create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) upload_file = LocalFilesystemToGCSOperator( task_id="upload_file_to_bucket", src="../resources/us-states.csv", dst=DATA_SAMPLE_GCS_OBJECT_NAME, bucket=DATA_SAMPLE_GCS_BUCKET_NAME, ) # [START howto_operator_bigquery_create_external_table] create_external_table = BigQueryCreateExternalTableOperator( task_id="create_external_table", destination_project_dataset_table=f"{DATASET_NAME}.external_table", bucket=DATA_SAMPLE_GCS_BUCKET_NAME, source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME], schema_fields=[ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ], ) # [END howto_operator_bigquery_create_external_table] delete_bucket = GCSDeleteBucketOperator( task_id="delete_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, trigger_rule="all_done" ) ( # TEST SETUP [create_bucket, create_dataset] # TEST BODY >> upload_file >> create_external_table # TEST TEARDOWN >> delete_bucket ) def test_run(): from airflow.utils.state import State dag.clear(dag_run_state=State.NONE) dag.run() if __name__ == "__main__": test_run()
Tasks as setup and teardown
In pytest it's possible to have setUp and tearDown methods that prepare the environment for the test and clean up after it's executed. By dropping these pytest wrappers for system tests and having tests as self-contained DAG files, we need to move these operations inside the DAG files. This means that setup and teardown are going to be done by Operators.
For example, when testing a BigQuery service, creating a GCS bucket is often required, and it can be done by using dedicated operators. Likewise, the deletion of the bucket can be achieved by calling a specific operator. To inform the user which part is the actual test body and which tasks operate around the test, comments can be used in the fragment where task dependencies are defined, e.g.:
(
    # TEST SETUP
    [create_bucket, create_dataset]
    # TEST BODY
    >> upload_file
    >> create_external_table
    # TEST TEARDOWN
    >> delete_bucket
)
Additionally, teardown tasks are expected to clean up after the test no matter whether it passed or failed (if something was created before the test, the teardown should remove it). For that purpose we can use the trigger_rule attribute that is available for each operator. By choosing "all_done" as the value for trigger_rule, we make sure that this (teardown) operator will run no matter the results of the upstream tasks (even if they are skipped), while always preserving the task execution order. Example:
delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True, trigger_rule="all_done"
)
Creation of unique values for variables
As mentioned above in What problem does it solve?, sometimes there is a need to create a variable with a unique value to avoid collisions in the environment that runs the tests. Since DAG_ID needs to be unique across all DAGs, we can benefit from this by using its value to create data that will not interfere with the rest.
Example:
ENV_ID = os.environ["SYSTEM_TESTS_ENV_ID"]
DAG_ID = "example_dag"
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
Test structure
The tests should be structured in a way that makes them easy to run as "standalone" tests manually, but they should also integrate nicely into the pytest test execution environment. This can be achieved by leveraging the DebugExecutor and utilising the modern pytest test discovery mechanism (pytest automatically discovers all top-level functions in the module starting with test_ as "test cases"). This allows a very simple and seamless integration with pytest (and opens the tests up to all the useful features it has), without introducing boilerplate code and while allowing the tests to be run manually without pytest:
The tests can be run with either pytest <file> or pytest <module> (to run multiple tests), but also with python <file> for a manual run.
import os

os.environ['AIRFLOW__CORE__EXECUTOR'] = 'DebugExecutor'

with DAG(...) as dag:
    ...


def test_run():
    from airflow.utils.state import State

    dag.clear(dag_run_state=State.NONE)
    dag.run()


if __name__ == "__main__":
    test_run()
How to run tests
Running system tests can be done in multiple ways depending on the environment and the user's choice. When developing new tests or adding new features to Airflow, a user may want to run system tests to check that nothing is broken. For this, one can refer to the section below describing how to run system tests locally. The tests are also going to be run in the CI process of releasing a new Airflow version or provider packages. The section "In CI process" explains how the tests will be integrated into the CI/CD.
Locally
System tests can be executed using pytest. You can either navigate to the test file and run the tests using your IDE's pytest widget (the tests should be discovered as pytest tests by your IDE) or run the following command:
pytest tests/system/path/to/test_file.py
You can also use Breeze to run the tests.
It is also possible to skip pytest and execute the DAG directly with Python:
python tests/system/path/to/test_file.py
This will also run the DAG using the DebugExecutor, but there will be no additional pytest output.
In CI process
Running system tests usually takes much more time and resources than running unit tests. Some of the tests might run for literally hours, and blocking GitHub Actions workers for the tests might not be the best idea. GitHub Runners for Apache Airflow are a resource shared with other Apache projects (Apache has a maximum of 150 running runners for all of its projects), and blocking the runners for a long time has already caused problems in the past.
However, in order to fulfil their role, the system tests should be run periodically and when Pull Requests with changes related to the tests in question are pushed. This might be done using the approach the community currently uses (selective CI checks), but with more granularity if we find that the system tests take too much time.
We propose to use the "related" system to provide an execution engine - for example, if we run GCP system tests we can use Cloud Build to execute the tests (Proof of Concept done by Tobiasz Kędzierski in https://github.com/PolideaInternal/airflow-system-tests). Similarly, to run AWS tests we could use AWS CodePipeline, and for Azure - Azure Pipelines. We need credits for the respective cloud providers anyway, so those credits could be utilised to run both the services we test and the CI/CD for them. All such services have integrations with GitHub repositories. Each of those integrations needs to follow these principles:
Public access to the build dashboard and build logs. This might not be easy - for example, the Cloud Build dashboard cannot be made "publicly viewable", however logs can be exported and made publicly available via links and integration from GitHub Actions.
The status of the build should be reported back to GitHub Actions for Pull Requests. This can be achieved with already existing integrations, for example the Cloud Build integration. Further analysis will need to be done in order to work out the detailed integration.
If needed, the external service can have the "check:write" permission and provide appropriate status checks for the PR via the GitHub API (https://docs.github.com/en/rest/reference/checks#create-a-check-run). That will require authorising the system via specific tokens to have those permissions and might require cooperation with the Apache Software Foundation Infrastructure team.
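For illustration, a minimal sketch of how an external CI system could create such a check run via that API; the repository, commit SHA, check name and token handling are placeholders, and in practice the token would have to come from an app or integration authorised with the write permission mentioned above:

import os

import requests

REPO = "apache/airflow"
HEAD_SHA = "0000000000000000000000000000000000000000"  # placeholder: commit SHA of the PR head
TOKEN = os.environ["GITHUB_CHECKS_TOKEN"]  # placeholder token with checks write permission

response = requests.post(
    f"https://api.github.com/repos/{REPO}/check-runs",
    headers={
        "Authorization": f"token {TOKEN}",
        "Accept": "application/vnd.github.v3+json",
    },
    json={
        "name": "system-tests-google",  # placeholder check name
        "head_sha": HEAD_SHA,
        "status": "completed",
        "conclusion": "success",  # or "failure", based on the system test DAG results
    },
)
response.raise_for_status()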
Also, having the "big" cloud provider credits for those checks will enable using those credits to run checks for other services (those are usually rather inexpensive).