From 1795d161128c54a9946de23565626d8079e102da Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Mon, 26 Apr 2010 18:11:42 +0200 Subject: [PATCH 01/12] use threading.Timer instead of the sched module --- weboob/scheduler.py | 45 +++++++++++++++++++++------------- weboob/tools/application/qt.py | 4 +-- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/weboob/scheduler.py b/weboob/scheduler.py index 6ae54af6..c8791fb6 100644 --- a/weboob/scheduler.py +++ b/weboob/scheduler.py @@ -18,34 +18,45 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -import sched -import time - +from threading import Timer, Event __all__ = ['Scheduler'] - -class Scheduler(object): - def __init__(self): - self.scheduler = sched.scheduler(time.time, time.sleep) - self.running = False - +class IScheduler(object): def schedule(self, interval, function, *args): - return self.scheduler.enter(interval, 1, function, args) + raise NotImplementedError() def repeat(self, interval, function, *args): - return self.scheduler.enter(interval, 1, self._repeated_cb, (interval, function, args)) + raise NotImplementedError() def run(self): - self.running = True - while self.running: - self.scheduler.run() - if not self.scheduler.queue: - self.scheduler.delayfunc(0.001) + raise NotImplementedError() + + def want_stop(self): + raise NotImplementedError() + +class Scheduler(IScheduler): + def __init__(self): + self.stop_event = Event() + self.count = 0 + self.queue = {} + + def schedule(self, interval, function, *args): + self.count += 1 + timer = Timer(interval, function, args) + timer.start() + self.queue[self.count] = timer + return self.count + + def repeat(self, interval, function, *args): + return self.schedule(interval, self._repeated_cb, interval, function, args) + + def run(self): + self.stop_event.wait() return True def want_stop(self): - self.running = False + self.stop_event.set() def _repeated_cb(self, interval, func, args): func(*args) diff --git a/weboob/tools/application/qt.py b/weboob/tools/application/qt.py index 2d4cc93a..f5230eda 100644 --- a/weboob/tools/application/qt.py +++ b/weboob/tools/application/qt.py @@ -23,13 +23,13 @@ from PyQt4.QtCore import QTimer, SIGNAL from PyQt4.QtGui import QMainWindow, QApplication from weboob import Weboob -from weboob.scheduler import Scheduler +from weboob.scheduler import IScheduler from .base import BaseApplication __all__ = ['QtApplication'] -class QtScheduler(Scheduler): +class QtScheduler(IScheduler): def __init__(self, app): self.app = app self.timers = {} From 31433c161090d1c4168524c038300c53a3cd34be Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Mon, 26 Apr 2010 18:12:53 +0200 Subject: [PATCH 02/12] inherit the Scheduler class' threads based event loop --- weboob/tools/application/prompt.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/weboob/tools/application/prompt.py b/weboob/tools/application/prompt.py index db8096a4..8bf3ee8b 100644 --- a/weboob/tools/application/prompt.py +++ b/weboob/tools/application/prompt.py @@ -18,9 +18,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -import sched -import time -import select import sys from weboob import Weboob @@ -34,18 +31,18 @@ __all__ = ['PromptApplication'] class PromptScheduler(Scheduler): def __init__(self, prompt_cb, read_cb): - self.scheduler = sched.scheduler(time.time, self.sleep) + Scheduler.__init__(self) self.read_cb = read_cb self.prompt_cb = prompt_cb - def sleep(self, d): - self.prompt_cb() + def run(self): try: - read, write, excepts = select.select([sys.stdin], [], [], d or None) - if read: + while not self.stop_event.isSet(): + self.prompt_cb() line = sys.stdin.readline() if not line: self.want_stop() + sys.stdout.write('\n') else: self.read_cb(line.strip()) except KeyboardInterrupt: From 680e149a41a66bd65c8a0dd4f5fd226fc6f92fbd Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Mon, 26 Apr 2010 20:09:58 +0200 Subject: [PATCH 03/12] command 'help' --- weboob/tools/application/prompt.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/weboob/tools/application/prompt.py b/weboob/tools/application/prompt.py index 8bf3ee8b..85bbace3 100644 --- a/weboob/tools/application/prompt.py +++ b/weboob/tools/application/prompt.py @@ -52,6 +52,13 @@ class PromptApplication(ConsoleApplication): def create_weboob(self): return Weboob(self.APPNAME, scheduler=PromptScheduler(self.prompt, self.read_cb)) + @ConsoleApplication.command("Display this notice") + def command_help(self): + print 'Available commands:' + for name, arguments, doc_string in self._commands: + command = '%s %s' % (name, arguments) + print ' %-30s %s' % (command, doc_string) + def prompt(self): sys.stdout.write('> ') sys.stdout.flush() @@ -61,4 +68,5 @@ class PromptApplication(ConsoleApplication): def read_cb(self, line): line = line.split() - self.process_command(*line) + if line: + self.process_command(*line) From b848347b7cf580fa434f56732c46c690351d434c Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Mon, 26 Apr 2010 20:10:22 +0200 Subject: [PATCH 04/12] factorisation: add a 'services' system --- weboob/frontends/havesex/application.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/weboob/frontends/havesex/application.py b/weboob/frontends/havesex/application.py index 3afc34d7..76681781 100644 --- a/weboob/frontends/havesex/application.py +++ b/weboob/frontends/havesex/application.py @@ -18,6 +18,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +import sys from weboob.tools.application import PromptApplication from weboob.capabilities.dating import ICapDating @@ -59,7 +60,19 @@ class HaveSex(PromptApplication): print profile.get_profile_text() return True - @PromptApplication.command("start profiles walker") - def command_walker(self): + def service(self, action, function): + sys.stdout.write('%s:' % action) for backend in self.weboob.iter_backends(): - backend.start_profiles_walker() + sys.stdout.write(' ' + backend.name) + sys.stdout.flush() + getattr(backend, function)() + sys.stdout.write('.\n') + + @PromptApplication.command("start profiles walker") + def command_walker(self, action): + if action == 'start': + self.service('Starting walker', 'start_profiles_walker') + elif action == 'stop': + self.service('Stopping walker', 'stop_profiles_walker') + else: + print >>sys.stderr, 'Syntax: walker (start|stop)' From c04ec720862cf52348f029584d11a8a47936a3f7 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Mon, 26 Apr 2010 20:11:22 +0200 Subject: [PATCH 05/12] fix viewed profiles storing --- weboob/backends/aum/optim/profiles_walker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weboob/backends/aum/optim/profiles_walker.py b/weboob/backends/aum/optim/profiles_walker.py index 464421a6..19916044 100644 --- a/weboob/backends/aum/optim/profiles_walker.py +++ b/weboob/backends/aum/optim/profiles_walker.py @@ -34,7 +34,7 @@ class ProfilesWalker(object): self.view_cron = sched.schedule(randint(10,40), self.view_profile) def save(self): - self.storage.set('profiles_walker', 'viewed', self.visited_profiles) + self.storage.set('profiles_walker', 'viewed', list(self.visited_profiles)) self.storage.save() def stop(self): From ad4fc17a894c9e65cbe663b5955877688c332b95 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 00:59:13 +0200 Subject: [PATCH 06/12] new BackendsCall class to do asynchronous calls to backends methods --- weboob/bcall.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 weboob/bcall.py diff --git a/weboob/bcall.py b/weboob/bcall.py new file mode 100644 index 00000000..27a2afd0 --- /dev/null +++ b/weboob/bcall.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- + +""" +Copyright(C) 2010 Romain Bignon + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +""" + +from __future__ import with_statement + +from logging import debug +from threading import Thread, Event, RLock, Timer + +__all__ = ['BackendsCall'] + +class Result(object): + def __init__(self, backend, result): + self.backend = backend + self.result = result + + def __iter__(self): + """ + To allow unpack. + + For example: + >>> for backend, result in self.weboob.do(blabla) + """ + yield self.backend + yield self.result + +class BackendsCall(object): + def __init__(self, backends, function, *args, **kwargs): + # Store if a backend is finished + self.backends = {} + for backend in backends: + self.backends[backend.name] = False + # Global mutex on object + self.mutex = RLock() + # Event set when every backends have give their data + self.finish_event = Event() + # Event set when there are new responses + self.response_event = Event() + # Waiting responses + self.responses = [] + # Timers + self.threads = [] + + # Create jobs for each backend + with self.mutex: + for b in backends: + debug('New timer for %s' % b) + self.threads.append(Timer(0, self.caller, (b, function, args, kwargs)).start()) + + def caller(self, b, function, args, kwargs): + debug('Hello from timer %s' % b) + with b: + try: + # Call method on backend + r = getattr(b, function)(*args, **kwargs) + debug('%s: Got answer! %s' % (b, r)) + + if hasattr(r, '__iter__'): + # Loop on iterator + for e in r: + # Lock mutex only in loop in case the iterator is slow + # (for example if backend do some parsing operations) + with self.mutex: + self.responses.append((b,e)) + self.response_event.set() + else: + with self.mutex: + self.responses.append((b,r)) + self.response_event.set() + finally: + with self.mutex: + # This backend is now finished + self.backends[b.name] = True + for finished in self.backends.itervalues(): + if not finished: + return + self.finish_event.set() + self.response_event.set() + + def _callback_thread_run(self, function): + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + function(*responses.pop()) + + def callback_thread(self, function): + """ + Call this method to create a thread which will callback a + specified function everytimes a new result comes. + + When the process is over, the function will be called with + both arguments set to None. + + The function prototype is: + def function(backend, result) + + """ + return Thread(target=self._callback_thread_run, args=(function,)) + + def __iter__(self): + # Don't know how to factorize with _callback_thread_run + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + yield Result(*responses.pop()) From adb77fc58625f63d347f03a4ca99727690b8fedf Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 00:59:31 +0200 Subject: [PATCH 07/12] new methods Weboob.do() and Weboob.do_caps() to do asynchronous calls --- weboob/ouiboube.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/weboob/ouiboube.py b/weboob/ouiboube.py index 473f1b30..66a89454 100644 --- a/weboob/ouiboube.py +++ b/weboob/ouiboube.py @@ -21,6 +21,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. import os from logging import warning +from weboob.bcall import BackendsCall from weboob.modules import ModulesLoader, BackendsConfig from weboob.scheduler import Scheduler @@ -94,6 +95,14 @@ class Weboob(object): if caps is None or backend.has_caps(caps): yield backend + def do(self, function, *args, **kwargs): + backends = [b for b in self.iter_backends()] + return BackendsCall(backends, function, *args, **kwargs) + + def do_caps(self, caps, function, *args, **kwargs): + backends = [b for b in self.iter_backends(caps)] + return BackendsCall(backends, function, *args, **kwargs) + def schedule(self, interval, function, *args): return self.scheduler.schedule(interval, function, *args) From 8b822dcd0917f5206886df3bb7ced423a21ffd1e Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 00:59:59 +0200 Subject: [PATCH 08/12] BaseBackend has now a mutex and can be used in a 'with' statement to lock it --- weboob/backend.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/weboob/backend.py b/weboob/backend.py index 46ff9a0c..84e7e4e3 100644 --- a/weboob/backend.py +++ b/weboob/backend.py @@ -19,6 +19,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import re +from threading import RLock __all__ = ['BackendStorage', 'BaseBackend'] @@ -75,9 +76,19 @@ class BaseBackend(object): class ConfigError(Exception): pass + def __enter__(self): + self.lock.acquire() + + def __exit__(self, t, v, tb): + self.lock.release() + + def __repr__(self): + return u"" % self.name + def __init__(self, weboob, name, config, storage): self.weboob = weboob self.name = name + self.lock = RLock() self.config = {} for name, field in self.CONFIG.iteritems(): value = config.get(name, field.default) From 2650c32b249b0712911c5e084d9645c344e08998 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 01:14:22 +0200 Subject: [PATCH 09/12] lock on Browser (snif) --- weboob/tools/browser.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/weboob/tools/browser.py b/weboob/tools/browser.py index 1e59b3ef..a09abead 100644 --- a/weboob/tools/browser.py +++ b/weboob/tools/browser.py @@ -25,6 +25,7 @@ import re import time from logging import warning, error, debug from copy import copy +from threading import RLock from weboob.tools.parsers import get_parser @@ -160,12 +161,19 @@ class BaseBrowser(mechanize.Browser): self.last_update = 0.0 self.username = username self.password = password + self.lock = RLock() if self.password: try: self.home() except BrowserUnavailable: pass + def __enter__(self): + self.lock.acquire() + + def __exit__(self, t, v, tb): + self.lock.release() + def pageaccess(func): def inner(self, *args, **kwargs): if not self.page or self.password and not self.page.is_logged(): From ad240e7aed750b977ec65c889370a659401b8bb1 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 01:14:34 +0200 Subject: [PATCH 10/12] use lock on Browser --- weboob/backends/aum/backend.py | 49 +++++++++++--------- weboob/backends/aum/optim/profiles_walker.py | 8 +++- 2 files changed, 33 insertions(+), 24 deletions(-) diff --git a/weboob/backends/aum/backend.py b/weboob/backends/aum/backend.py index 922b4a61..3bf1a8bf 100644 --- a/weboob/backends/aum/backend.py +++ b/weboob/backends/aum/backend.py @@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + from weboob.backend import BaseBackend from weboob.capabilities.messages import ICapMessages, ICapMessagesReply from weboob.capabilities.dating import ICapDating @@ -67,38 +69,41 @@ class AuMBackend(BaseBackend, ICapMessages, ICapMessagesReply, ICapDating): yield message def _iter_messages(self, thread, only_new): - try: - if not only_new or self.browser.nb_new_mails(): - my_name = self.browser.get_my_name() - contacts = self.browser.get_contact_list() - contacts.reverse() + with self.browser: + try: + if not only_new or self.browser.nb_new_mails(): + my_name = self.browser.get_my_name() + contacts = self.browser.get_contact_list() + contacts.reverse() - for contact in contacts: - if only_new and not contact.is_new() or thread and int(thread) != contact.get_id(): - continue + for contact in contacts: + if only_new and not contact.is_new() or thread and int(thread) != contact.get_id(): + continue - mails = self.browser.get_thread_mails(contact.get_id()) - profile = None - for i in xrange(len(mails)): - mail = mails[i] - if only_new and mail.get_from() == my_name: - break + mails = self.browser.get_thread_mails(contact.get_id()) + profile = None + for i in xrange(len(mails)): + mail = mails[i] + if only_new and mail.get_from() == my_name: + break - if not profile: - profile = self.browser.get_profile(contact.get_id()) - mail.signature += u'\n%s' % profile.get_profile_text() - yield mail - except BrowserUnavailable: - pass + if not profile: + profile = self.browser.get_profile(contact.get_id()) + mail.signature += u'\n%s' % profile.get_profile_text() + yield mail + except BrowserUnavailable: + pass def post_reply(self, thread_id, reply_id, title, message): for message in self._iter_messages(thread_id, True): self.queue_messages.append(message) - return self.browser.post(thread_id, message) + with self.browser: + return self.browser.post(thread_id, message) def get_profile(self, _id): try: - return self.browser.get_profile(_id) + with self.browser: + return self.browser.get_profile(_id) except BrowserUnavailable: return None diff --git a/weboob/backends/aum/optim/profiles_walker.py b/weboob/backends/aum/optim/profiles_walker.py index 19916044..c01b0a0b 100644 --- a/weboob/backends/aum/optim/profiles_walker.py +++ b/weboob/backends/aum/optim/profiles_walker.py @@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + from logging import debug from random import randint from weboob.tools.browser import BrowserUnavailable @@ -42,7 +44,8 @@ class ProfilesWalker(object): self.event = None def walk(self): - self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles) + with self.browser: + self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles) self.save() def view_profile(self): @@ -53,7 +56,8 @@ class ProfilesWalker(object): return # empty queue try: - profile = self.browser.get_profile(id) + with self.browser: + profile = self.browser.get_profile(id) debug(u'Visited %s (%s)' % (profile.get_name(), id)) # Get score from the aum_score module From 7c502ca236a13f351f8abe6786458bc943f00ad7 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 01:17:14 +0200 Subject: [PATCH 11/12] use Weboob.do insteal of deprecated iter_backends in some frontends --- scripts/monboob | 5 ++--- weboob/frontends/videoob/application.py | 17 ++++++----------- weboob/tools/application/base.py | 1 + 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/scripts/monboob b/scripts/monboob index 50d142bc..da4ef50c 100755 --- a/scripts/monboob +++ b/scripts/monboob @@ -106,9 +106,8 @@ class Monboob(ConsoleApplication): self.weboob.loop() def process(self): - for backend in self.weboob.iter_backends(): - for message in backend.iter_new_messages(): - self.send_email(backend, message) + for backend, message in self.weboob.do('iter_new_messages'): + self.send_email(backend, message) def send_email(self, backend, mail): domain = self.config.get('domain') diff --git a/weboob/frontends/videoob/application.py b/weboob/frontends/videoob/application.py index a0859061..9da77087 100644 --- a/weboob/frontends/videoob/application.py +++ b/weboob/frontends/videoob/application.py @@ -66,22 +66,17 @@ class Videoob(ConsoleApplication): else: results['BEFORE'] = u'Last videos' results['HEADER'] = ('ID', 'Title', 'Duration') - for backend in self.weboob.iter_backends(): + for backend, video in self.weboob.do('iter_search_results', pattern): + row = (video.id, video.title, '%d:%02d:%02d' % (video.duration/3600, (video.duration%3600/60), video.duration%60)) try: - iterator = backend.iter_search_results(pattern) - except NotImplementedError: - continue - else: - rows = [] - for video in iterator: - rows.append((video.id, video.title, '%d:%02d:%02d' % (video.duration/3600, (video.duration%3600/60), video.duration%60))) - results[backend.name] = rows + results[backend.name].append(row) + except KeyError: + results[backend.name] = [row] return results @ConsoleApplication.command('Get video file URL from page URL') def command_file_url(self, url): - for backend in self.weboob.iter_backends(): - video = backend.get_video(url) + for backend, video in self.weboob.do('get_video', url): if video: print video.url break diff --git a/weboob/tools/application/base.py b/weboob/tools/application/base.py index 843138bc..ca89d58e 100644 --- a/weboob/tools/application/base.py +++ b/weboob/tools/application/base.py @@ -162,3 +162,4 @@ class BaseApplication(object): print 'Program killed by SIGINT' except ConfigError, e: print 'Configuration error: %s' % e + sys.exit(1) From ff6c3f79aea4a2316e07ed2b9656e9d8740c83f0 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Thu, 29 Apr 2010 11:08:08 +0200 Subject: [PATCH 12/12] threads: errors management --- weboob/__init__.py | 2 +- weboob/bcall.py | 72 ++++++++++++++++++++++++++++++++-------------- weboob/ouiboube.py | 9 ++++-- 3 files changed, 57 insertions(+), 26 deletions(-) diff --git a/weboob/__init__.py b/weboob/__init__.py index 19ab13f0..4090538e 100644 --- a/weboob/__init__.py +++ b/weboob/__init__.py @@ -18,4 +18,4 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from .ouiboube import Weboob +from .ouiboube import Weboob, CallErrors diff --git a/weboob/bcall.py b/weboob/bcall.py index 27a2afd0..476cc39b 100644 --- a/weboob/bcall.py +++ b/weboob/bcall.py @@ -21,9 +21,15 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. from __future__ import with_statement from logging import debug +from copy import copy from threading import Thread, Event, RLock, Timer -__all__ = ['BackendsCall'] +__all__ = ['BackendsCall', 'CallErrors'] + +class CallErrors(Exception): + def __init__(self, errors): + Exception.__init__(self, "Several errors have been raised:\n%s" % ('\n'.join(['%s: %s' % (b, e) for b, e in errors]))) + self.errors = copy(errors) class Result(object): def __init__(self, backend, result): @@ -54,7 +60,9 @@ class BackendsCall(object): self.response_event = Event() # Waiting responses self.responses = [] - # Timers + # Errors + self.errors = [] + # Threads self.threads = [] # Create jobs for each backend @@ -68,21 +76,30 @@ class BackendsCall(object): with b: try: # Call method on backend - r = getattr(b, function)(*args, **kwargs) - debug('%s: Got answer! %s' % (b, r)) - - if hasattr(r, '__iter__'): - # Loop on iterator - for e in r: - # Lock mutex only in loop in case the iterator is slow - # (for example if backend do some parsing operations) - with self.mutex: - self.responses.append((b,e)) - self.response_event.set() - else: + try: + if callable(function): + r = function(b, *args, **kwargs) + else: + r = getattr(b, function)(*args, **kwargs) + except Exception, e: with self.mutex: - self.responses.append((b,r)) - self.response_event.set() + # TODO save backtrace and/or print it here (with debug) + self.errors.append((b, e)) + else: + debug('%s: Got answer! %s' % (b, r)) + + if hasattr(r, '__iter__'): + # Loop on iterator + for e in r: + # Lock mutex only in loop in case the iterator is slow + # (for example if backend do some parsing operations) + with self.mutex: + self.responses.append((b,e)) + self.response_event.set() + else: + with self.mutex: + self.responses.append((b,r)) + self.response_event.set() finally: with self.mutex: # This backend is now finished @@ -93,7 +110,7 @@ class BackendsCall(object): self.finish_event.set() self.response_event.set() - def _callback_thread_run(self, function): + def _callback_thread_run(self, callback, errback): responses = [] while not self.finish_event.isSet(): self.response_event.wait() @@ -106,9 +123,14 @@ class BackendsCall(object): # Consume responses while responses: - function(*responses.pop()) + callback(*responses.pop()) - def callback_thread(self, function): + if errback: + with self.mutex: + while self.errors: + errback(*self.errors.pop()) + + def callback_thread(self, callback, errback=None): """ Call this method to create a thread which will callback a specified function everytimes a new result comes. @@ -116,11 +138,12 @@ class BackendsCall(object): When the process is over, the function will be called with both arguments set to None. - The function prototype is: - def function(backend, result) + The functions prototypes: + def callback(backend, result) + def errback(backend, error) """ - return Thread(target=self._callback_thread_run, args=(function,)) + return Thread(target=self._callback_thread_run, args=(callback, errback)) def __iter__(self): # Don't know how to factorize with _callback_thread_run @@ -137,3 +160,8 @@ class BackendsCall(object): # Consume responses while responses: yield Result(*responses.pop()) + + # Raise errors + with self.mutex: + if self.errors: + raise CallErrors(self.errors) diff --git a/weboob/ouiboube.py b/weboob/ouiboube.py index 66a89454..e6530915 100644 --- a/weboob/ouiboube.py +++ b/weboob/ouiboube.py @@ -18,15 +18,17 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + import os from logging import warning -from weboob.bcall import BackendsCall +from weboob.bcall import BackendsCall, CallErrors from weboob.modules import ModulesLoader, BackendsConfig from weboob.scheduler import Scheduler -__all__ = ['Weboob'] +__all__ = ['Weboob', 'CallErrors'] class Weboob(object): @@ -93,7 +95,8 @@ class Weboob(object): def iter_backends(self, caps=None): for name, backend in self.backends.iteritems(): if caps is None or backend.has_caps(caps): - yield backend + with backend: + yield backend def do(self, function, *args, **kwargs): backends = [b for b in self.iter_backends()]