How to skip a task in Airflow without skipping its downstream tasks?

Solution 1:

You can refer to the Airflow documentation on trigger_rule.

trigger_rule lets you configure the condition under which a task runs, based on the state of its upstream tasks. By default (all_success), a task runs only when all of its upstream tasks succeed. You can change this to any of the other trigger rules Airflow provides. The all_failed rule runs a task only when all of its upstream tasks have failed, which would accomplish what you outlined.

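from datetime import datetime

from airflow.models import DAG
# On Airflow 2.x this module path is deprecated in favor of airflow.operators.python.
from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def always_fail():
    # Raise so that fail_task ends in the "failed" state.
    raise ValueError("intentional failure")


with DAG(
    dag_id="my_dag",
    start_date=datetime(2021, 4, 5),
    schedule_interval="@once",
) as dag:
    # Upstream task that always fails.
    p = PythonOperator(
        task_id="fail_task",
        python_callable=always_fail,
    )

    # Runs only when every upstream task has failed.
    t = PythonOperator(
        task_id="run_task",
        python_callable=lambda: 1,
        trigger_rule=TriggerRule.ALL_FAILED,
    )

    p >> t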
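
To compare how different trigger rules react to an upstream task's final state, here is a rough plain-Python model. It is only a sketch of the documented rule semantics, not Airflow's implementation: the names RULES, FAILED_STATES and should_run are made up for illustration, and it covers only four of the available rules.

```python
from typing import Callable, Dict, List

# Upstream states that count as a failure in this simplified model.
FAILED_STATES = {"failed", "upstream_failed"}

# Each rule maps the final states of the upstream tasks to a run / don't-run decision.
RULES: Dict[str, Callable[[List[str]], bool]] = {
    "all_success": lambda states: all(s == "success" for s in states),
    "all_failed": lambda states: all(s in FAILED_STATES for s in states),
    # all_done only requires that every upstream task finished, whatever the outcome.
    "all_done": lambda states: True,
    "none_failed": lambda states: not any(s in FAILED_STATES for s in states),
}


def should_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Return True if a task with this trigger rule would run (hypothetical helper)."""
    return RULES[trigger_rule](upstream_states)


# Print the decision of every modeled rule for a single upstream task.
for states in (["success"], ["failed"], ["skipped"]):
    print(states, {rule: should_run(rule, states) for rule in RULES})
```

In this model, a task whose only upstream task was skipped would still run under none_failed or all_done, but not under the default all_success. If the goal is to keep downstream tasks running after a skip rather than after a failure, one of those two rules is probably the closer fit.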