use threading.Timer instead of the sched module

This commit is contained in:
Romain Bignon 2010-04-26 18:11:42 +02:00
commit 1795d16112
2 changed files with 27 additions and 16 deletions

View file

@ -18,34 +18,45 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import sched
import time
from threading import Timer, Event
__all__ = ['Scheduler']
class Scheduler(object):
def __init__(self):
self.scheduler = sched.scheduler(time.time, time.sleep)
self.running = False
class IScheduler(object):
def schedule(self, interval, function, *args):
return self.scheduler.enter(interval, 1, function, args)
raise NotImplementedError()
def repeat(self, interval, function, *args):
return self.scheduler.enter(interval, 1, self._repeated_cb, (interval, function, args))
raise NotImplementedError()
def run(self):
self.running = True
while self.running:
self.scheduler.run()
if not self.scheduler.queue:
self.scheduler.delayfunc(0.001)
raise NotImplementedError()
def want_stop(self):
raise NotImplementedError()
class Scheduler(IScheduler):
def __init__(self):
self.stop_event = Event()
self.count = 0
self.queue = {}
def schedule(self, interval, function, *args):
self.count += 1
timer = Timer(interval, function, args)
timer.start()
self.queue[self.count] = timer
return self.count
def repeat(self, interval, function, *args):
return self.schedule(interval, self._repeated_cb, interval, function, args)
def run(self):
self.stop_event.wait()
return True
def want_stop(self):
self.running = False
self.stop_event.set()
def _repeated_cb(self, interval, func, args):
func(*args)

View file

@ -23,13 +23,13 @@ from PyQt4.QtCore import QTimer, SIGNAL
from PyQt4.QtGui import QMainWindow, QApplication
from weboob import Weboob
from weboob.scheduler import Scheduler
from weboob.scheduler import IScheduler
from .base import BaseApplication
__all__ = ['QtApplication']
class QtScheduler(Scheduler):
class QtScheduler(IScheduler):
def __init__(self, app):
self.app = app
self.timers = {}