Using XCom in Airflow
Published on Jan 20, 2025
2 min read
Prelude
Despite the recommendation not to, you still need to pass some details between Airflow tasks. Maybe it’s as simple as a dataset path on S3.
Prerequisites
- a machine
- Python 3
- apache-airflow installed
Code
In Airflow, this mechanism is known as XCom which, as discussed a bit further in the addendum below, must by default be serializable, since values are stored in the Airflow metadata database.
An example with the PythonOperator:
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(dag_id="xcom_dag_python", start_date=datetime(2025, 1, 1), schedule=None):
    task1 = PythonOperator(
        task_id="task1",
        # The returned value is automatically pushed to XCom ("return_value").
        python_callable=lambda *_: "path",
    )
    task2 = PythonOperator(
        task_id="task2",
        # "ti" is the TaskInstance, available in the context kwargs.
        python_callable=lambda **context: print(context["ti"].xcom_pull(task_ids="task1")),
    )
    task1 >> task2
Note how, by default, the value returned by the python_callable is pushed to XCom, and how ti is one of the key-value pairs in the context Airflow hands to the callable. (On Airflow 1.x that context had to be enabled with provide_context=True; on 2.x it is passed automatically and the flag is deprecated, which is why it is omitted above.)
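You can also push and pull explicitly under a custom key instead of relying on the implicit return value. A minimal sketch, where the key name dataset_path and the path are made up for illustration, with callables that would slot into the PythonOperators above:

def push_explicitly(**context):
    # Push under a custom key rather than the implicit "return_value".
    context["ti"].xcom_push(key="dataset_path", value="s3://bucket/data.csv")

def pull_explicitly(**context):
    # Pull by task id and the matching key.
    print(context["ti"].xcom_pull(task_ids="task1", key="dataset_path"))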
An example with the BashOperator and a Jinja template to access the same context. Note that templating is also possible in the PythonOperator via templates_dict; see the sketch after the example below.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="xcom_dag_jinja", start_date=datetime(2025, 1, 1), schedule=None):
    task1 = BashOperator(
        task_id="task1",
        # The last line of stdout ("42") is pushed to XCom.
        bash_command="echo 42",
        do_xcom_push=True,  # already the default, shown here for clarity
    )
    task2 = BashOperator(
        task_id="task2",
        # The Jinja template is rendered before the command runs.
        bash_command="echo 'echoed {{ ti.xcom_pull(task_ids='task1') }}'",
        do_xcom_push=False,
    )
    task1 >> task2
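As for templates_dict in the PythonOperator, here is a minimal sketch reusing task1 from the example above: each value in templates_dict is rendered by Jinja before execution and exposed to the callable under context["templates_dict"].

task2 = PythonOperator(
    task_id="task2",
    # The rendered template ends up in context["templates_dict"]["pulled"];
    # the key name "pulled" is arbitrary.
    python_callable=lambda **context: print(context["templates_dict"]["pulled"]),
    templates_dict={"pulled": "{{ ti.xcom_pull(task_ids='task1') }}"},
)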
Addendum
Why shouldn’t we use XCom again?
XCom data is stored in the Airflow metadata database, so it must obey that database’s constraints, such as values being serializable and not too large. One must also realize that each XCom call is yet another db operation, which can affect the rest of the application. And since XComs behave like Airflow Variables, they can also be deleted via the UI (by mistake). So no, unfortunately it doesn’t make sense for tasks to pass DataFrames from one to another.
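Instead, the usual pattern, as in the prelude’s S3 example, is to write the heavy data to external storage and pass only its location through XCom. A minimal sketch of that pattern (the bucket and file names are made up, and writing directly to s3:// paths with pandas assumes s3fs is installed):

import pandas as pd

def produce(**context):
    df = pd.DataFrame({"answer": [42]})
    path = "s3://some-bucket/xcom_dag/answer.parquet"  # hypothetical location
    df.to_parquet(path)  # the heavy data goes to S3, not the Airflow db
    return path  # only this small string ends up in XCom

def consume(**context):
    path = context["ti"].xcom_pull(task_ids="produce")
    print(pd.read_parquet(path))

These two callables would be wired into PythonOperators exactly like the first example above.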