Python: Is there any way to let one process hold a write lock while the others just read in parallel?
I found these Python versions of a reader-writer lock:
A reader-writer lock for Python
Reader-Writer lock with priority for writers (Python recipe)
One more implementation of the ReadWriteLock. It takes care of the writer starvation issue and supports promotion of a read lock to a write lock, if that is requested during construction. It uses only one Lock and one Condition.
# From O'Reilly Python Cookbook by David Ascher, Alex Martelli
# With changes to cover the starvation situation where a continuous
# stream of readers may starve a writer, Lock Promotion and Context Managers
class ReadWriteLock:
    """A lock that allows many simultaneous "read locks" but only one "write lock".

    New readers block as soon as any writer is waiting or active, so a
    continuous stream of readers cannot starve a writer.

    If ``withPromotion`` is true, a thread that already holds a read lock may
    call ``acquire_write()`` to promote it. The promotion goes through once
    every thread still holding a read lock is itself requesting the write
    lock.

    The write lock is represented by keeping the underlying re-entrant lock
    acquired between ``acquire_write()`` and ``release_write()``.
    """

    def __init__(self, withPromotion=False):
        """Create an unlocked lock; ``withPromotion`` enables read-to-write promotion."""
        self._read_ready = threading.Condition(threading.RLock())
        self._readers = 0
        self._writers = 0
        self._promote = withPromotion
        self._readerList = []  # List of Reader thread IDs
        self._writerList = []  # List of Writer thread IDs

    def acquire_read(self):
        """Acquire a read lock.

        Blocks while any thread holds or is waiting for the write lock.
        """
        logging.debug("RWL : acquire_read()")
        with self._read_ready:
            while self._writers > 0:
                self._read_ready.wait()
            # Register the reader only after the wait succeeded, so an
            # interrupted wait cannot leave a stale thread id behind.
            self._readers += 1
            self._readerList.append(threading.get_ident())

    def release_read(self):
        """Release a read lock held by the calling thread.

        Raises ValueError if the calling thread holds no read lock.
        """
        logging.debug("RWL : release_read()")
        with self._read_ready:
            # Remove first: if this thread is not a registered reader the
            # ValueError leaves the reader count untouched.
            self._readerList.remove(threading.get_ident())
            self._readers -= 1
            # Always wake the waiters. A thread waiting to promote must
            # re-check its condition whenever a reader leaves, not only when
            # the reader count drops to zero; otherwise it can wait forever.
            self._read_ready.notify_all()

    def acquire_write(self):
        """Acquire the write lock.

        Blocks until there are no read locks held, or, with promotion
        enabled, until the calling thread holds a read lock and every
        remaining reader is also requesting the write lock. On return the
        underlying lock is still held; ``release_write()`` releases it.
        """
        logging.debug("RWL : acquire_write()")
        # A re-entrant lock lets a thread re-acquire the lock
        self._read_ready.acquire()
        me = threading.get_ident()
        self._writers += 1
        self._writerList.append(me)
        try:
            while self._readers > 0:
                # Promote to write lock only if all the readers are trying to
                # promote to writer. If there are other reader threads, wait
                # until they complete reading.
                if (self._promote
                        and me in self._readerList
                        and set(self._readerList).issubset(self._writerList)):
                    break
                self._read_ready.wait()
        except BaseException:
            # The wait was interrupted: undo the registration and drop the
            # lock so readers and other writers are not blocked forever.
            self._writers -= 1
            self._writerList.remove(me)
            self._read_ready.notify_all()
            self._read_ready.release()
            raise

    def release_write(self):
        """Release the write lock acquired by ``acquire_write()``."""
        logging.debug("RWL : release_write()")
        self._writers -= 1
        self._writerList.remove(threading.get_ident())
        self._read_ready.notify_all()
        self._read_ready.release()
#----------------------------------------------------------------------------------------------------------
class ReadRWLock:
    """Context manager that holds a read lock on ``rwLock`` for the ``with`` body."""

    def __init__(self, rwLock):
        self.rwLock = rwLock

    def __enter__(self):
        # Take the read lock, then hand back the manager itself
        # (not mandatory, but convenient for ``with ... as``).
        target = self.rwLock
        target.acquire_read()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        target = self.rwLock
        target.release_read()
        # A false result means an exception raised in the body is re-raised.
        return False
