Using class methods as Celery tasks

I'm trying to use the methods of a class as django-celery tasks by marking them with the @task decorator. The same situation is described here, in a question asked by Anand Jeyahar. It's something like this:

class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what I need
    a.foo.delay(bar)  # executes as a Celery task
    a.foo(bar)  # executes locally

The problem is that even if I call it on a class instance, like a.foo.delay(bar), it says that foo needs at least two arguments, which means that the self pointer is missing.

More information:

  • I can't convert the class to a module because of inheritance.
  • The methods depend heavily on class members, so I can't make them static.
  • Marking the class as a task with the @task decorator makes the class itself a task. It would be possible to execute the methods from the run() method, using some argument as a key for method selection, but that's not exactly what I want.
  • Creating an instance of the class and passing it as the self argument changes how I call the methods when they run as ordinary methods rather than as Celery tasks (e.g. while testing).
  • I've tried to find out how to register the task dynamically, from the constructor for example, but Celery shares the code between the workers, so that seems to be impossible.

Thanks for your help!


Solution 1:

Celery has experimental support for using methods as tasks since version 3.0.

The documentation for this is in celery.contrib.methods, and also mentions some caveats you should be aware of:

https://docs.celeryproject.org/en/3.1/reference/celery.contrib.methods.html
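
For a rough idea of what binding a task to an instance involves, here is a pure-Python sketch. It does not use Celery and is not Celery's implementation: method_task_sketch and BoundTaskSketch are hypothetical names, and delay() simply calls the function in-process instead of sending a message. If I remember the 3.x docs correctly, the real usage was @app.task(filter=task_method), with task_method imported from celery.contrib.methods.

```python
class BoundTaskSketch:
    """Hypothetical stand-in for a task that remembers its instance."""

    def __init__(self, func, instance):
        self.func = func
        self.instance = instance

    def __call__(self, *args, **kwargs):
        # Direct call: run locally with the remembered instance as `self`.
        return self.func(self.instance, *args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real task queue would ship the instance and the arguments to a
        # worker; this sketch just calls the function in the same process.
        return self.func(self.instance, *args, **kwargs)


class method_task_sketch:
    """Hypothetical decorator: a descriptor that binds the instance on access."""

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class, nothing to bind
        return BoundTaskSketch(self.func, instance)


class A:
    def __init__(self, offset):
        self.offset = offset

    @method_task_sketch
    def foo(self, bar):
        return self.offset + bar


a = A(offset=10)
local_result = a.foo(5)         # plain call on the instance
queued_result = a.foo.delay(5)  # same arguments, `self` filled in by __get__
```

Note that in a real setup the instance itself has to travel to the worker, which is one reason such an approach comes with caveats.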

Be aware: support for celery.contrib.methods was removed in Celery 4.0.

Solution 2:

Jeremy Satterfield has a clean and straightforward tutorial on writing class-based tasks, if that's what you want to accomplish. You can check it here.

The magic is basically extending the celery.Task class and including a run() method, something like this:

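from celery import Task

class CustomTask(Task):
    ignore_result = True  # do not store the return value

    # A task is instantiated once per worker process, not once per call,
    # so per-call arguments belong in run(), not in __init__().
    def run(self, arg):
        do_something_with_arg(arg)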

and then run the task like this:

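your_arg = 3

# On Celery 4.0+ class-based tasks are no longer registered automatically;
# there you would use: custom_task = app.register_task(CustomTask())
custom_task = CustomTask()
custom_task.delay(your_arg)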

I am not sure whether the ignore_result = True part is necessary.

Solution 3:

When you have:

    a = A()

you can do:

    A.foo.delay(a, param0, .., paramN)
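
To see why the instance has to be passed explicitly, here is a small pure-Python sketch. FakeTask is a hypothetical, simplified stand-in for a task object, not Celery's actual Task class, and its delay() just calls the function in-process. The point is only that the decorated attribute is an object rather than a bound method, so nothing fills in self for you.

```python
class FakeTask:
    """Simplified stand-in for a task object (NOT Celery's real Task class)."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Direct call: run the wrapped function in the current process.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real task would serialize the arguments and send them to a
        # worker; here the function is called directly so that the
        # argument handling is visible.
        return self.func(*args, **kwargs)


class A:
    def __init__(self, offset):
        self.offset = offset

    @FakeTask
    def foo(self, bar):
        return self.offset + bar


a = A(offset=10)

# a.foo is the FakeTask object itself, not a bound method, because FakeTask
# does not implement __get__. The single argument lands in `self`, and
# `bar` is reported as missing.
error = None
try:
    a.foo.delay(5)
except TypeError as exc:
    error = exc

# Passing the instance explicitly supplies `self`.
result = A.foo.delay(a, 5)
```

With real Celery the instance also has to survive serialization on its way to the worker, so this only works if a can be handled by the configured serializer.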

Cheers

Solution 4:

I ran into a similar situation and decided to wrap the class methods in a simple function that forwards its parameters to an instance of the class and calls the method on it:

class A:
    def foo(self, bar):
        ...  # do this

a = A()

# `app` is your Celery application instance
@app.task
def a_wrapper(bar):
    return a.foo(bar)

# probably in a different file, with the wrapper imported:

a_wrapper.delay(bar)
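
One thing to keep in mind with this pattern: the module-level a is created separately in every process that imports the module, so state set on it in one process is not visible in another. A possible variation, sketched below in plain Python, is to pass the data needed to rebuild the instance along with the call. The factor attribute and the to_config() helper are made up for illustration, the JSON round trip only mimics arguments being serialized, and in a real project a_wrapper would carry the @app.task decorator.

```python
import json


class A:
    def __init__(self, factor):
        self.factor = factor

    def foo(self, bar):
        return self.factor * bar

    def to_config(self):
        # Hypothetical helper: plain data that any serializer can handle.
        return {"factor": self.factor}


def a_wrapper(config, bar):
    # In a real project this function would be decorated with @app.task.
    a = A(**config)  # rebuild the instance inside the worker process
    return a.foo(bar)


a = A(factor=3)

# Round-trip through JSON to mimic what happens to the arguments on their
# way to a worker.
message = json.dumps({"args": [a.to_config(), 7]})
args = json.loads(message)["args"]
result = a_wrapper(*args)
```

This keeps the task arguments to plain data, at the cost of constructing the object on every call.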