[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Add a recursive read-write lock implementation; move al...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv23353/lib/mixminion

Modified Files:
	Common.py 
Added Files:
	ThreadUtils.py 
Log Message:
Add a recursive read-write lock implementation; move all threading helpers into separate module

--- NEW FILE: ThreadUtils.py ---
# Copyright 2002-2004 Nick Mathewson.  See LICENSE for licensing information.
# $Id: ThreadUtils.py,v 1.1 2004/01/08 22:33:31 nickm Exp $

"""mixminion.ThreadUtils

   Helper code for threading-related operations, including queues and 
   RW-locks.
   """

__all__ = [ 'MessageQueue', 'QueueEmpty', 'ClearableQueue', 'TimeoutQueue',
            'RWLock' ]

import threading
import time

import thread
_get_ident = thread.get_ident
del thread

#----------------------------------------------------------------------
# Queues

# Imported here so we can get it in mixminion.server without being shadowed
# by the old Queue.py file.
from Queue import Queue, Empty
MessageQueue = Queue
QueueEmpty = Empty
del Queue
del Empty

class ClearableQueue(MessageQueue):
    """Extended version of python's Queue class that supports removing
       all the items from the queue.

       NOTE(review): this relies on the 'mutex', 'esema' and 'fsema'
       attributes and the '_full'/'_empty' hooks of the base Queue class;
       confirm that the Queue implementation in use still provides them.
    """
    def clear(self):
        """Remove all the items from this queue.

           Returns None.  If the queue is already empty, this is a no-op.
        """
        # If the queue is empty, return.  (A non-blocking acquire of esema
        # only succeeds when there is something in the queue.)  On success
        # we deliberately keep esema held: the queue is about to be empty.
        if not self.esema.acquire(0):
            return
        self.mutex.acquire()
        try:
            was_full = self._full()
            self._clear()
            assert self._empty()
            # If the queue used to be full, it isn't anymore.
            if was_full:
                self.fsema.release()
        finally:
            # Always give the mutex back, even if _clear() or the sanity
            # check above fails; otherwise every later put()/get() on this
            # queue would block forever.
            self.mutex.release()

    def _clear(self):
        """Backend for clear: discard every item in the underlying
           container.  Called with self.mutex held."""
        del self.queue[:]

# Probe the standard queue class once, at import time, to learn whether its
# get() method understands the 'timeout' keyword.  The item is put first so
# that the get() returns immediately when the keyword is supported; when it
# is not, the call fails with TypeError before any waiting happens.
try:
    _probe = MessageQueue()
    _probe.put(3)
    _probe.get(timeout=10)
except TypeError:
    BUILTIN_QUEUE_HAS_TIMEOUT = 0
else:
    BUILTIN_QUEUE_HAS_TIMEOUT = 1
# The probe queue is only scaffolding; don't leave it in the namespace.
del _probe

if not BUILTIN_QUEUE_HAS_TIMEOUT:
    class TimeoutQueue(ClearableQueue):
        """Backport for Python 2.2 and earlier: gives 'get' the 'timeout'
           argument that Queue.Queue lacks there.  When 'block' is true
           and 'timeout' is given, wait at most 'timeout' seconds for an
           item, then raise QueueEmpty.

           Python 2.3 and later provide this interface natively.
        """
        def get(self, block=1, timeout=None):
            """Remove and return an item from the queue.

               Without a timeout, or when 'block' is false, this is just
               the base class's get().  Otherwise poll the queue, sleeping
               between attempts, until an item shows up or 'timeout'
               seconds have elapsed; in the latter case QueueEmpty is
               raised.
            """
            if not block or timeout is None:
                return MessageQueue.get(self, block)

            # The polling strategy below follows 'Condition' in the Python
            # threading module: start with a very short nap and double it
            # on every miss, capped at 0.2 seconds and at the time left.
            now = time.time
            nap = time.sleep
            give_up_at = timeout+now()
            pause = .0005
            while 1:
                try:
                    return MessageQueue.get(self,0)
                except QueueEmpty:
                    left = give_up_at-now()
                    if left <= 0:
                        # Out of time: let QueueEmpty reach the caller.
                        raise
                pause = min(pause*2,left,0.2)
                nap(pause)

            raise AssertionError # unreached, appease pychecker
else:
    # The builtin get() already accepts 'timeout'; no wrapper needed.
    TimeoutQueue = ClearableQueue

#----------------------------------------------------------------------
# RW locks
#
# Adapted from the mrsw class in the sync.py module in the Python
# distribution's Demo/threads directory, but modified to use
# threading.Condition.

class RWLock:
    """A lock that allows multiple readers at a time, but only one writer.

       Readers bracket their work with read_in()/read_out(); writers use
       write_in()/write_out().  A thread may call read_in() recursively,
       provided that every call is eventually matched by a read_out().
       Once any thread has called write_in(), threads that are not already
       readers block in read_in() until no writer is waiting or writing.

       The identity of the writing thread is not tracked.  A thread that
       holds the write lock must therefore not call write_in() or read_in()
       again before releasing it: such a call would wait on itself.
    """
    # Changes from sync.mrsw: 
    #    *  Use threading.Condition instead of sync.condition.
    #    *  Document everything.
    #    *  Don't hold on to rwOK forever when there's an error.
    #    *  Enable recursive invocation of read_in.  Formerly, if thread A
    #       called read_in, thread B called write_in, and thread A called
    #       read_in again, the code would deadlock: the second read_in
    #       would block until write_in succeeded, and write_in would block
    #       until the first read_in was done.
    #
    #       There's a commented-out alternative implementation that makes
    #       recursive invocation an error.  But that doesn't seem to be needed.
    def __init__(self):
        """Create a new lock with no readers and no writers."""
        # critical-section lock & the data it protects
        self.rwOK = threading.Lock()
        # total number of read acquisitions currently held; a recursive
        # read_in counts once per call.  Waiting readers are not counted.
        self.nr = 0
        self.nw = 0  # number writers either waiting to write or writing
        self.writing = 0  # 1 iff some thread is writing
        # map from each current reader's thread_ident to recursion depth.
        self.readers = {}

        # conditions
        self.readOK  = threading.Condition(self.rwOK)  # OK to unblock readers
        self.writeOK = threading.Condition(self.rwOK)  # OK to unblock writers

    def read_in(self):
        """Acquire the lock for reading. Block while any threads are currently
           writing or waiting to write.

           If the calling thread already holds the lock for reading, the
           call succeeds immediately (even when writers are waiting) and
           just deepens that thread's recursion count."""
        self.rwOK.acquire()
        try:
            ident = _get_ident()
            depth = self.readers.get(ident)
            if depth is not None:
                # Recursive acquisition: never wait here, or we could
                # deadlock against a writer that is waiting for us.
                self.readers[ident] = depth + 1
                self.nr = self.nr + 1
                return
            #if self.readers.has_key(ident):
            #    raise ValueError("RWLock.read_in called recursively.")

            while self.nw:
                self.readOK.wait()
            self.nr = self.nr + 1
            self.readers[ident] = 1
        finally:
            self.rwOK.release()

    def read_out(self):
        """Release the lock for reading.  When no more readers are active,
           activate the writers (if any).

           Raises ValueError if nobody is reading, or if the calling
           thread has no read_in() left to match."""
        self.rwOK.acquire()
        try:
            if self.nr <= 0:
                raise ValueError('.read_out() invoked without an active reader')
            ident = _get_ident()
            depth = self.readers.get(ident)
            if depth is None:
                raise ValueError("read_out called without matching read_in.")
            self.nr = self.nr - 1
            if depth > 1:
                # Still inside an outer read_in: stay registered as a
                # reader, and leave the writers asleep.
                self.readers[ident] = depth - 1
                return
            del self.readers[ident]
            #try:
            #    del self.readers[_get_ident()]
            #except KeyError:
            #    raise ValueError("read_out called without matching read_in.")

            if self.nr == 0:
                self.writeOK.notify()
        finally:
            self.rwOK.release()

    def write_in(self):
        """Acquire the lock for writing. Block while any threads are reading
           or writing.

           Raises ValueError if the calling thread currently holds the
           lock for reading.
        """
        self.rwOK.acquire()
        try:
            # Values in self.readers are always positive depths, so a
            # non-None result means "this thread is a reader".
            if self.readers.get(_get_ident()) is not None:
                raise ValueError("write_in called while acting as reader")
            self.nw = self.nw + 1
            try:
                while self.writing or self.nr:
                    self.writeOK.wait()
            except:
                # The wait was cut short, so we are no longer a waiting
                # writer.  Undo our registration; otherwise nw would stay
                # positive forever and every new reader would block.
                self.nw = self.nw - 1
                if self.nw:
                    # Hand on any wakeup we may have swallowed.
                    self.writeOK.notify()
                else:
                    self.readOK.notifyAll()
                raise
            self.writing = 1
        finally:
            self.rwOK.release()

    def write_out(self):
        """Release the lock for writing.  Wake one waiting writer if there
           is one; otherwise wake all waiting readers.

           Raises ValueError if no thread is currently writing."""
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                      '.write_out() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            if self.nw:
                self.writeOK.notify()
            else:
                self.readOK.notifyAll()
        finally:
            self.rwOK.release()

    def write_to_read(self):
        """Simultaneously release the lock as a writer, and become a reader.

           Other readers are woken only if no writer is still waiting.
           Raises ValueError if no thread is currently writing."""
        self.rwOK.acquire()
        try:
            if not self.writing:
                raise ValueError(
                      '.write_to_read() invoked without an active writer')
            self.writing = 0
            self.nw = self.nw - 1
            self.nr = self.nr + 1
            self.readers[_get_ident()] = 1
            if not self.nw:
                self.readOK.notifyAll()
        finally:
            self.rwOK.release()

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.127
retrieving revision 1.128
diff -u -d -r1.127 -r1.128
--- Common.py	3 Jan 2004 07:35:23 -0000	1.127
+++ Common.py	8 Jan 2004 22:33:31 -0000	1.128
@@ -33,13 +33,6 @@
 import threading
 import time
 import traceback
-# Imported here so we can get it in mixminion.server without being shadowed
-# by the old Queue.py file.
-from Queue import Queue, Empty
-MessageQueue = Queue
-QueueEmpty = Empty
-del Queue
-del Empty
 
 O_BINARY = getattr(os, 'O_BINARY', 0)
 
@@ -1569,69 +1562,3 @@
         _warned_no_locks = 1
         LOG.warn("Mixminion couldn't find a file locking implementation.")
         LOG.warn("  (Simultaneous accesses may lead to data corruption.")
-
-#----------------------------------------------------------------------
-# Threading operations
-
-class ClearableQueue(MessageQueue):
-    """Extended version of python's Queue class that supports removing
-       all the items from the queue."""
-    def clear(self):
-        """Remove all the items from this queue."""
-        # If the queue is empty, return.
-        if not self.esema.acquire(0):
-            return
-        self.mutex.acquire()
-        was_full = self._full()
-        self._clear()
-        assert self._empty()
-        # If the queue used to be full, it isn't anymore.
-        if was_full:
-            self.fsema.release()
-        self.mutex.release()
-
-    def _clear(self):
-        """Backend for _clear"""
-        del self.queue[:]
-
-try:
-    q = MessageQueue()
-    q.put(3)
-    q.get(timeout=10)
-    BUILTIN_QUEUE_HAS_TIMEOUT = 1
-except TypeError:
-    BUILTIN_QUEUE_HAS_TIMEOUT = 0
-del q
-
-if BUILTIN_QUEUE_HAS_TIMEOUT:
-    TimeoutQueue = ClearableQueue
-else:
-    class TimeoutQueue(ClearableQueue):
-        """Helper class for Python 2.2. and earlier: extends the 'get'
-           functionality of Queue.Queue to support a 'timeout' argument.
-           If 'block' is true and timeout is provided, wait for no more
-           than 'timeout' seconds before raising QueueEmpty.
-
-           In Python 2.3 and later, this interface is standard.
-        """
-        def get(self, block=1, timeout=None):
-            if timeout is None or not block:
-                return MessageQueue.get(self, block)
-
-            # This logic is adapted from 'Condition' in the Python
-            # threading module.
-            _time = time.time
-            _sleep = time.sleep
-            deadline = timeout+_time()
-            delay = .0005
-            while 1:
-                try:
-                    return MessageQueue.get(self,0)
-                except QueueEmpty:
-                    remaining = deadline-_time()
-                    if remaining <= 0:
-                        raise
-                    delay = min(delay*2,remaining,0.2)
-                    _sleep(delay)
-
-            raise AssertionError # unreached, appease pychecker