diff --git a/scripts/monboob b/scripts/monboob index 50d142bc..da4ef50c 100755 --- a/scripts/monboob +++ b/scripts/monboob @@ -106,9 +106,8 @@ class Monboob(ConsoleApplication): self.weboob.loop() def process(self): - for backend in self.weboob.iter_backends(): - for message in backend.iter_new_messages(): - self.send_email(backend, message) + for backend, message in self.weboob.do('iter_new_messages'): + self.send_email(backend, message) def send_email(self, backend, mail): domain = self.config.get('domain') diff --git a/weboob/__init__.py b/weboob/__init__.py index 19ab13f0..4090538e 100644 --- a/weboob/__init__.py +++ b/weboob/__init__.py @@ -18,4 +18,4 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -from .ouiboube import Weboob +from .ouiboube import Weboob, CallErrors diff --git a/weboob/backend.py b/weboob/backend.py index 46ff9a0c..84e7e4e3 100644 --- a/weboob/backend.py +++ b/weboob/backend.py @@ -19,6 +19,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ import re +from threading import RLock __all__ = ['BackendStorage', 'BaseBackend'] @@ -75,9 +76,19 @@ class BaseBackend(object): class ConfigError(Exception): pass + def __enter__(self): + self.lock.acquire() + + def __exit__(self, t, v, tb): + self.lock.release() + + def __repr__(self): + return u"" % self.name + def __init__(self, weboob, name, config, storage): self.weboob = weboob self.name = name + self.lock = RLock() self.config = {} for name, field in self.CONFIG.iteritems(): value = config.get(name, field.default) diff --git a/weboob/backends/aum/backend.py b/weboob/backends/aum/backend.py index 922b4a61..3bf1a8bf 100644 --- a/weboob/backends/aum/backend.py +++ b/weboob/backends/aum/backend.py @@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + from weboob.backend import BaseBackend from weboob.capabilities.messages import ICapMessages, ICapMessagesReply from weboob.capabilities.dating import ICapDating @@ -67,38 +69,41 @@ class AuMBackend(BaseBackend, ICapMessages, ICapMessagesReply, ICapDating): yield message def _iter_messages(self, thread, only_new): - try: - if not only_new or self.browser.nb_new_mails(): - my_name = self.browser.get_my_name() - contacts = self.browser.get_contact_list() - contacts.reverse() + with self.browser: + try: + if not only_new or self.browser.nb_new_mails(): + my_name = self.browser.get_my_name() + contacts = self.browser.get_contact_list() + contacts.reverse() - for contact in contacts: - if only_new and not contact.is_new() or thread and int(thread) != contact.get_id(): - continue + for contact in contacts: + if only_new and not contact.is_new() or thread and int(thread) != contact.get_id(): + continue - mails = self.browser.get_thread_mails(contact.get_id()) - profile = None - for i in xrange(len(mails)): - mail = mails[i] - if only_new and mail.get_from() == my_name: - break + mails = self.browser.get_thread_mails(contact.get_id()) + profile = None + for i in xrange(len(mails)): + mail = mails[i] + if only_new and mail.get_from() == my_name: + break - if not profile: - profile = self.browser.get_profile(contact.get_id()) - mail.signature += u'\n%s' % profile.get_profile_text() - yield mail - except BrowserUnavailable: - pass + if not profile: + profile = self.browser.get_profile(contact.get_id()) + mail.signature += u'\n%s' % profile.get_profile_text() + yield mail + except BrowserUnavailable: + pass def post_reply(self, thread_id, reply_id, title, message): for message in self._iter_messages(thread_id, True): self.queue_messages.append(message) - return self.browser.post(thread_id, message) + with self.browser: + return self.browser.post(thread_id, message) def get_profile(self, _id): try: - return self.browser.get_profile(_id) + with self.browser: + return self.browser.get_profile(_id) except BrowserUnavailable: return None diff --git a/weboob/backends/aum/optim/profiles_walker.py b/weboob/backends/aum/optim/profiles_walker.py index 464421a6..c01b0a0b 100644 --- a/weboob/backends/aum/optim/profiles_walker.py +++ b/weboob/backends/aum/optim/profiles_walker.py @@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + from logging import debug from random import randint from weboob.tools.browser import BrowserUnavailable @@ -34,7 +36,7 @@ class ProfilesWalker(object): self.view_cron = sched.schedule(randint(10,40), self.view_profile) def save(self): - self.storage.set('profiles_walker', 'viewed', self.visited_profiles) + self.storage.set('profiles_walker', 'viewed', list(self.visited_profiles)) self.storage.save() def stop(self): @@ -42,7 +44,8 @@ class ProfilesWalker(object): self.event = None def walk(self): - self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles) + with self.browser: + self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles) self.save() def view_profile(self): @@ -53,7 +56,8 @@ class ProfilesWalker(object): return # empty queue try: - profile = self.browser.get_profile(id) + with self.browser: + profile = self.browser.get_profile(id) debug(u'Visited %s (%s)' % (profile.get_name(), id)) # Get score from the aum_score module diff --git a/weboob/bcall.py b/weboob/bcall.py new file mode 100644 index 00000000..476cc39b --- /dev/null +++ b/weboob/bcall.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- + +""" +Copyright(C) 2010 Romain Bignon + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +""" + +from __future__ import with_statement + +from logging import debug +from copy import copy +from threading import Thread, Event, RLock, Timer + +__all__ = ['BackendsCall', 'CallErrors'] + +class CallErrors(Exception): + def __init__(self, errors): + Exception.__init__(self, "Several errors have been raised:\n%s" % ('\n'.join(['%s: %s' % (b, e) for b, e in errors]))) + self.errors = copy(errors) + +class Result(object): + def __init__(self, backend, result): + self.backend = backend + self.result = result + + def __iter__(self): + """ + To allow unpack. + + For example: + >>> for backend, result in self.weboob.do(blabla) + """ + yield self.backend + yield self.result + +class BackendsCall(object): + def __init__(self, backends, function, *args, **kwargs): + # Store if a backend is finished + self.backends = {} + for backend in backends: + self.backends[backend.name] = False + # Global mutex on object + self.mutex = RLock() + # Event set when every backends have give their data + self.finish_event = Event() + # Event set when there are new responses + self.response_event = Event() + # Waiting responses + self.responses = [] + # Errors + self.errors = [] + # Threads + self.threads = [] + + # Create jobs for each backend + with self.mutex: + for b in backends: + debug('New timer for %s' % b) + self.threads.append(Timer(0, self.caller, (b, function, args, kwargs)).start()) + + def caller(self, b, function, args, kwargs): + debug('Hello from timer %s' % b) + with b: + try: + # Call method on backend + try: + if callable(function): + r = function(b, *args, **kwargs) + else: + r = getattr(b, function)(*args, **kwargs) + except Exception, e: + with self.mutex: + # TODO save backtrace and/or print it here (with debug) + self.errors.append((b, e)) + else: + debug('%s: Got answer! %s' % (b, r)) + + if hasattr(r, '__iter__'): + # Loop on iterator + for e in r: + # Lock mutex only in loop in case the iterator is slow + # (for example if backend do some parsing operations) + with self.mutex: + self.responses.append((b,e)) + self.response_event.set() + else: + with self.mutex: + self.responses.append((b,r)) + self.response_event.set() + finally: + with self.mutex: + # This backend is now finished + self.backends[b.name] = True + for finished in self.backends.itervalues(): + if not finished: + return + self.finish_event.set() + self.response_event.set() + + def _callback_thread_run(self, callback, errback): + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + callback(*responses.pop()) + + if errback: + with self.mutex: + while self.errors: + errback(*self.errors.pop()) + + def callback_thread(self, callback, errback=None): + """ + Call this method to create a thread which will callback a + specified function everytimes a new result comes. + + When the process is over, the function will be called with + both arguments set to None. + + The functions prototypes: + def callback(backend, result) + def errback(backend, error) + + """ + return Thread(target=self._callback_thread_run, args=(callback, errback)) + + def __iter__(self): + # Don't know how to factorize with _callback_thread_run + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + yield Result(*responses.pop()) + + # Raise errors + with self.mutex: + if self.errors: + raise CallErrors(self.errors) diff --git a/weboob/frontends/havesex/application.py b/weboob/frontends/havesex/application.py index 3afc34d7..76681781 100644 --- a/weboob/frontends/havesex/application.py +++ b/weboob/frontends/havesex/application.py @@ -18,6 +18,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +import sys from weboob.tools.application import PromptApplication from weboob.capabilities.dating import ICapDating @@ -59,7 +60,19 @@ class HaveSex(PromptApplication): print profile.get_profile_text() return True - @PromptApplication.command("start profiles walker") - def command_walker(self): + def service(self, action, function): + sys.stdout.write('%s:' % action) for backend in self.weboob.iter_backends(): - backend.start_profiles_walker() + sys.stdout.write(' ' + backend.name) + sys.stdout.flush() + getattr(backend, function)() + sys.stdout.write('.\n') + + @PromptApplication.command("start profiles walker") + def command_walker(self, action): + if action == 'start': + self.service('Starting walker', 'start_profiles_walker') + elif action == 'stop': + self.service('Stopping walker', 'stop_profiles_walker') + else: + print >>sys.stderr, 'Syntax: walker (start|stop)' diff --git a/weboob/frontends/videoob/application.py b/weboob/frontends/videoob/application.py index 798c9ae9..b0f73f4f 100644 --- a/weboob/frontends/videoob/application.py +++ b/weboob/frontends/videoob/application.py @@ -70,7 +70,11 @@ class Videoob(ConsoleApplication): else: results['BEFORE'] = u'Last videos' results['HEADER'] = ('ID', 'Title', 'Duration') - for backend in self.weboob.iter_backends(): - results[backend.name] = [(video.id, video.title, video.formatted_duration) for video in - backend.iter_search_results(pattern=pattern, nsfw=self.options.nsfw)] + + for backend, video in self.weboob.do('iter_search_results', pattern=pattern, nsfw=self.options.nsfw): + row = (video.id, video.title, video.formatted_duration) + try: + results[backend.name].append(row) + except KeyError: + results[backend.name] = [row] return results diff --git a/weboob/ouiboube.py b/weboob/ouiboube.py index 473f1b30..e6530915 100644 --- a/weboob/ouiboube.py +++ b/weboob/ouiboube.py @@ -18,14 +18,17 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ +from __future__ import with_statement + import os from logging import warning +from weboob.bcall import BackendsCall, CallErrors from weboob.modules import ModulesLoader, BackendsConfig from weboob.scheduler import Scheduler -__all__ = ['Weboob'] +__all__ = ['Weboob', 'CallErrors'] class Weboob(object): @@ -92,7 +95,16 @@ class Weboob(object): def iter_backends(self, caps=None): for name, backend in self.backends.iteritems(): if caps is None or backend.has_caps(caps): - yield backend + with backend: + yield backend + + def do(self, function, *args, **kwargs): + backends = [b for b in self.iter_backends()] + return BackendsCall(backends, function, *args, **kwargs) + + def do_caps(self, caps, function, *args, **kwargs): + backends = [b for b in self.iter_backends(caps)] + return BackendsCall(backends, function, *args, **kwargs) def schedule(self, interval, function, *args): return self.scheduler.schedule(interval, function, *args) diff --git a/weboob/scheduler.py b/weboob/scheduler.py index 6ae54af6..c8791fb6 100644 --- a/weboob/scheduler.py +++ b/weboob/scheduler.py @@ -18,34 +18,45 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -import sched -import time - +from threading import Timer, Event __all__ = ['Scheduler'] - -class Scheduler(object): - def __init__(self): - self.scheduler = sched.scheduler(time.time, time.sleep) - self.running = False - +class IScheduler(object): def schedule(self, interval, function, *args): - return self.scheduler.enter(interval, 1, function, args) + raise NotImplementedError() def repeat(self, interval, function, *args): - return self.scheduler.enter(interval, 1, self._repeated_cb, (interval, function, args)) + raise NotImplementedError() def run(self): - self.running = True - while self.running: - self.scheduler.run() - if not self.scheduler.queue: - self.scheduler.delayfunc(0.001) + raise NotImplementedError() + + def want_stop(self): + raise NotImplementedError() + +class Scheduler(IScheduler): + def __init__(self): + self.stop_event = Event() + self.count = 0 + self.queue = {} + + def schedule(self, interval, function, *args): + self.count += 1 + timer = Timer(interval, function, args) + timer.start() + self.queue[self.count] = timer + return self.count + + def repeat(self, interval, function, *args): + return self.schedule(interval, self._repeated_cb, interval, function, args) + + def run(self): + self.stop_event.wait() return True def want_stop(self): - self.running = False + self.stop_event.set() def _repeated_cb(self, interval, func, args): func(*args) diff --git a/weboob/tools/application/base.py b/weboob/tools/application/base.py index 0b8f67f1..fdbe5f54 100644 --- a/weboob/tools/application/base.py +++ b/weboob/tools/application/base.py @@ -164,3 +164,4 @@ class BaseApplication(object): print 'Program killed by SIGINT' except ConfigError, e: print 'Configuration error: %s' % e + sys.exit(1) diff --git a/weboob/tools/application/prompt.py b/weboob/tools/application/prompt.py index db8096a4..85bbace3 100644 --- a/weboob/tools/application/prompt.py +++ b/weboob/tools/application/prompt.py @@ -18,9 +18,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """ -import sched -import time -import select import sys from weboob import Weboob @@ -34,18 +31,18 @@ __all__ = ['PromptApplication'] class PromptScheduler(Scheduler): def __init__(self, prompt_cb, read_cb): - self.scheduler = sched.scheduler(time.time, self.sleep) + Scheduler.__init__(self) self.read_cb = read_cb self.prompt_cb = prompt_cb - def sleep(self, d): - self.prompt_cb() + def run(self): try: - read, write, excepts = select.select([sys.stdin], [], [], d or None) - if read: + while not self.stop_event.isSet(): + self.prompt_cb() line = sys.stdin.readline() if not line: self.want_stop() + sys.stdout.write('\n') else: self.read_cb(line.strip()) except KeyboardInterrupt: @@ -55,6 +52,13 @@ class PromptApplication(ConsoleApplication): def create_weboob(self): return Weboob(self.APPNAME, scheduler=PromptScheduler(self.prompt, self.read_cb)) + @ConsoleApplication.command("Display this notice") + def command_help(self): + print 'Available commands:' + for name, arguments, doc_string in self._commands: + command = '%s %s' % (name, arguments) + print ' %-30s %s' % (command, doc_string) + def prompt(self): sys.stdout.write('> ') sys.stdout.flush() @@ -64,4 +68,5 @@ class PromptApplication(ConsoleApplication): def read_cb(self, line): line = line.split() - self.process_command(*line) + if line: + self.process_command(*line) diff --git a/weboob/tools/application/qt.py b/weboob/tools/application/qt.py index 2d4cc93a..f5230eda 100644 --- a/weboob/tools/application/qt.py +++ b/weboob/tools/application/qt.py @@ -23,13 +23,13 @@ from PyQt4.QtCore import QTimer, SIGNAL from PyQt4.QtGui import QMainWindow, QApplication from weboob import Weboob -from weboob.scheduler import Scheduler +from weboob.scheduler import IScheduler from .base import BaseApplication __all__ = ['QtApplication'] -class QtScheduler(Scheduler): +class QtScheduler(IScheduler): def __init__(self, app): self.app = app self.timers = {} diff --git a/weboob/tools/browser.py b/weboob/tools/browser.py index 1e59b3ef..a09abead 100644 --- a/weboob/tools/browser.py +++ b/weboob/tools/browser.py @@ -25,6 +25,7 @@ import re import time from logging import warning, error, debug from copy import copy +from threading import RLock from weboob.tools.parsers import get_parser @@ -160,12 +161,19 @@ class BaseBrowser(mechanize.Browser): self.last_update = 0.0 self.username = username self.password = password + self.lock = RLock() if self.password: try: self.home() except BrowserUnavailable: pass + def __enter__(self): + self.lock.acquire() + + def __exit__(self, t, v, tb): + self.lock.release() + def pageaccess(func): def inner(self, *args, **kwargs): if not self.page or self.password and not self.page.is_logged():