Is there an easy way in Python to wait until certain condition is true?

Solution 1:

Unfortunately the only possibility to meet your constraints is to periodically poll, e.g....:

import time

def wait_until(somepredicate, timeout, period=0.25, *args, **kwargs):
  mustend = time.time() + timeout
  while time.time() < mustend:
    if somepredicate(*args, **kwargs): return True
    time.sleep(period)
  return False

or the like. This can be optimized in several ways if somepredicate can be decomposed (e.g. if it's known to be an and of several clauses, especially if some of the clauses are in turn subject to optimization by being detectable via threading.Events or whatever, etc, etc), but in the general terms you ask for, this inefficient approach is the only way out.

Solution 2:

Another nice package is waiting - https://pypi.org/project/waiting/

install:

pip install waiting

Usage: You pass a function that will be called every time as a condition, a timeout, and (this is useful) you can pass a description for the waiting, which will be displayed if you get TimeoutError.

using function:

from waiting import wait


def is_something_ready(something):
    if something.ready():
        return True
    return False


# wait for something to be ready
something = # whatever

wait(lambda: is_something_ready(something), timeout_seconds=120, waiting_for="something to be ready")

# this code will only execute after "something" is ready
print("Done")

Note: the function must return a boolean - True when the wait is over, False otherwise

Solution 3:

Here is another solution. The goal was to make threads to wait on each other before doing some work in a very precise order. The work can take unknown amount of time. Constant polling is not good for two reasons: it eats CPU time and action does not start immediately after condition is met.

class Waiter():

    def __init__(self, init_value):
        self.var = init_value
        self.var_mutex = threading.Lock()
        self.var_event = threading.Event()

    def WaitUntil(self, v):
        while True:
            self.var_mutex.acquire()
            if self.var == v:
                self.var_mutex.release()
                return # Done waiting
            self.var_mutex.release()
            self.var_event.wait(1) # Wait 1 sec

    def Set(self, v):
        self.var_mutex.acquire()
        self.var = v
        self.var_mutex.release()
        self.var_event.set() # In case someone is waiting
        self.var_event.clear()

And the way to test it

class TestWaiter():

    def __init__(self):
        self.waiter = Waiter(0)
        threading.Thread(name='Thread0', target=self.Thread0).start()
        threading.Thread(name='Thread1', target=self.Thread1).start()
        threading.Thread(name='Thread2', target=self.Thread2).start()

    def Thread0(self):
        while True:
            self.waiter.WaitUntil(0)
            # Do some work
            time.sleep(np.random.rand()*2)
            self.waiter.Set(1)

    def Thread1(self):
        while True:
            self.waiter.WaitUntil(1)
            # Do some work
            time.sleep(np.random.rand())
            self.waiter.Set(2)

    def Thread2(self):
        while True:
            self.waiter.WaitUntil(2)
            # Do some work
            time.sleep(np.random.rand()/10)
            self.waiter.Set(0)

Waiter for multiprocessing:

import multiprocessing as mp
import ctypes

class WaiterMP():
    def __init__(self, init_value, stop_value=-1):
        self.var = mp.Value(ctypes.c_int, init_value)
        self.stop_value = stop_value
        self.event = mp.Event()

    def Terminate(self):
        self.Set(self.stop_value)

    def Restart(self):
        self.var.value = self.init_value

    def WaitUntil(self, v):
        while True:
            if self.var.value == v or self.var.value == self.stop_value:
                return
            # Wait 1 sec and check aiagn (in case event was missed)
            self.event.wait(1)

    def Set(self, v):
        exit = self.var.value == self.stop_value
        if not exit: # Do not set var if threads are exiting
            self.var.value = v
        self.event.set() # In case someone is waiting
        self.event.clear()

Please comment if this is still not the best solution.

Solution 4:

You've basically answered your own question: no.

Since you're dealing with external libraries in boost.python, which may change objects at their leisure, you need to either have those routines call an event handler refresh, or work with a condition.