threads: errors management

This commit is contained in:
Romain Bignon 2010-04-29 11:08:08 +02:00
commit ff6c3f79ae
3 changed files with 56 additions and 25 deletions

View file

@ -18,4 +18,4 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
""" """
from .ouiboube import Weboob from .ouiboube import Weboob, CallErrors

View file

@ -21,9 +21,15 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from __future__ import with_statement from __future__ import with_statement
from logging import debug from logging import debug
from copy import copy
from threading import Thread, Event, RLock, Timer from threading import Thread, Event, RLock, Timer
__all__ = ['BackendsCall'] __all__ = ['BackendsCall', 'CallErrors']
class CallErrors(Exception):
def __init__(self, errors):
Exception.__init__(self, "Several errors have been raised:\n%s" % ('\n'.join(['%s: %s' % (b, e) for b, e in errors])))
self.errors = copy(errors)
class Result(object): class Result(object):
def __init__(self, backend, result): def __init__(self, backend, result):
@ -54,7 +60,9 @@ class BackendsCall(object):
self.response_event = Event() self.response_event = Event()
# Waiting responses # Waiting responses
self.responses = [] self.responses = []
# Timers # Errors
self.errors = []
# Threads
self.threads = [] self.threads = []
# Create jobs for each backend # Create jobs for each backend
@ -68,21 +76,30 @@ class BackendsCall(object):
with b: with b:
try: try:
# Call method on backend # Call method on backend
r = getattr(b, function)(*args, **kwargs) try:
debug('%s: Got answer! %s' % (b, r)) if callable(function):
r = function(b, *args, **kwargs)
if hasattr(r, '__iter__'): else:
# Loop on iterator r = getattr(b, function)(*args, **kwargs)
for e in r: except Exception, e:
# Lock mutex only in loop in case the iterator is slow
# (for example if backend do some parsing operations)
with self.mutex:
self.responses.append((b,e))
self.response_event.set()
else:
with self.mutex: with self.mutex:
self.responses.append((b,r)) # TODO save backtrace and/or print it here (with debug)
self.response_event.set() self.errors.append((b, e))
else:
debug('%s: Got answer! %s' % (b, r))
if hasattr(r, '__iter__'):
# Loop on iterator
for e in r:
# Lock mutex only in loop in case the iterator is slow
# (for example if backend do some parsing operations)
with self.mutex:
self.responses.append((b,e))
self.response_event.set()
else:
with self.mutex:
self.responses.append((b,r))
self.response_event.set()
finally: finally:
with self.mutex: with self.mutex:
# This backend is now finished # This backend is now finished
@ -93,7 +110,7 @@ class BackendsCall(object):
self.finish_event.set() self.finish_event.set()
self.response_event.set() self.response_event.set()
def _callback_thread_run(self, function): def _callback_thread_run(self, callback, errback):
responses = [] responses = []
while not self.finish_event.isSet(): while not self.finish_event.isSet():
self.response_event.wait() self.response_event.wait()
@ -106,9 +123,14 @@ class BackendsCall(object):
# Consume responses # Consume responses
while responses: while responses:
function(*responses.pop()) callback(*responses.pop())
def callback_thread(self, function): if errback:
with self.mutex:
while self.errors:
errback(*self.errors.pop())
def callback_thread(self, callback, errback=None):
""" """
Call this method to create a thread which will callback a Call this method to create a thread which will callback a
specified function everytimes a new result comes. specified function everytimes a new result comes.
@ -116,11 +138,12 @@ class BackendsCall(object):
When the process is over, the function will be called with When the process is over, the function will be called with
both arguments set to None. both arguments set to None.
The function prototype is: The functions prototypes:
def function(backend, result) def callback(backend, result)
def errback(backend, error)
""" """
return Thread(target=self._callback_thread_run, args=(function,)) return Thread(target=self._callback_thread_run, args=(callback, errback))
def __iter__(self): def __iter__(self):
# Don't know how to factorize with _callback_thread_run # Don't know how to factorize with _callback_thread_run
@ -137,3 +160,8 @@ class BackendsCall(object):
# Consume responses # Consume responses
while responses: while responses:
yield Result(*responses.pop()) yield Result(*responses.pop())
# Raise errors
with self.mutex:
if self.errors:
raise CallErrors(self.errors)

View file

@ -18,15 +18,17 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
""" """
from __future__ import with_statement
import os import os
from logging import warning from logging import warning
from weboob.bcall import BackendsCall from weboob.bcall import BackendsCall, CallErrors
from weboob.modules import ModulesLoader, BackendsConfig from weboob.modules import ModulesLoader, BackendsConfig
from weboob.scheduler import Scheduler from weboob.scheduler import Scheduler
__all__ = ['Weboob'] __all__ = ['Weboob', 'CallErrors']
class Weboob(object): class Weboob(object):
@ -93,7 +95,8 @@ class Weboob(object):
def iter_backends(self, caps=None): def iter_backends(self, caps=None):
for name, backend in self.backends.iteritems(): for name, backend in self.backends.iteritems():
if caps is None or backend.has_caps(caps): if caps is None or backend.has_caps(caps):
yield backend with backend:
yield backend
def do(self, function, *args, **kwargs): def do(self, function, *args, **kwargs):
backends = [b for b in self.iter_backends()] backends = [b for b in self.iter_backends()]