How do I check if all my tasks in an Airflow DAG were successful?

Solution 1:

We had a similar use case where we wanted to identify whether all the tasks were successful. In Airflow, if a task fails and a downstream task has the trigger_rule one_failed, the DAG run can end up being marked as successful, because there was a recovery from the failure.

The solution we implemented sends a single email that summarizes all the task instances:

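from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
# Airflow 2.x import path; on 1.10 use airflow.operators.python_operator
from airflow.operators.python import PythonOperator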

def check_all_success(**context):
    dr: DagRun = context["dag_run"]
    ti: TaskInstance = context["ti"]
    
    # Collect the states of every task instance except the one running this check
    ti_summary = {task.state for task in dr.get_task_instances() if task.task_id != ti.task_id}

    # Drop the success state; discard() does not raise if no task succeeded
    ti_summary.discard('success')

    # Any state left besides success means there was an issue in the run
    if ti_summary:
        # Send email: All tasks in DAG: {dr.dag_id} did not complete successfully
        pass
    else:
        # Send email: All tasks in DAG: {dr.dag_id} completed successfully
        pass

check_all_tasks = PythonOperator(
    task_id='check_all_tasks',
    python_callable=check_all_success,
    # Only needed on Airflow 1.10; Airflow 2+ passes the context automatically
    provide_context=True,
)
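
If you want to test the state check without a running scheduler, the core logic can be pulled out into a plain function. This is only a sketch under some assumptions: `non_success_states` and the sample task ids are made-up names for illustration, not part of Airflow, and the states are assumed to be plain strings such as 'success' or 'failed' (or None for a task that has not run yet).

```python
from typing import Iterable, Optional, Set, Tuple


def non_success_states(
    task_states: Iterable[Tuple[str, Optional[str]]],
    current_task_id: str,
) -> Set[Optional[str]]:
    """Return every state other than 'success', ignoring the checking task itself.

    Hypothetical helper: task_states is an iterable of (task_id, state) pairs.
    """
    states = {state for task_id, state in task_states if task_id != current_task_id}
    # discard() is safe even when no task reached the success state
    states.discard("success")
    return states


# Made-up run: one task failed, a recovery task succeeded, the check is still running
example = [
    ("extract", "success"),
    ("load", "failed"),
    ("recover", "success"),
    ("check_all_tasks", "running"),
]
problems = non_success_states(example, "check_all_tasks")
print(problems)  # {'failed'}
```

Inside the callable you could build the pairs with something like `[(t.task_id, t.state) for t in dr.get_task_instances()]` and send the failure email whenever the returned set is non-empty.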