#----------------------------------------------------------------------------------------------------------
class WriteRWLock:
    """Context manager that holds the write lock on ``rwLock`` for the ``with`` body."""

    def __init__(self, rwLock):
        self.rwLock = rwLock

    def __enter__(self):
        # Take the write lock, then hand back the manager itself
        # (not mandatory, but convenient for ``with ... as``).
        target = self.rwLock
        target.acquire_write()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        target = self.rwLock
        target.release_write()
        # A false result means an exception raised in the body is re-raised.
        return False
#----------------------------------------------------------------------------------------------------------
Here is another rwlock that I rolled for my own use. Although I haven't used it in heavily loaded production code, it has worked for my use cases, and it was written by mirroring the code of a C++ class that has been used and tested on a Linux server for about a year. It doesn't have an upgrade-from-read-to-write method...
import threading
class RWLock:
    """Non-reentrant write-preferring rwlock.

    Use ``acquire_read()``/``release_read()`` and
    ``acquire_write()``/``release_write()`` directly, or the ``read_access``
    and ``write_access`` attributes with the ``with`` statement. Setting the
    class attribute ``DEBUG`` to a true value before constructing an instance
    enables assertions that catch re-entrant or unbalanced use.
    """

    DEBUG = 0

    class _ReadAccess:
        """``with``-statement support: holds read access for the block."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_read()
            return self.rwlock

        def __exit__(self, exc_type, value, tb):
            self.rwlock.release_read()

    class _WriteAccess:
        """``with``-statement support: holds write access for the block."""

        def __init__(self, rwlock):
            self.rwlock = rwlock

        def __enter__(self):
            self.rwlock.acquire_write()
            return self.rwlock

        def __exit__(self, exc_type, value, tb):
            self.rwlock.release_write()

    def __init__(self):
        # Protects the counters below and backs both condition variables.
        self.lock = threading.Lock()
        # Held by the single writer that currently has write access.
        self.active_writer_lock = threading.Lock()
        # The total number of writers including the active writer and
        # those blocking on active_writer_lock or readers_finished_cond.
        self.writer_count = 0
        # Number of events that are blocking on writers_finished_cond.
        self.waiting_reader_count = 0
        # Number of events currently using the resource.
        self.active_reader_count = 0
        self.readers_finished_cond = threading.Condition(self.lock)
        self.writers_finished_cond = threading.Condition(self.lock)
        # support for the with statement
        self.read_access = self._ReadAccess(self)
        self.write_access = self._WriteAccess(self)
        if self.DEBUG:
            self.active_readers = set()
            self.active_writer = None

    def acquire_read(self):
        """Acquire read access; blocks while any writer is active or waiting."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread has already acquired read access and this lock isn\'t reader-reentrant!'
                assert me != self.active_writer, 'This thread already has write access, release that before acquiring read access!'
                self.active_readers.add(me)
            if self.writer_count:
                self.waiting_reader_count += 1
                # Even if the last writer thread notifies us it can happen
                # that a new incoming writer thread acquires the lock earlier
                # than this reader thread, so writer_count is tested after
                # each wait(). The loop also protects against spurious
                # wakeups that happen with some POSIX libraries.
                while self.writer_count:
                    self.writers_finished_cond.wait()
                self.waiting_reader_count -= 1
            self.active_reader_count += 1

    def release_read(self):
        """Release read access; the last reader wakes any waiting writers."""
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me in self.active_readers, 'Trying to release read access when it hasn\'t been acquired by this thread!'
                self.active_readers.remove(me)
            assert self.active_reader_count > 0
            self.active_reader_count -= 1
            if not self.active_reader_count and self.writer_count:
                self.readers_finished_cond.notify_all()

    def acquire_write(self):
        """Acquire write access.

        Registers the caller as a writer (which stops new readers), waits for
        the active readers to finish, then waits for its turn among writers.
        """
        with self.lock:
            if self.DEBUG:
                me = threading.current_thread()
                assert me not in self.active_readers, 'This thread already has read access - release that before acquiring write access!'
                assert me != self.active_writer, 'This thread already has write access and this lock isn\'t writer-reentrant!'
            self.writer_count += 1
            while self.active_reader_count:
                self.readers_finished_cond.wait()
        # Taken outside self.lock so that a writer waiting for its turn does
        # not hold the state lock that release_write() needs.
        self.active_writer_lock.acquire()
        if self.DEBUG:
            self.active_writer = me

    def release_write(self):
        """Release write access; the last writer wakes any waiting readers."""
        if not self.DEBUG:
            self.active_writer_lock.release()
        with self.lock:
            if self.DEBUG:
                # In debug mode the ownership check must happen before the
                # writer lock is handed to the next writer.
                me = threading.current_thread()
                assert me == self.active_writer, 'Trying to release write access when it hasn\'t been acquired by this thread!'
                self.active_writer = None
                self.active_writer_lock.release()
            assert self.writer_count > 0
            self.writer_count -= 1
            if not self.writer_count and self.waiting_reader_count:
                self.writers_finished_cond.notify_all()

    def get_state(self):
        """Return ``(writer_count, waiting_reader_count, active_reader_count)``."""
        with self.lock:
            return (self.writer_count, self.waiting_reader_count, self.active_reader_count)
