Unit testing Airflow DAGs

Published on Jan 6, 2025

Prelude

You want to test your Airflow DAGs and tasks, but you are not sure how to do that.

Astronomer has a great article on this; this post is meant as a cheat sheet, with some inspiration drawn from it.

Prerequisites

Code

The most common tests are “meta” tests: they check whether the DAGs are configured correctly (they import cleanly, contain tasks, and so on) rather than what the tasks actually do.

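import pytest

from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    # Parse the DAGs folder once per test session, skipping Airflow's bundled example DAGs
    return DagBag(include_examples=False)

def test_import_errors(dag_bag):
    # import_errors maps each failing file path to the error raised while parsing it
    assert not dag_bag.import_errors, f"DAGs contain import errors: {dag_bag.import_errors}"

def test_dags_contain_tasks(dag_bag):
    for dag_name, dag in dag_bag.dags.items():
        assert dag.tasks, f"DAG {dag_name=} does not have tasks!"

def test_context_manager_dag_contains_tasks(dag_bag):
    dag = dag_bag.get_dag("context_manager_dag")
    # get_dag returns None for an unknown dag_id; fail with a clear message instead of an AttributeError
    assert dag is not None, "context_manager_dag not found in the DagBag"

    actual_task_ids = [task.task_id for task in dag.tasks]
    expected_task_ids = ["bash_task"]

    assert actual_task_ids == expected_task_ids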

Executing the DAGs and their tasks in tests is also possible, albeit more tedious. The downside is that this requires an Airflow metadata database, since these tests create actual DagRuns.

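from datetime import datetime, timezone

from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType

# Both tests reuse the session-scoped dag_bag fixture defined above.

# caplog is a built-in pytest fixture that captures log output
def test_context_manager_dag(caplog, dag_bag):
    dag = dag_bag.get_dag("context_manager_dag")
    # dag.test() runs all tasks of the DAG in the current process
    dag.test()
    assert "echo 42" in caplog.text

def test_context_manager_dag_bash_task(dag_bag):
    dag = dag_bag.get_dag("context_manager_dag")
    # Creates a real DagRun (and its TaskInstances) in the Airflow metadata database
    dagrun = dag.create_dagrun(
        state=DagRunState.RUNNING,
        execution_date=datetime.now(timezone.utc),
        run_type=DagRunType.MANUAL,
    )
    ti = dagrun.get_task_instance(task_id="bash_task")
    # The TaskInstance loaded from the database has no operator attached, so attach it before running
    ti.task = dag.get_task(task_id="bash_task")
    ti.run(ignore_ti_state=True)

    assert ti.state == TaskInstanceState.SUCCESS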

Addendum

Isn’t testing the log output bad practice and potentially useless?

For this example, definitely; it mainly illustrates that dag.test exists. However, the same approach can test other functionality, for example whether specific tasks execute as expected, custom logging, or metrics gathering.
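
Custom logging can often be checked without running the DAG at all, by testing the function that emits it. This is a swapped-in, lighter technique than dag.test, shown as a minimal sketch: `load_rows` and the logger name `my_dag.tasks` are hypothetical, and pytest's `caplog.at_level` limits the capture to that logger.

```python
import logging

# Hypothetical logger name; use whichever logger your task code writes to
logger = logging.getLogger("my_dag.tasks")


def load_rows(rows: list[dict]) -> int:
    """Hypothetical task logic that reports a custom metric via logging."""
    count = len(rows)
    logger.info("rows_loaded=%d", count)
    return count


def test_load_rows_logs_metric(caplog):
    # Capture INFO-level records from "my_dag.tasks" for the duration of the block
    with caplog.at_level(logging.INFO, logger="my_dag.tasks"):
        assert load_rows([{"id": 1}, {"id": 2}]) == 2
    assert "rows_loaded=2" in caplog.text
```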

Why test a BashOperator?

Indeed, one probably shouldn’t. A more realistic example would be testing the code run by a PythonOperator, but then one would test that code directly rather than the operator. The flow shown here does, however, apply to testing a custom operator.
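
Testing the code behind a PythonOperator directly could look like the sketch below. It assumes the callable is kept in an importable module of its own; the module layout and the `transform` function are hypothetical stand-ins for real task logic. No DagBag, database, or operator is involved.

```python
# Hypothetical module, e.g. dags/etl_helpers.py, imported by the DAG file.
# In the DAG it might be wired up along the lines of:
#     PythonOperator(task_id="transform", python_callable=transform, op_args=[records])


def transform(records: list[dict]) -> list[dict]:
    """Add an integer cents field to each record; a stand-in for real task logic."""
    return [{**record, "amount_cents": round(record["amount"] * 100)} for record in records]


# Hypothetical tests/test_etl_helpers.py: plain function calls, no Airflow needed
def test_transform_adds_cents():
    assert transform([{"amount": 1.25}]) == [{"amount": 1.25, "amount_cents": 125}]


def test_transform_handles_empty_input():
    assert transform([]) == []
```

Keeping the callable free of Airflow imports is what makes these tests fast: the operator only passes arguments through, so there is little left to test at the DAG level.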
