using class methods as celery tasks
I'm trying to use the methods of class as the django-celery tasks, marking it up using @task decorator. The same situation is discribed here, asked by Anand Jeyahar. It's something like this
class A:
@task
def foo(self, bar):
...
def main():
a = A()
...
# what i need
a.foo.delay(bar) # executes as celery task
a.foo(bar) # executes locally
The problem is even if i use class instance like this a.foo.delay(bar)
it says, that foo
needs at least two arguments, which meens that self
pointer misses.
More information:
- I can't convert class to module because of inheritance
- Methods are strongly depended on class members, so i can't make them static
- Marking class as the task with @task decorator makes the class a task itself, and it could be possible to execute the methods from
run()
method, using some argument as a key for method selection, but it's not exactly what i want. - Creating an instance of class and passing it as
self
argument to methods changes the way i execute the methods not as celery taks, but as usual methods (i.e. while testing) - I've tried to find out how i can register the task dinamically, from constructor for example, but celery shares the code between the workers, so that's why it seems to be impossible.
Thanks for your help!
Solution 1:
Celery has experimental support for using methods as tasks since version 3.0.
The documentation for this is in celery.contrib.methods
, and also mentions some caveats you should be aware of:
https://docs.celeryproject.org/en/3.1/reference/celery.contrib.methods.html
Be aware: support for contrib.methods
removed from Celery since 4.0
Solution 2:
Jeremy Satterfield has a clean and straight forward tutorial to write class based tasks if that's what you want to accomplish. You can check it here.
The magic is basically extending celery.Task
class including a run()
method, like something like this:
from celery import Task
class CustomTask(Task):
ignore_result = True
def __init__(self, arg):
self.arg = arg
def run(self):
do_something_with_arg(self.arg)
and then run the task like this:
your_arg = 3
custom_task = CustomTask()
custom_task.delay(your_arg)
I am not sure if ignore_result = True
part is necessary or not.
Solution 3:
When you have:
a = A()
you can do:
A.foo.delay(a, param0, .., paramN)
Cheers
Solution 4:
I ran into a similar situation and decided to wrap class methods within a simple function that would redirect its parameters to an instance of the class and it's execution of such methods:
class A:
def foo(self, bar):
# do this
a = A()
@app.task
def a_wrapper(bar):
return a.foo(bar)
# probably in a different size with an import in-place:
a_wrapper.delay(bar)