How to dynamically add / remove periodic tasks to Celery (celerybeat)
If I have a function defined as follows:
    def add(x, y):
        return x + y
Is there a way to dynamically add this function as a celery PeriodicTask and kick it off at runtime? I'd like to be able to do something like (pseudocode):
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
I would also want to stop or remove that task dynamically with something like (pseudocode):
celery.beat.remove_task(some_unique_task_id)
or
celery.beat.stop(some_unique_task_id)
FYI I am not using djcelery, which lets you manage periodic tasks via the django admin.
This question was answered on Google Groups.
I AM NOT THE AUTHOR, all credit goes to Jean Mark
Here's a proper solution for this, confirmed working. In my scenario, I wrapped PeriodicTask in a model of my own, so that I can add other fields as I need them and also add the "terminate" method. You have to set the periodic task's enabled property to False and save it before you delete it. The wrapper model is not a must; the schedule_every method is the one that really does the work. When you're ready to terminate your task (if you didn't wrap it), you can just use PeriodicTask.objects.filter(name=...) to find your task, disable it, then delete it.
Hope this helps!
    from datetime import datetime

    from django.db import models
    from djcelery.models import PeriodicTask, IntervalSchedule


    class TaskScheduler(models.Model):

        periodic_task = models.ForeignKey(PeriodicTask)

        @staticmethod
        def schedule_every(task_name, period, every, args=None, kwargs=None):
            """Schedule a task by name every "every" "period".

            An example call would be:
                TaskScheduler.schedule_every('mycustomtask', 'seconds', 30, [1, 2, 3])
            which would schedule your custom task to run every 30 seconds
            with the arguments 1, 2 and 3 passed to the actual task.
            """
            permissible_periods = ['days', 'hours', 'minutes', 'seconds']
            if period not in permissible_periods:
                raise Exception('Invalid period specified')
            # create the periodic task and the interval
            # (datetime is the class imported above, so call datetime.now() directly)
            ptask_name = "%s_%s" % (task_name, datetime.now())  # create some name for the periodic task
            interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
            if interval_schedules:
                # an interval schedule like that exists already, so reuse it
                interval_schedule = interval_schedules[0]
            else:
                # create a brand new interval schedule
                interval_schedule = IntervalSchedule()
                interval_schedule.every = every  # should check to make sure this is a positive int
                interval_schedule.period = period
                interval_schedule.save()
            ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
            # note: these fields may expect JSON-encoded text; if so, pass
            # json.dumps(args) / json.dumps(kwargs) instead
            if args:
                ptask.args = args
            if kwargs:
                ptask.kwargs = kwargs
            ptask.save()
            return TaskScheduler.objects.create(periodic_task=ptask)

        def stop(self):
            """pauses the task"""
            ptask = self.periodic_task
            ptask.enabled = False
            ptask.save()

        def start(self):
            """starts the task"""
            ptask = self.periodic_task
            ptask.enabled = True
            ptask.save()

        def terminate(self):
            """disables the task, then deletes this record and the task itself"""
            self.stop()
            ptask = self.periodic_task
            self.delete()
            ptask.delete()
This was finally made possible by a fix included in celery v4.1.0. Now, you just need to change the schedule entries in the database backend, and celery-beat will act according to the new schedule.
The docs vaguely describe how this works. The default scheduler for celery-beat, PersistentScheduler, uses a shelve file as its schedule database. Any changes to the beat_schedule dictionary in the PersistentScheduler instance are synced with this database (by default, every 3 minutes), and vice-versa. The docs describe how to add new entries to the beat_schedule using app.add_periodic_task. To modify an existing entry, just add a new entry with the same name. Delete an entry as you would from a dictionary: del app.conf.beat_schedule['name'].
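As a rough illustration of the add / modify / delete semantics described above, here is a minimal sketch that uses a plain dict as a stand-in for app.conf.beat_schedule. The entry name "add-every-30s" and the task name "tasks.add" are hypothetical, and the sketch does not start Celery; in a real app you would apply the same operations to app.conf.beat_schedule.

```python
from datetime import timedelta

# Stand-in for app.conf.beat_schedule (assumption: the real setting is a
# mapping of entry name -> entry options, as in the Celery docs).
beat_schedule = {}

# Add an entry. "tasks.add" is a hypothetical task name.
beat_schedule["add-every-30s"] = {
    "task": "tasks.add",
    "schedule": timedelta(seconds=30),
    "args": (2, 3),
}

# Modify the entry by assigning a new one under the same name.
beat_schedule["add-every-30s"] = {
    "task": "tasks.add",
    "schedule": timedelta(minutes=5),
    "args": (2, 3),
}
current_interval = beat_schedule["add-every-30s"]["schedule"]

# Delete the entry as you would from any dictionary.
del beat_schedule["add-every-30s"]
```

Whether a running beat process picks up such changes depends on the scheduler and the sync interval mentioned above, so treat this only as a picture of the dictionary operations.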
Suppose you want to monitor and modify your celery beat schedule using an external app. Then you have several options:
- You can open the shelve database file and read its contents like a dictionary. Write back to this file for modifications.
- You can run another instance of the Celery app, and use that one to modify the shelve file as described above.
- You can use the custom scheduler class from django-celery-beat to store the schedule in a django-managed database, and access the entries there.
- You can use the scheduler from celerybeat-mongo to store the schedule in a MongoDB backend, and access the entries there.
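For the first option, a minimal sketch of reading and rewriting a shelve file with the standard library might look like the following. It works on a throwaway file in a temporary directory, and plain dicts stand in for the schedule entries; the "entries" key and the entry layout are assumptions made for illustration. A real beat file holds pickled Celery objects (so Celery must be importable to read it), and writing to it while beat is running may conflict with beat's own syncing.

```python
import os
import shelve
import tempfile

# Hypothetical demo path; a real beat process writes its own file
# (named "celerybeat-schedule" by default).
path = os.path.join(tempfile.mkdtemp(), "celerybeat-schedule-demo")

# Create a stand-in file so the example is self-contained.
with shelve.open(path) as db:
    db["entries"] = {"add-every-30s": {"task": "tasks.add", "every_seconds": 30}}

# Read the contents like a dictionary.
with shelve.open(path, flag="r") as db:
    names = sorted(db["entries"])

# Modify: without writeback, shelve only persists assignments to top-level
# keys, so fetch the value, change it, and assign it back.
with shelve.open(path) as db:
    entries = db["entries"]
    entries["add-every-30s"]["every_seconds"] = 300
    db["entries"] = entries

# Confirm that the change was persisted.
with shelve.open(path, flag="r") as db:
    updated = db["entries"]["add-every-30s"]["every_seconds"]
```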
No, I'm sorry, this is not possible with the regular celerybeat.
But it's easily extensible to do what you want, e.g. the django-celery scheduler is just a subclass reading and writing the schedule to the database (with some optimizations on top).
Also you can use the django-celery scheduler even for non-Django projects.
Something like this:
- Install django + django-celery:
    $ pip install -U django django-celery
- Add the following settings to your celeryconfig:
    DATABASES = {
        'default': {
            'NAME': 'celerybeat.db',
            'ENGINE': 'django.db.backends.sqlite3',
        },
    }
    INSTALLED_APPS = ('djcelery', )
- Create the database tables:
    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
- Start celerybeat with the database scheduler:
    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
        -S djcelery.schedulers.DatabaseScheduler
Also there's the djcelerymon command, which can be used for non-Django projects to start celerycam and a Django Admin webserver in the same process. You can use that to also edit your periodic tasks in a nice web interface:
$ djcelerymon
(Note: for some reason djcelerymon can't be stopped using Ctrl+C; you have to use Ctrl+Z followed by kill %1.)
There is a library called django-celery-beat which provides the models one needs. To make it dynamically load new periodic tasks, one has to create one's own scheduler.
    from django_celery_beat.schedulers import DatabaseScheduler


    class AutoUpdateScheduler(DatabaseScheduler):

        def tick(self, *args, **kwargs):
            if self.schedule_changed():
                print('resetting heap')
                self.sync()
                self._heap = None
                new_schedule = self.all_as_schedule()

                if new_schedule:
                    # entries present only in the database get added,
                    # entries no longer in the database get removed
                    to_add = new_schedule.keys() - self.schedule.keys()
                    to_remove = self.schedule.keys() - new_schedule.keys()
                    for key in to_add:
                        self.schedule[key] = new_schedule[key]
                    for key in to_remove:
                        del self.schedule[key]

            # pass the parent's return value (the delay until the next tick) back to beat
            return super(AutoUpdateScheduler, self).tick(*args, **kwargs)

        @property
        def schedule(self):
            if not self._initial_read and not self._schedule:
                self._initial_read = True
                self._schedule = self.all_as_schedule()

            return self._schedule
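The core of tick() above is a key-set difference between the schedule held in memory and the one read from the database. A minimal sketch of just that step, with plain dicts standing in for the two schedules (the helper name reconcile and the sample data are hypothetical, not part of django-celery-beat):

```python
def reconcile(current, desired):
    """Make the keys of `current` match those of `desired`, in place.

    Returns the sorted lists of keys that were added and removed.
    """
    # dict.keys() returns set-like views, so "-" gives the set difference
    to_add = desired.keys() - current.keys()
    to_remove = current.keys() - desired.keys()
    for key in to_add:
        current[key] = desired[key]
    for key in to_remove:
        del current[key]
    return sorted(to_add), sorted(to_remove)


current = {"a": 1, "b": 2}
desired = {"b": 20, "c": 3}
added, removed = reconcile(current, desired)
```

Note that in this sketch a key present on both sides keeps its old value ("b" stays 2), so the difference step alone only handles additions and removals, not edits to an existing entry.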
You can check out flask-djcelery, which configures Flask and djcelery and also provides a browsable REST API.