Background tasks in Flask

Best practice

The best way to implement background tasks in Flask is with Celery, as explained in this SO post. Good starting points are the official Flask documentation and the Celery documentation.

Crazy way: Build your own decorator

As @MrLeeh pointed out in a comment, Miguel Grinberg presented a solution in his PyCon 2016 talk by implementing a decorator. I want to emphasize that I have the highest respect for his solution; he called it a "crazy solution" himself. The code below is a minor adaptation of his solution.

Warning!!!

Don't use this in production! The main reason is that this app leaks memory: the global tasks dictionary keeps every thread and every result, and nothing ever removes an entry. Even if you fix the memory leak, this sort of code is hard to maintain. If you just want to play around or use this in a private project, read on.
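If you want to experiment with plugging the leak, here is a minimal sketch of one possible approach: stamp each result with the time it was stored and periodically delete finished entries that are older than a retention period. The names store_result, purge_finished_tasks, tasks_lock and MAX_AGE_SECONDS are my own assumptions and are not part of Miguel Grinberg's solution; the tasks dictionary plays the same role as the one in the example further down.

```python
import threading
import time

tasks = {}                      # task_id -> {'result': ..., 'finished_at': ...}
tasks_lock = threading.Lock()   # guards tasks across threads
MAX_AGE_SECONDS = 300           # hypothetical retention period


def store_result(task_id, result, now=None):
    """Save a result together with the time it was stored."""
    finished_at = time.time() if now is None else now
    with tasks_lock:
        tasks.setdefault(task_id, {}).update(result=result, finished_at=finished_at)


def purge_finished_tasks(max_age=MAX_AGE_SECONDS, now=None):
    """Delete finished tasks older than max_age seconds; return the removed ids."""
    now = time.time() if now is None else now
    with tasks_lock:
        # Tasks without 'finished_at' are still running and are left alone
        expired = [task_id for task_id, task in tasks.items()
                   if 'finished_at' in task and now - task['finished_at'] > max_age]
        for task_id in expired:
            del tasks[task_id]
    return expired
```

You could call purge_finished_tasks() at the start of the results endpoint or from a timer thread. This only bounds the growth of the dictionary; it does not make the approach production-ready.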

Minimal example

Assume you have a long-running function call in your /foo endpoint. I mock this with a 10-second sleep timer. If you call the endpoint three times in a row, it takes 30 seconds to finish.

Miguel Grinberg's decorator solution is implemented in flask_async. It runs the route function in a new thread, inside a request context that is built from the WSGI environ of the current request and is therefore similar to the original request context. Each call is issued a new task_id, and the result is saved in the global dictionary as tasks[task_id]['result'].
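To see the core of the pattern without the Flask parts, here is a stripped-down sketch that only uses the standard library. It is not Miguel Grinberg's code: run_in_background and slow_add are hypothetical names I made up for illustration, and there is no request context or error handling.

```python
import threading
import time
import uuid
from functools import wraps

tasks = {}  # task_id -> {'task': Thread, 'result': ...}


def run_in_background(f):
    """Run f in a new thread and return a task id immediately."""
    @wraps(f)
    def wrapped(*args, **kwargs):
        task_id = uuid.uuid4().hex

        def task():
            # Store the return value once the function finishes
            tasks[task_id]['result'] = f(*args, **kwargs)

        # Register the task before starting the thread, so the entry exists
        # by the time the thread writes its result
        tasks[task_id] = {'task': threading.Thread(target=task)}
        tasks[task_id]['task'].start()
        return task_id

    return wrapped


@run_in_background
def slow_add(a, b):
    time.sleep(0.1)  # stand-in for long-running work
    return a + b


task_id = slow_add(1, 2)          # returns right away with the task id
tasks[task_id]['task'].join()     # wait for the thread (for demonstration only)
print(tasks[task_id]['result'])   # the value returned by slow_add
```

The Flask version adds two things on top of this: it pushes a request context inside the thread so the route function can still use request, and it turns exceptions into HTTP error responses.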

With the decorator in place you only need to decorate the endpoint with @flask_async and the endpoint is asynchronous - just like that!

import threading
import time
import uuid
from functools import wraps

from flask import Flask, current_app, request, abort
from werkzeug.exceptions import HTTPException, InternalServerError

app = Flask(__name__)
tasks = {}


def flask_async(f):
    """
    This decorator transforms a sync route to asynchronous by running it in a background thread.
    """
    @wraps(f)
    def wrapped(*args, **kwargs):
        def task(app, environ):
            # Create a request context similar to that of the original request
            with app.request_context(environ):
                try:
                    # Run the route function and record the response
                    tasks[task_id]['result'] = f(*args, **kwargs)
                except HTTPException as e:
                    tasks[task_id]['result'] = current_app.handle_http_exception(e)
                except Exception:
                    # The function raised an exception, so we set a 500 error
                    tasks[task_id]['result'] = InternalServerError()
                    if current_app.debug:
                        # We want to find out if something happened so reraise
                        raise

        # Assign an id to the asynchronous task
        task_id = uuid.uuid4().hex

        # Record the task, and then launch it
        tasks[task_id] = {'task': threading.Thread(
            target=task, args=(current_app._get_current_object(), request.environ))}
        tasks[task_id]['task'].start()

        # Return a 202 response, with an id that the client can use to obtain task status
        return {'TaskId': task_id}, 202

    return wrapped


@app.route('/foo')
@flask_async
def foo():
    time.sleep(10)
    return {'Result': True}


@app.route('/foo/<task_id>', methods=['GET'])
def foo_results(task_id):
    """
    Return results of asynchronous task.
    If this request returns a 202 status code, it means that task hasn't finished yet.
    """
    task = tasks.get(task_id)
    if task is None:
        abort(404)
    if 'result' not in task:
        return {'TaskId': task_id}, 202
    return task['result']


if __name__ == '__main__':
    app.run(debug=True)

However, you need a little trick to get your results. The endpoint /foo returns only the HTTP status code 202 and the task id, not the result. You need another endpoint, /foo/<task_id>, to get the result. Here is an example for localhost:

import time
import requests

task_ids = [requests.get('http://127.0.0.1:5000/foo').json().get('TaskId')
            for _ in range(2)]
time.sleep(11)
results = [requests.get(f'http://127.0.0.1:5000/foo/{task_id}').json()
           for task_id in task_ids]
# [{'Result': True}, {'Result': True}]
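Instead of sleeping for a fixed 11 seconds, a client can poll the results endpoint until it stops answering with 202. Below is a minimal sketch of such a loop. poll_task and its parameters are my own hypothetical names, and fetch is a function you supply yourself, so the loop does not depend on any particular HTTP library.

```python
import time


def poll_task(fetch, task_id, interval=1.0, timeout=30.0):
    """Call fetch(task_id) until it stops returning 202 or the timeout expires.

    fetch is a caller-supplied function returning (status_code, body).
    """
    deadline = time.monotonic() + timeout
    while True:
        status, body = fetch(task_id)
        if status != 202:
            # Anything other than 202 means the task is no longer pending
            return status, body
        if time.monotonic() >= deadline:
            raise TimeoutError(f'task {task_id} still pending after {timeout} s')
        time.sleep(interval)


# Usage with the server above (assumes the requests package is installed):
#
#   import requests
#
#   def fetch(task_id):
#       response = requests.get(f'http://127.0.0.1:5000/foo/{task_id}')
#       return response.status_code, response
#
#   status, response = poll_task(fetch, task_id)
#   if status == 200:
#       print(response.json())
```

Returning the response object from fetch, rather than calling .json() right away, avoids a decoding error when the server answers with a non-JSON body such as the 404 page.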