Apache Airflow: Delay a task for some period of time

I am trying to execute a task after 5 minutes from the parent task inside a DAG.

DAG : Task 1 ----> Wait for 5 minutes ----> Task 2

How can I achieve this in Apache Airflow? Thanks in advance.


The said behaviour can be achieved by introducing a task that forces a delay of specified duration between your Task 1 and Task 2


This can be achieved using PythonOperator

import time
from airflow.operators.python import PythonOperator

delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
                                                   dag=my_dag,
                                                   python_callable=lambda: time.sleep(300))

task_1 >> delay_python_task >> task_2

Or using BashOperator as well

from airflow.operators.bash import BashOperator
delay_bash_task: BashOperator = BashOperator(task_id="delay_bash_task",
                                             dag=my_dag,
                                             bash_command="sleep 5m")
task_1 >> delay_bash_task >> task_2

Note: The given code-snippets are NOT tested


References

  • example_python_operator.py
  • example_bash_operator.py

UPDATE-1

Here are some other ways of introducing delay

  • UPDATE: do NOT use this as pointed out by @Vit.ai. Original point: on_success_callback / on_failure_callback: Depending of whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda: time.sleep(300) in either of these params of Task 1.
  • pre_execute() / post_execute(): Invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course this would involve modifying code for your tasks (1 or 2) so better avoid it

Personally I would prefer the extra task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2


@y2k-shubham gave the best answer to date, however, I want to warn not to use the callback solution. as it first marks the task as success and then executes the callback. which means task2 will not see any delay. if you don't want to use a separate task, you can use something like this:

< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id'task2', dag=dag)

task1 >> task2