Apache Airflow: Delay a task for some period of time
I am trying to execute a task 5 minutes after its parent task finishes inside a DAG.
DAG : Task 1 ----> Wait for 5 minutes ----> Task 2
How can I achieve this in Apache Airflow? Thanks in advance.
The desired behaviour can be achieved by introducing a task that forces a delay of a specified duration between your Task 1 and Task 2.
This can be done using PythonOperator:
import time
from airflow.operators.python import PythonOperator
# my_dag, task_1 and task_2 are assumed to be defined elsewhere in the DAG file
delay_python_task: PythonOperator = PythonOperator(
    task_id="delay_python_task",
    dag=my_dag,
    python_callable=lambda: time.sleep(300),  # block for 300 s (5 minutes)
)
task_1 >> delay_python_task >> task_2
Or using BashOperator:
from airflow.operators.bash import BashOperator
# The "5m" suffix is GNU sleep syntax; "sleep 300" is the portable equivalent
delay_bash_task: BashOperator = BashOperator(
    task_id="delay_bash_task",
    dag=my_dag,
    bash_command="sleep 5m",
)
task_1 >> delay_bash_task >> task_2
Note: The given code-snippets are NOT tested
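If the delay needs to be configurable, one option is to derive both the time.sleep argument and the bash_command string from a single timedelta. This is only a sketch: DELAY, delay_in_seconds and sleep_command are hypothetical names introduced here, not part of Airflow.

```python
from datetime import timedelta

# Hypothetical constant: the single place where the delay is defined.
DELAY = timedelta(minutes=5)


def delay_in_seconds(delay: timedelta) -> int:
    """Whole seconds in `delay`, suitable as the argument to time.sleep()."""
    return int(delay.total_seconds())


def sleep_command(delay: timedelta) -> str:
    """Shell command that sleeps for `delay`, expressed in plain seconds."""
    return f"sleep {delay_in_seconds(delay)}"


print(delay_in_seconds(DELAY))  # 300
print(sleep_command(DELAY))     # sleep 300
```

The results would then be passed as python_callable=lambda: time.sleep(delay_in_seconds(DELAY)) or bash_command=sleep_command(DELAY) in the operators above.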
References
example_python_operator.py
example_bash_operator.py
UPDATE-1
Here are some other ways of introducing a delay:
- UPDATE: do NOT use this, as pointed out by @Vit.ai. Original point: on_success_callback / on_failure_callback: Depending on whether Task 2 is supposed to run upon success or failure of Task 1, you can pass lambda: time.sleep(300) in either of these params of Task 1.
- pre_execute() / post_execute(): Invoking time.sleep(300) in Task 1's post_execute() or Task 2's pre_execute() would also have the same effect. Of course, this would involve modifying the code of your tasks (1 or 2), so better avoid it.
Personally, I would prefer the extra task approach because it makes things more explicit and doesn't falsely exaggerate the runtime of your Task 1 or Task 2.
@y2k-shubham gave the best answer to date; however, I want to warn against using the callback solution. Airflow first marks the task as success and only then executes the callback, which means task2 will not see any delay. If you don't want to use a separate task, you can use something like this:
< ... >
task1 = DummyOperator(task_id='task1', dag=dag)
task1.post_execute = lambda **x: time.sleep(300)
task2 = DummyOperator(task_id='task2', dag=dag)
task1 >> task2
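To make the ordering argument concrete, here is a toy model of the sequence described above. It is NOT Airflow's code: FakeClock and run_task are hypothetical stand-ins, and the clock is simulated so nothing actually sleeps. It assumes, as stated above, that post_execute runs before the task is marked successful and the success callback runs after.

```python
class FakeClock:
    """Stand-in for time.sleep that advances a counter instead of waiting."""

    def __init__(self):
        self.now = 0

    def sleep(self, seconds):
        self.now += seconds


def run_task(clock, post_execute=None, on_success_callback=None):
    """Return (time the task is marked successful, time the whole run ends)."""
    # 1. The task body runs (it takes no time in this toy model).
    # 2. post_execute runs while the task still counts as running.
    if post_execute is not None:
        post_execute()
    # 3. The state flips to success; downstream tasks become eligible here.
    marked_success_at = clock.now
    # 4. The success callback runs only after the state change.
    if on_success_callback is not None:
        on_success_callback()
    return marked_success_at, clock.now


clock = FakeClock()
callback_variant = run_task(clock, on_success_callback=lambda: clock.sleep(300))

clock = FakeClock()
post_execute_variant = run_task(clock, post_execute=lambda: clock.sleep(300))

print(callback_variant)      # (0, 300)
print(post_execute_variant)  # (300, 300)
```

In the callback variant the task is already marked successful at time 0, so a downstream task could start immediately; with post_execute the success mark itself is pushed back by 300 seconds.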