How do I check if all my tasks in an Airflow DAG were successful?
Solution 1:
We had a similar use case where we wanted to identify whether all the tasks were successful. In Airflow, if a task fails and a downstream task uses the trigger_rule one_failed, the DAG run can end up being marked as successful, because there was a recovery from the failure.
The solution we implemented sends a single email that summarizes the state of all the task instances:
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
# Airflow 2.x import path; on 1.10.x use airflow.operators.python_operator
from airflow.operators.python import PythonOperator


def check_all_success(**context):
    dr: DagRun = context["dag_run"]
    ti: TaskInstance = context["ti"]
    # Collect the distinct states of all tasks except the one currently executing this logic
    ti_summary = {task.state for task in dr.get_task_instances() if task.task_id != ti.task_id}
    # Remove the success state; discard() does not raise if no task succeeded
    ti_summary.discard('success')
    # If the summary still holds any other state, there was an issue in the run
    if ti_summary:
        # Send email: All tasks in DAG: {dr.dag_id} did not complete successfully
        pass
    else:
        # Send email: All tasks in DAG: {dr.dag_id} completed successfully
        pass


check_all_tasks = PythonOperator(
    task_id='check_all_tasks',
    python_callable=check_all_success,
    # Needed on Airflow 1.10.x; deprecated on Airflow 2, which passes the context automatically
    provide_context=True
)
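The state check at the heart of this callable can be tried out without a running Airflow instance. The following is only a minimal sketch, assuming task states are plain strings such as 'success', 'failed' or 'running'; the function name non_success_states and the sample data are hypothetical and not part of Airflow.

```python
from typing import Iterable, Optional, Set, Tuple


def non_success_states(
    task_states: Iterable[Tuple[str, Optional[str]]],
    current_task_id: str,
) -> Set[Optional[str]]:
    """Return the distinct non-success states of all tasks except the current one."""
    return {
        state
        for task_id, state in task_states
        if task_id != current_task_id and state != "success"
    }


# Hypothetical run: 'load' failed, a recovery task succeeded,
# and the checking task itself is still running.
sample_run = [
    ("extract", "success"),
    ("load", "failed"),
    ("recover", "success"),
    ("check_all_tasks", "running"),
]

problems = non_success_states(sample_run, current_task_id="check_all_tasks")
print(problems)  # {'failed'}
```

An empty result means every other task succeeded. In the DAG itself, the checking task would probably also need a trigger rule such as all_done so that it still runs after an upstream failure; that is an assumption about the wiring, since the snippet above does not show the task dependencies.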