[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Move scheduler logic from ServerMain.py into its own mo...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria:/tmp/cvs-serv17980/lib/mixminion

Added Files:
	ScheduleUtils.py 
Log Message:
Move scheduler logic from ServerMain.py into its own module.

--- NEW FILE: ScheduleUtils.py ---
# Copyright 2002-2005 Nick Mathewson.  See LICENSE for licensing information.
# Id: ClientMain.py,v 1.89 2003/06/05 18:41:40 nickm Exp $

"""mixminion.ScheduleUtils

   Simple implementation of a block-until-it's-time-to-do-something scheduler.
   """

import threading

__all__ = [ 'ScheduledEvent', 'OneTimeEvent', 'RecurringEvent',
            'RecurringComplexEvent', 'RecurringBackgroundEvent',
            'RecurringComplexBackgroundEvent', 'Scheduler' ]

class ScheduledEvent:
    """Abstract base class for a scheduleable event."""
    def getNextTime(self):
        """Return the next time when this event should be called.  Return
           -1 for 'never' and 'None' for 'currently unknown'.
        """
        raise NotImplementedError("getNextTime")
    def __call__(self):
        """Invoke this event."""
        raise NotImplementedError("__call__")

class OneTimeEvent:
    """An event that will be called exactly once."""
    def __init__(self, when, func):
        """Create an event to call func() at the time 'when'."""
        self.when = when
        self.func = func
    def getNextTime(self):
        return self.when
    def __call__(self):
        self.func()
        self.when = -1

class RecurringEvent:
    """An event that will be called at regular intervals."""
    def __init__(self, when, func, repeat):
        """Create an event to call func() at the time 'when', and every
           'repeat' seconds thereafter."""
        self.when = when
        self.func = func
        self.repeat = repeat
    def getNextTime(self):
        return self.when
    def __call__(self):
        try:
            self.func()
        finally:
            self.when += self.repeat

class RecurringComplexEvent(RecurringEvent):
    """An event that will be called at irregular intervals."""
    def __init__(self, when, func):
        """Create an event to invoke func() at time 'when'.  func() must
           return -1 for 'do not call again', or a time when it should next
           be called."""
        RecurringEvent.__init__(self, when, func, None)
    def __call__(self):
        self.when = self.func()

class RecurringBackgroundEvent:
    """An event that will be called at regular intervals, and scheduled
       as a background job.  Does not reschedule the event while it is
       already in progress."""
    def __init__(self, when, scheduleJob, func, repeat):
        """Create an event to invoke 'func' at time 'when' and every
           'repeat' seconds thereafter.   The function 'scheduleJob' will
           be invoked with a single callable object in order to run that
           callable in the background.
        """
        self.when = when
        self.scheduleJob = scheduleJob
        self.func = func
        self.repeat = repeat
        self.running = 0
        self.lock = threading.Lock()
    def getNextTime(self):
        self.lock.acquire()
        try:
            if self.running:
                return None
            else:
                return self.when
        finally:
            self.lock.release()
    def __call__(self):
        self.lock.acquire()
        try:
            if self.running:
                return
            self.running = 1
        finally:
            self.lock.release()

        self.scheduleJob(self._background)
    def _background(self):
        """Helper function: this one is actually invoked by the background
           thread."""
        self.func()
        self.lock.acquire()
        try:
            now = time.time()
            while self.when < now:
                self.when += self.repeat
            self.running = 0
        finally:
            self.lock.release()

class RecurringComplexBackgroundEvent(RecurringBackgroundEvent):
    """An event to run a job at irregular intervals in the background."""
    def __init__(self, when, scheduleJob, func):
        """Create an event to invoke 'func' at time 'when'.  func() must
           return -1 for 'do not call again', or a time when it should next
           be called.

           The function 'scheduleJob' will be invoked with a single
           callable object in order to run that callable in the
           background.
        """
        RecurringBackgroundEvent.__init__(self, when, scheduleJob, func, None)
    def _background(self):
        next = self.func()
        self.lock.acquire()
        try:
            self.when = next
            self.running = 0
        finally:
            self.lock.release()

class Scheduler:
    """Base class: used to run a bunch of events periodically."""
    ##Fields:
    # scheduledEvents: a list of ScheduledEvent objects.
    # schedLock: a threading.RLock object to protect the list scheduledEvents
    #   (but not the events themselves).
    #XXXX008 needs more tests
    def __init__(self):
        """Create a new scheduler."""
        self.scheduledEvents = []
        self.schedLock = threading.RLock()

    def firstEventTime(self):
        """Return the time at which an event will first occur."""
        self.schedLock.acquire()
        try:
            if not self.scheduledEvents:
                return -1
            first = 0
            for e in self.scheduledEvents:
                t = e.getNextTime()
                if t in (-1,None): continue
                if not first or t < first:
                    first = t
            return first
        finally:
            self.schedLock.release()

    def scheduleEvent(self, event):
        """Add a ScheduledEvent to this scheduler"""
        when = event.getNextTime()
        if when == -1:
            return
        self.schedLock.acquire()
        try:
            self.scheduledEvents.append(event)
        finally:
            self.schedLock.acquire()

    #XXXX008 -- these are only used for testing.
    def scheduleOnce(self, when, name, cb):
        self.scheduleEvent(OneTimeEvent(when,cb))

    def scheduleRecurring(self, first, interval, name, cb):
        self.scheduleEvent(RecurringEvent(first, cb, interval))

    def scheduleRecurringComplex(self, first, name, cb):
        self.scheduleEvent(RecurringComplexEvent(first, cb))

    def processEvents(self, now=None):
        """Run all events that need to get called at the time 'now'."""
        if now is None:
            now = time.time()
        self.schedLock.acquire()
        try:
            events = [(e.getNextTime(),e) for e in self.scheduledEvents]
            self.scheduledEvents = [e for t,e in events if t != -1]
            runnable = [(t,e) for t,e in events
                        if t not in (-1,None) and t <= now]
        finally:
            self.schedLock.release()
        runnable.sort()
        for _,e in runnable:
            e()