if __name__ == '__main__':
    # Timed demo: starts reader and writer threads on a fixed schedule and
    # prints the lock state as each one starts and finishes its work.
    import time
    import sys

    lock = RWLock()
    start_time = time.time()
    print_lock = threading.Lock()  # keeps output lines from interleaving

    def p(msg):
        """Print a timestamped message tagged with the current thread's id."""
        with print_lock:
            # print(...) with one argument works on both Python 2 and 3.
            print('%5.2f [%2s] %-15s' % (time.time()-start_time, threading.current_thread().myid, msg))

    def p_state(msg):
        """Like p(), but also prints the lock's counters."""
        with print_lock:
            print('%5.2f [%2s] %-15s writer_count=%s waiting_reader_count=%s active_reader_count=%s' %
                  ((time.time()-start_time, threading.current_thread().myid, msg) + lock.get_state()))

    def w():
        """Writer thread body: hold write access for the thread's timeout."""
        p('write wait...')
        with lock.write_access:
            p_state('write started.')
            time.sleep(threading.current_thread().mytimeout)
            p_state('write ended.')

    def r():
        """Reader thread body: hold read access for the thread's timeout."""
        p('read wait...')
        with lock.read_access:
            p_state('read started.')
            time.sleep(threading.current_thread().mytimeout)
            p_state('read ended.')

    def start_thread(thread_id, func, timeout):
        """Start a thread running func, tagged with its id and hold duration."""
        thread = threading.Thread(target=func)
        thread.myid = thread_id
        thread.mytimeout = timeout
        thread.start()
        return thread

    TEST_LOCKS = [
        # (id, start_time, duration, r/w)
        # A negative id only prints a separator line at that time.
        # Testing the branches of acquire_read() and release_read()
        (1, 0, 1, r),
        (2, 0.1, 0.5, r),
        (-1, 2, 0, 0),
        (3, 2, 0.5, w),
        (4, 2.1, 0.5, w),
        (5, 2.1, 1, r),
        (6, 2.1, 1, r),
        (7, 2.2, 0.1, w),
        (-1, 5, 0, 0),
        (8, 5, 0.5, r),
        (9, 5.1, 0.5, w),
        (10, 5.1, 0.5, w),
        # Testing the branches of acquire_write() and release_write()
        (-1, 8, 0, 0),
        (11, 8, 1, w),
        (12, 8.1, 0.5, w),
        (-1, 10, 0, 0),
        (13, 10, 0.5, r),
        (14, 10.1, 0.5, w),
        (15, 10.1, 0.5, r),
        (16, 10.2, 0.5, r),
        (17, 10.3, 0.5, w),
    ]

    threading.current_thread().myid = 0

    t = 0
    for thread_id, start, duration, rw in sorted(TEST_LOCKS, key=lambda x: x[1]):
        # Sleep only for the gap since the previously scheduled entry.
        time.sleep(start - t)
        t = start
        if thread_id < 0:
            p('-----------------------------')
        else:
            start_thread(thread_id, rw, duration)
If you use django you might want to use this Read-Write-Lock: django.utils.synch.RWLock