How to restart a failed task on Airflow
I am using the LocalExecutor, and my DAG has 3 tasks, where task(C) is dependent on task(A). Task(B) and task(A) can run in parallel, something like below:
A-->C
B
Task(A) has failed, but task(B) ran fine. Task(C) has yet to run because task(A) failed.
My question is: how do I re-run task(A) alone, so that task(C) runs once task(A) completes and the Airflow UI marks them as success?
In the UI:
- Go to the DAG, and open the DAG run you want to change
- Click on GraphView
- Click on task A
- Click "Clear"
This will let task A run again, and if it succeeds, task C should run. This works because when you clear a task's status, the scheduler will treat it as if it hadn't run before for this dag run.
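The same clear can also be done from the command line. Below is a minimal sketch, assuming Airflow 2's `airflow tasks clear` CLI is available; the DAG id `my_dag`, the task id `A`, the date string, and the helper name `build_clear_command` are hypothetical placeholders, not part of the question:

```python
import shlex


def build_clear_command(dag_id, task_id, run_date, downstream=False):
    """Build an `airflow tasks clear` command for one task in one DAG run."""
    cmd = [
        "airflow", "tasks", "clear",
        "-t", f"^{task_id}$",  # task regex, anchored so it should match only this task id
        "-s", run_date,        # start of the date range to clear
        "-e", run_date,        # end of the date range (the same run)
        "-y",                  # skip the confirmation prompt
    ]
    if downstream:
        cmd.append("-d")       # also clear tasks downstream of the match
    cmd.append(dag_id)
    return cmd


# Hypothetical values; replace them with your own DAG id, task id and run date.
command = build_clear_command("my_dag", "A", "2021-01-01T00:00:00+00:00")
print(shlex.join(command))
```

To actually run it, pass the list to `subprocess.run(command, check=True)` on a machine where the `airflow` CLI can reach your metadata database. Task C is still waiting on A here, so clearing A alone should be enough and `-d` is left off.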
Here's an alternate solution that clears and retries certain tasks automatically. If you only want to clear a single task, leave out the -d (downstream) flag:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Failure callback: clear t1 and its downstream tasks through the CLI so they run again
def clear_upstream_task(context):
    execution_date = context.get("execution_date")
    clear_tasks = BashOperator(
        task_id='clear_tasks',
        bash_command=f'airflow tasks clear -s {execution_date} -t t1 -d -y clear_upstream_task'
    )
    return clear_tasks.execute(context=context)

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

with DAG('clear_upstream_task',
         start_date=datetime(2021, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=5),
         default_args=default_args,
         catchup=False
         ) as dag:
    t0 = DummyOperator(
        task_id='t0'
    )
    t1 = DummyOperator(
        task_id='t1'
    )
    t2 = DummyOperator(
        task_id='t2'
    )
    t3 = BashOperator(
        task_id='t3',
        bash_command='exit 123',
        on_failure_callback=clear_upstream_task
    )
    t0 >> t1 >> t2 >> t3
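
To see what the -d flag changes for this DAG, here is a toy model in plain Python. It is only an illustration of the selection logic, not Airflow's own code; the `DOWNSTREAM` mapping and the `tasks_to_clear` helper are made up for this example:

```python
# Downstream edges of the example DAG: t0 >> t1 >> t2 >> t3
DOWNSTREAM = {"t0": ["t1"], "t1": ["t2"], "t2": ["t3"], "t3": []}


def tasks_to_clear(task_id, include_downstream):
    """Return the task ids that clearing `task_id` would reset in this toy model."""
    selected = [task_id]
    if include_downstream:
        # Walk the graph and collect every task reachable from task_id.
        stack = list(DOWNSTREAM[task_id])
        while stack:
            current = stack.pop()
            if current not in selected:
                selected.append(current)
                stack.extend(DOWNSTREAM[current])
    return selected


print(tasks_to_clear("t1", include_downstream=True))   # ['t1', 't2', 't3']
print(tasks_to_clear("t1", include_downstream=False))  # ['t1']
```

With -d, clearing t1 also resets t2 and t3, so the whole tail of the chain runs again. Without it, only t1 is reset.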