How to add a timeout to a function in Python
Many attempts have been made in the past to add timeout functionality in Python such that when a specified time limit expired, waiting code could move on. Unfortunately, previous recipes either allowed the running function to continue running and consuming resources or else killed the function using a platform-specific method of thread termination. The purpose of this wiki is to develop a cross-platform answer to this problem that many programmers have had to tackle for various programming projects.
#! /usr/bin/env python
"""Provide a way to add timeout specifications to arbitrary functions.

There are many ways to add a timeout to a function, but no solution
is both cross-platform and capable of terminating the procedure. This
module uses the multiprocessing module to solve both of those problems."""
################################################################################
__author__ = 'Stephen "Zero" Chappell <[email protected]>'
__date__ = '11 February 2010'
__version__ = '$Revision: 3 $'
################################################################################
import inspect
import sys
import time
import multiprocessing
################################################################################
def add_timeout(function, limit=60):
    """Add a timeout parameter to a function and return it.

    It is illegal to pass anything other than a function as the first
    parameter. If the limit is not given, it gets a default value equal
    to one minute. The function is wrapped and returned to the caller."""
    assert inspect.isfunction(function)
    if limit <= 0:
        raise ValueError()
    return _Timeout(function, limit)

class NotReadyError(Exception): pass
################################################################################
def _target(queue, function, *args, **kwargs):
    """Run a function with arguments and return output via a queue.

    This is a helper function for the Process created in _Timeout. It runs
    the function with positional arguments and keyword arguments and then
    returns the function's output by way of a queue. If an exception gets
    raised, it is returned to _Timeout to be raised by the value property."""
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        queue.put((False, sys.exc_info()[1]))

class _Timeout:
    """Wrap a function and add a timeout (limit) attribute to it.

    Instances of this class are automatically generated by the add_timeout
    function defined above. Wrapping a function allows asynchronous calls
    to be made and termination of execution after a timeout has passed."""

    def __init__(self, function, limit):
        """Initialize instance in preparation for being called."""
        self.__limit = limit
        self.__function = function
        # monotonic clock: elapsed real time, unaffected by system clock changes
        self.__timeout = time.monotonic()
        self.__process = multiprocessing.Process()
        self.__queue = multiprocessing.Queue()

    def __call__(self, *args, **kwargs):
        """Execute the embedded function object asynchronously.

        The function given to the constructor is transparently called and
        requires that "ready" be intermittently polled. If and when it is
        True, the "value" property may then be checked for returned data."""
        self.cancel()
        self.__queue = multiprocessing.Queue(1)
        args = (self.__queue, self.__function) + args
        self.__process = multiprocessing.Process(target=_target,
                                                 args=args,
                                                 kwargs=kwargs)
        self.__process.daemon = True
        self.__process.start()
        self.__timeout = self.__limit + time.monotonic()

    def cancel(self):
        """Terminate any possible execution of the embedded function."""
        if self.__process.is_alive():
            self.__process.terminate()

    @property
    def ready(self):
        """Read-only property indicating status of "value" property."""
        if self.__queue.full():
            return True
        elif not self.__queue.empty():
            return True
        elif self.__timeout < time.monotonic():
            self.cancel()
        else:
            return False

    @property
    def value(self):
        """Read-only property containing data returned from function."""
        if self.ready is True:
            flag, load = self.__queue.get()
            if flag:
                return load
            raise load
        raise NotReadyError()

    def __get_limit(self):
        return self.__limit

    def __set_limit(self, value):
        if value <= 0:
            raise ValueError()
        self.__limit = value

    limit = property(__get_limit, __set_limit,
                     doc="Property for controlling the value of the timeout.")

Edit: This code was written for Python 3.x and was not designed to be used as a decorator on class methods. The multiprocessing module was not designed to modify class instances across process boundaries.
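The point about process boundaries can be illustrated with a minimal sketch. The `Counter` class and `run_in_child` helper are hypothetical names invented for this example, not part of the module above. The child process works on its own copy of the instance, so a mutation made there never reaches the parent unless it is sent back explicitly.

```python
import multiprocessing


class Counter:
    """Hypothetical class used only to illustrate the process boundary."""

    def __init__(self):
        self.count = 0

    def increment(self, results):
        self.count += 1          # changes the child's copy of the instance only
        results.put(self.count)  # explicit channel back to the parent


def run_in_child(counter):
    """Call counter.increment in a child process and return what the child saw."""
    results = multiprocessing.Queue()
    process = multiprocessing.Process(target=counter.increment, args=(results,))
    process.start()
    seen_by_child = results.get(timeout=10)
    process.join()
    return seen_by_child
```

Calling `run_in_child(counter)` on a fresh `Counter` returns the value computed in the child, while `counter.count` in the parent stays at its original value. On platforms that start children with spawn, the calling code needs the usual `if __name__ == "__main__":` guard.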
Solution 1:
The principal problem with your code is the overuse of double-underscore name mangling, which exists to prevent namespace conflicts in subclasses, in a class that isn't intended to be subclassed at all.
In general, self.__foo is a code smell that should be accompanied by a comment along the lines of "# This is a mixin and we don't want arbitrary subclasses to have a namespace conflict".
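As a small illustration of what the double underscore actually does, here is a sketch with hypothetical `Base` and `Child` classes (not taken from the code under review). Python rewrites `self.__limit` to `_ClassName__limit`, which only matters when a subclass reuses the same attribute name.

```python
class Base:
    def __init__(self):
        self.__limit = 60     # stored on the instance as _Base__limit
        self._timeout = 0.0   # single underscore: a convention only, no mangling


class Child(Base):
    def __init__(self):
        super().__init__()
        self.__limit = 5      # stored as _Child__limit, so Base's value survives


obj = Child()
names = sorted(vars(obj))
print(names)  # ['_Base__limit', '_Child__limit', '_timeout']
```

For a class that is never subclassed, a single leading underscore conveys "private" just as well and keeps attribute names readable in a debugger.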
Furthermore, the client API of this class would look like this:
def mymethod(): pass

mymethod = add_timeout(mymethod, 15)

# start the processing (the call itself returns None)
mymethod()
try:
    # access the property, which is really a function call
    ret = mymethod.value
except NotReadyError:
    # not finished yet, or the timeout has expired
    ret = None
This is not very Pythonic; a better client API would be:
@timeout(15)
def mymethod(): pass

try:
    mymethod()
except TimeoutError:
    pass
You are using @property in your class for an accessor that mutates state, which is not a good idea. For instance, what happens when .value is accessed twice? The first access removes the result from the queue, so on the second access the queue is empty, ready is no longer True, and NotReadyError is raised even though the function finished.
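The double-access problem can be reproduced in a few lines. This is a simplified sketch: `queue.Queue` stands in for `multiprocessing.Queue`, and `ConsumingResult` and `CachedResult` are hypothetical classes, not the reviewed `_Timeout`. The first class drains the queue on every read; the second uses an explicit method that reads once and caches.

```python
import queue


class ConsumingResult:
    """Hypothetical stand-in: the property drains the queue on every read."""

    def __init__(self, source):
        self._queue = source

    @property
    def value(self):
        return self._queue.get_nowait()  # mutates state: the item is gone afterwards


class CachedResult:
    """Hypothetical alternative: an explicit method that reads once and caches."""

    _MISSING = object()

    def __init__(self, source):
        self._queue = source
        self._cached = self._MISSING

    def get(self):
        if self._cached is self._MISSING:
            self._cached = self._queue.get_nowait()
        return self._cached


source = queue.Queue()
source.put(42)
consuming = ConsumingResult(source)
first = consuming.value          # 42
try:
    consuming.value              # the queue is empty now
    second_read_failed = False
except queue.Empty:
    second_read_failed = True

source.put(42)
cached = CachedResult(source)
both_reads = (cached.get(), cached.get())  # (42, 42)
```

A method name such as `get()` also signals to the caller that work may be happening, which attribute syntax hides.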
Remove @property entirely; it is not suitable for this use case. Make __call__ block when called and either return the value or raise the exception itself. If you really must have the value accessed later, make it a method like .get() or .value().
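A blocking version might look roughly like the following. This is a sketch under assumptions, not the original author's implementation: the names `timeout` and `_worker` are hypothetical, a `multiprocessing.Pipe` replaces the queue, and the wrapper passes itself to the child (which then follows `__wrapped__`) so that the callable can still be pickled by name on platforms that spawn rather than fork.

```python
import functools
import multiprocessing


def _worker(sender, wrapper, args, kwargs):
    """Child-process entry point: call the original function, report the outcome."""
    try:
        outcome = (True, wrapper.__wrapped__(*args, **kwargs))
    except Exception as error:
        outcome = (False, error)
    sender.send(outcome)
    sender.close()


def timeout(limit):
    """Hypothetical decorator: block for up to `limit` seconds, then give up."""
    if limit <= 0:
        raise ValueError("limit must be positive")

    def decorate(function):
        @functools.wraps(function)  # sets wrapper.__wrapped__ = function
        def wrapper(*args, **kwargs):
            receiver, sender = multiprocessing.Pipe(duplex=False)
            process = multiprocessing.Process(
                target=_worker, args=(sender, wrapper, args, kwargs), daemon=True)
            process.start()
            sender.close()  # the parent keeps only the receiving end
            try:
                if not receiver.poll(limit):
                    raise TimeoutError(
                        f"{function.__name__} exceeded {limit} seconds")
                ok, payload = receiver.recv()
            finally:
                if process.is_alive():
                    process.terminate()  # stop the child so it frees its resources
                process.join()
                receiver.close()
            if ok:
                return payload
            raise payload  # re-raise the exception captured in the child
        return wrapper
    return decorate
```

With this shape, a function decorated with `@timeout(15)` is called like any other function: it returns the result, re-raises the function's own exception, or raises `TimeoutError` after terminating the child. Arguments, return values and exceptions must all be picklable, and only module-level functions are supported.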
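The code for _target should be rewritten a little:
import sys
import traceback

def _target(queue, function, *args, **kwargs):
    try:
        queue.put((True, function(*args, **kwargs)))
    except:
        # send the exception *and* its formatted traceback, not just exc_info()[1];
        # the raw traceback object cannot be pickled, so it cannot go through the queue
        error = sys.exc_info()[1]
        queue.put((False, (error, traceback.format_exc())))

# then later, where the value is read back:
error, trace = load
raise error from RuntimeError(trace)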
That way the stack trace will be preserved correctly and visible to the programmer.
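One caveat when moving exception details between processes: traceback objects cannot be pickled, and an exception that is pickled loses its `__traceback__`. The sketch below checks both facts and shows one possible workaround, which is shipping the formatted traceback text next to the exception. The `divide` function is a hypothetical example.

```python
import pickle
import sys
import traceback


def divide(a, b):
    return a / b


try:
    divide(1, 0)
except ZeroDivisionError:
    exc_type, exc_value, exc_tb = sys.exc_info()
    # Plain text survives pickling; the traceback object itself does not.
    trace_text = "".join(traceback.format_exception(exc_type, exc_value, exc_tb))

try:
    pickle.dumps(exc_tb)
    traceback_is_picklable = True
except TypeError:
    traceback_is_picklable = False

# Simulate the trip through a multiprocessing queue with a pickle round trip.
error, text = pickle.loads(pickle.dumps((exc_value, trace_text)))
print(traceback_is_picklable)     # False
print(error.__traceback__)        # None: the stack is lost in transit
print(text.strip().splitlines()[-1])
```

The receiving side can then attach the text to whatever it raises, for example with `raise error from RuntimeError(text)`, so the child's stack remains visible to the programmer.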
I think you've made a reasonable first crack at writing a useful library, and I like the use of the multiprocessing module to achieve the goals.
Solution 2:
This is how to get the decorator syntax Jerub mentioned:
DEFAULT_TIMEOUT = 60  # assumed value, matching add_timeout's default of one minute

def timeout(limit=None):
    if limit is None:
        limit = DEFAULT_TIMEOUT
    if limit <= 0:
        raise TimeoutError()  # why not ValueError here?
    def wrap(function):
        return _Timeout(function, limit)
    return wrap

@timeout(15)
def mymethod(): pass
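A point to keep in mind with decorator syntax and multiprocessing: pickle stores functions by module and name, and after decoration that name refers to the wrapper object rather than the original function. On platforms where child processes are spawned and the target's arguments are pickled, the wrapped function can then fail to pickle. The sketch below demonstrates the pickle behaviour only; `Wrapper`, `wrap` and `can_pickle` are hypothetical stand-ins, not the `_Timeout` class itself.

```python
import pickle


class Wrapper:
    """Hypothetical stand-in for _Timeout: it only stores the function."""

    def __init__(self, function):
        self.function = function


def wrap(function):
    return Wrapper(function)


@wrap
def decorated():
    return "done"


def can_pickle(obj):
    """Return True if pickle.dumps accepts the object."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# The name "decorated" now points at a Wrapper instance, so looking the
# original function up by name finds a different object and pickling fails.
original_is_picklable = can_pickle(decorated.function)
print(original_is_picklable)
```

Two possible ways around this are to keep the undecorated function available under its own module-level name, or to send the wrapper to the child and let the child unwrap it.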