Retrieve list of tasks in a queue in Celery

How can I retrieve a list of tasks in a queue that are yet to be processed?


Solution 1:

EDIT: See other answers for getting a list of tasks in the queue.

You should look here: Celery Guide - Inspecting Workers

Basically this:

from celery import Celery

my_app = Celery(...)

# Inspect all nodes.
i = my_app.control.inspect()

# Show the items that have an ETA or are scheduled for later processing
i.scheduled()

# Show tasks that are currently active.
i.active()

# Show tasks that have been received (prefetched) by workers but not yet started
i.reserved()

Use whichever of these fits what you need. Note that inspect() only reports tasks the workers have already received (active, reserved/prefetched, or scheduled with an ETA); it does not count messages still waiting in the broker queue.
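Each of these calls returns a dictionary mapping worker node names to lists of task descriptions, or None if no workers reply. As a minimal sketch, counting everything the workers currently know about (assuming the my_app instance from above):

i = my_app.control.inspect()

# each inspect call returns {worker_name: [task, ...]}, or None if no workers answered
active = i.active() or {}
reserved = i.reserved() or {}
scheduled = i.scheduled() or {}

total = sum(len(tasks) for tasks in active.values()) + \
        sum(len(tasks) for tasks in reserved.values()) + \
        sum(len(tasks) for tasks in scheduled.values())
print(total)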

Solution 2:

If you are using RabbitMQ, run this in a terminal:

sudo rabbitmqctl list_queues

It will print a list of queues along with the number of pending messages in each. For example:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
[email protected] 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
[email protected]    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

The number in the right column is the number of messages in that queue. In the output above, the celery queue has 166 pending tasks.
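rabbitmqctl can also print specific columns, which makes it easy to grep for a single queue. A sketch, assuming your queue is the default celery:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged | grep celery

Here messages_ready counts messages not yet delivered to a worker, while messages_unacknowledged counts those delivered but not yet acknowledged.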

Solution 3:

If you are using Celery+Django, the simplest way to inspect tasks is to run the celery commands directly from your terminal (inside your virtual environment, or using a full path to the celery binary):

Doc: http://docs.celeryproject.org/en/latest/userguide/workers.html?highlight=revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect active
$ celery inspect registered
$ celery inspect scheduled
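Note that recent Celery versions usually need to be pointed at your application with the -A/--app option; for example, with a hypothetical project module named proj:

$ celery -A proj inspect active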

Also, if you are using Celery+RabbitMQ, you can inspect the list of queues using the following command:

More info: https://linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues
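If your broker URL uses a non-default virtual host, pass it with the -p option (my_vhost here is a placeholder):

$ sudo rabbitmqctl list_queues -p my_vhost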

Solution 4:

If you don't use prioritized tasks, this is actually pretty simple when Redis is your broker. To get the task count for a queue:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
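For example, with a stock local Redis and Celery's default queue (all values below are the usual defaults; adjust to your setup):

redis-cli -h localhost -p 6379 -n 0 llen celery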

Prioritized tasks, however, use a different key in Redis for each priority level, so the full picture is slightly more complicated: you need to query Redis once per priority. In Python (adapted from the Flower project), this looks like:

import redis
from django.conf import settings  # assumes Redis connection details live in Django settings

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this on GitHub, but it doesn't look like it
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum(r.llen(x) for x in priority_names)
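With those helpers in place, usage is a one-liner (the batch1 name below is just the hypothetical queue from the docstring):

get_queue_length()          # default 'celery' queue, summed across priorities
get_queue_length('batch1')  # a custom queue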

If you want to get an actual task, you can use something like:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

From there you'll have to deserialize the returned list. In my case I was able to accomplish this with something like:

import base64
import json
import pickle

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
# each entry is a JSON envelope whose 'body' field is base64-encoded
# (base64.decodestring was removed in Python 3.9; use b64decode instead)
pickle.loads(base64.b64decode(json.loads(l[0])['body']))

Just be warned that deserialization can take a moment, and you'll need to adjust the commands above to work with various priorities.
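The snippet above assumes the pickle serializer. If your tasks use Celery's default JSON serializer instead, the decoded body is JSON rather than a pickle, so a sketch of the equivalent would be:

raw = r.lrange('celery', 0, -1)[0]
payload = json.loads(base64.b64decode(json.loads(raw)['body']))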

Solution 5:

To retrieve the message count for a queue directly from the broker, use this:

from amqplib import client_0_8 as amqp

conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
# passive=True asserts the queue exists instead of creating it;
# queue_declare returns (queue name, message count, consumer count)
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
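amqplib is quite old; a similar check with kombu (which Celery already depends on) could look like this sketch, assuming a local RabbitMQ with the default guest credentials:

from kombu import Connection

with Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    # passive=True checks the queue without creating it
    name, message_count, consumer_count = channel.queue_declare(
        queue='queue_name', passive=True)
    print(message_count)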