Merge branch 'threads'

Conflicts:
	weboob/frontends/videoob/application.py
This commit is contained in:
Romain Bignon 2010-04-29 11:10:19 +02:00
commit 926a25b992
14 changed files with 305 additions and 65 deletions

View file

@ -106,9 +106,8 @@ class Monboob(ConsoleApplication):
self.weboob.loop()
def process(self):
for backend in self.weboob.iter_backends():
for message in backend.iter_new_messages():
self.send_email(backend, message)
for backend, message in self.weboob.do('iter_new_messages'):
self.send_email(backend, message)
def send_email(self, backend, mail):
domain = self.config.get('domain')

View file

@ -18,4 +18,4 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
from .ouiboube import Weboob
from .ouiboube import Weboob, CallErrors

View file

@ -19,6 +19,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import re
from threading import RLock
__all__ = ['BackendStorage', 'BaseBackend']
@ -75,9 +76,19 @@ class BaseBackend(object):
class ConfigError(Exception): pass
def __enter__(self):
self.lock.acquire()
def __exit__(self, t, v, tb):
self.lock.release()
def __repr__(self):
return u"<Backend '%s'>" % self.name
def __init__(self, weboob, name, config, storage):
self.weboob = weboob
self.name = name
self.lock = RLock()
self.config = {}
for name, field in self.CONFIG.iteritems():
value = config.get(name, field.default)

View file

@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
from __future__ import with_statement
from weboob.backend import BaseBackend
from weboob.capabilities.messages import ICapMessages, ICapMessagesReply
from weboob.capabilities.dating import ICapDating
@ -67,38 +69,41 @@ class AuMBackend(BaseBackend, ICapMessages, ICapMessagesReply, ICapDating):
yield message
def _iter_messages(self, thread, only_new):
try:
if not only_new or self.browser.nb_new_mails():
my_name = self.browser.get_my_name()
contacts = self.browser.get_contact_list()
contacts.reverse()
with self.browser:
try:
if not only_new or self.browser.nb_new_mails():
my_name = self.browser.get_my_name()
contacts = self.browser.get_contact_list()
contacts.reverse()
for contact in contacts:
if only_new and not contact.is_new() or thread and int(thread) != contact.get_id():
continue
for contact in contacts:
if only_new and not contact.is_new() or thread and int(thread) != contact.get_id():
continue
mails = self.browser.get_thread_mails(contact.get_id())
profile = None
for i in xrange(len(mails)):
mail = mails[i]
if only_new and mail.get_from() == my_name:
break
mails = self.browser.get_thread_mails(contact.get_id())
profile = None
for i in xrange(len(mails)):
mail = mails[i]
if only_new and mail.get_from() == my_name:
break
if not profile:
profile = self.browser.get_profile(contact.get_id())
mail.signature += u'\n%s' % profile.get_profile_text()
yield mail
except BrowserUnavailable:
pass
if not profile:
profile = self.browser.get_profile(contact.get_id())
mail.signature += u'\n%s' % profile.get_profile_text()
yield mail
except BrowserUnavailable:
pass
def post_reply(self, thread_id, reply_id, title, message):
for message in self._iter_messages(thread_id, True):
self.queue_messages.append(message)
return self.browser.post(thread_id, message)
with self.browser:
return self.browser.post(thread_id, message)
def get_profile(self, _id):
try:
return self.browser.get_profile(_id)
with self.browser:
return self.browser.get_profile(_id)
except BrowserUnavailable:
return None

View file

@ -18,6 +18,8 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
from __future__ import with_statement
from logging import debug
from random import randint
from weboob.tools.browser import BrowserUnavailable
@ -34,7 +36,7 @@ class ProfilesWalker(object):
self.view_cron = sched.schedule(randint(10,40), self.view_profile)
def save(self):
self.storage.set('profiles_walker', 'viewed', self.visited_profiles)
self.storage.set('profiles_walker', 'viewed', list(self.visited_profiles))
self.storage.save()
def stop(self):
@ -42,7 +44,8 @@ class ProfilesWalker(object):
self.event = None
def walk(self):
self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles)
with self.browser:
self.profiles_queue = self.profiles_queue.union(self.browser.search_profiles()).difference(self.visited_profiles)
self.save()
def view_profile(self):
@ -53,7 +56,8 @@ class ProfilesWalker(object):
return # empty queue
try:
profile = self.browser.get_profile(id)
with self.browser:
profile = self.browser.get_profile(id)
debug(u'Visited %s (%s)' % (profile.get_name(), id))
# Get score from the aum_score module

167
weboob/bcall.py Normal file
View file

@ -0,0 +1,167 @@
# -*- coding: utf-8 -*-
"""
Copyright(C) 2010 Romain Bignon
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, version 3 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
from __future__ import with_statement
from logging import debug
from copy import copy
from threading import Thread, Event, RLock, Timer
__all__ = ['BackendsCall', 'CallErrors']
class CallErrors(Exception):
def __init__(self, errors):
Exception.__init__(self, "Several errors have been raised:\n%s" % ('\n'.join(['%s: %s' % (b, e) for b, e in errors])))
self.errors = copy(errors)
class Result(object):
def __init__(self, backend, result):
self.backend = backend
self.result = result
def __iter__(self):
"""
To allow unpack.
For example:
>>> for backend, result in self.weboob.do(blabla)
"""
yield self.backend
yield self.result
class BackendsCall(object):
def __init__(self, backends, function, *args, **kwargs):
# Store if a backend is finished
self.backends = {}
for backend in backends:
self.backends[backend.name] = False
# Global mutex on object
self.mutex = RLock()
# Event set when every backends have give their data
self.finish_event = Event()
# Event set when there are new responses
self.response_event = Event()
# Waiting responses
self.responses = []
# Errors
self.errors = []
# Threads
self.threads = []
# Create jobs for each backend
with self.mutex:
for b in backends:
debug('New timer for %s' % b)
self.threads.append(Timer(0, self.caller, (b, function, args, kwargs)).start())
def caller(self, b, function, args, kwargs):
debug('Hello from timer %s' % b)
with b:
try:
# Call method on backend
try:
if callable(function):
r = function(b, *args, **kwargs)
else:
r = getattr(b, function)(*args, **kwargs)
except Exception, e:
with self.mutex:
# TODO save backtrace and/or print it here (with debug)
self.errors.append((b, e))
else:
debug('%s: Got answer! %s' % (b, r))
if hasattr(r, '__iter__'):
# Loop on iterator
for e in r:
# Lock mutex only in loop in case the iterator is slow
# (for example if backend do some parsing operations)
with self.mutex:
self.responses.append((b,e))
self.response_event.set()
else:
with self.mutex:
self.responses.append((b,r))
self.response_event.set()
finally:
with self.mutex:
# This backend is now finished
self.backends[b.name] = True
for finished in self.backends.itervalues():
if not finished:
return
self.finish_event.set()
self.response_event.set()
def _callback_thread_run(self, callback, errback):
responses = []
while not self.finish_event.isSet():
self.response_event.wait()
with self.mutex:
responses = self.responses
self.responses = []
# Reset event
self.response_event.clear()
# Consume responses
while responses:
callback(*responses.pop())
if errback:
with self.mutex:
while self.errors:
errback(*self.errors.pop())
def callback_thread(self, callback, errback=None):
"""
Call this method to create a thread which will callback a
specified function everytimes a new result comes.
When the process is over, the function will be called with
both arguments set to None.
The functions prototypes:
def callback(backend, result)
def errback(backend, error)
"""
return Thread(target=self._callback_thread_run, args=(callback, errback))
def __iter__(self):
# Don't know how to factorize with _callback_thread_run
responses = []
while not self.finish_event.isSet():
self.response_event.wait()
with self.mutex:
responses = self.responses
self.responses = []
# Reset event
self.response_event.clear()
# Consume responses
while responses:
yield Result(*responses.pop())
# Raise errors
with self.mutex:
if self.errors:
raise CallErrors(self.errors)

View file

@ -18,6 +18,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import sys
from weboob.tools.application import PromptApplication
from weboob.capabilities.dating import ICapDating
@ -59,7 +60,19 @@ class HaveSex(PromptApplication):
print profile.get_profile_text()
return True
@PromptApplication.command("start profiles walker")
def command_walker(self):
def service(self, action, function):
sys.stdout.write('%s:' % action)
for backend in self.weboob.iter_backends():
backend.start_profiles_walker()
sys.stdout.write(' ' + backend.name)
sys.stdout.flush()
getattr(backend, function)()
sys.stdout.write('.\n')
@PromptApplication.command("start profiles walker")
def command_walker(self, action):
if action == 'start':
self.service('Starting walker', 'start_profiles_walker')
elif action == 'stop':
self.service('Stopping walker', 'stop_profiles_walker')
else:
print >>sys.stderr, 'Syntax: walker (start|stop)'

View file

@ -70,7 +70,11 @@ class Videoob(ConsoleApplication):
else:
results['BEFORE'] = u'Last videos'
results['HEADER'] = ('ID', 'Title', 'Duration')
for backend in self.weboob.iter_backends():
results[backend.name] = [(video.id, video.title, video.formatted_duration) for video in
backend.iter_search_results(pattern=pattern, nsfw=self.options.nsfw)]
for backend, video in self.weboob.do('iter_search_results', pattern=pattern, nsfw=self.options.nsfw):
row = (video.id, video.title, video.formatted_duration)
try:
results[backend.name].append(row)
except KeyError:
results[backend.name] = [row]
return results

View file

@ -18,14 +18,17 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
from __future__ import with_statement
import os
from logging import warning
from weboob.bcall import BackendsCall, CallErrors
from weboob.modules import ModulesLoader, BackendsConfig
from weboob.scheduler import Scheduler
__all__ = ['Weboob']
__all__ = ['Weboob', 'CallErrors']
class Weboob(object):
@ -92,7 +95,16 @@ class Weboob(object):
def iter_backends(self, caps=None):
for name, backend in self.backends.iteritems():
if caps is None or backend.has_caps(caps):
yield backend
with backend:
yield backend
def do(self, function, *args, **kwargs):
backends = [b for b in self.iter_backends()]
return BackendsCall(backends, function, *args, **kwargs)
def do_caps(self, caps, function, *args, **kwargs):
backends = [b for b in self.iter_backends(caps)]
return BackendsCall(backends, function, *args, **kwargs)
def schedule(self, interval, function, *args):
return self.scheduler.schedule(interval, function, *args)

View file

@ -18,34 +18,45 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import sched
import time
from threading import Timer, Event
__all__ = ['Scheduler']
class Scheduler(object):
def __init__(self):
self.scheduler = sched.scheduler(time.time, time.sleep)
self.running = False
class IScheduler(object):
def schedule(self, interval, function, *args):
return self.scheduler.enter(interval, 1, function, args)
raise NotImplementedError()
def repeat(self, interval, function, *args):
return self.scheduler.enter(interval, 1, self._repeated_cb, (interval, function, args))
raise NotImplementedError()
def run(self):
self.running = True
while self.running:
self.scheduler.run()
if not self.scheduler.queue:
self.scheduler.delayfunc(0.001)
raise NotImplementedError()
def want_stop(self):
raise NotImplementedError()
class Scheduler(IScheduler):
def __init__(self):
self.stop_event = Event()
self.count = 0
self.queue = {}
def schedule(self, interval, function, *args):
self.count += 1
timer = Timer(interval, function, args)
timer.start()
self.queue[self.count] = timer
return self.count
def repeat(self, interval, function, *args):
return self.schedule(interval, self._repeated_cb, interval, function, args)
def run(self):
self.stop_event.wait()
return True
def want_stop(self):
self.running = False
self.stop_event.set()
def _repeated_cb(self, interval, func, args):
func(*args)

View file

@ -164,3 +164,4 @@ class BaseApplication(object):
print 'Program killed by SIGINT'
except ConfigError, e:
print 'Configuration error: %s' % e
sys.exit(1)

View file

@ -18,9 +18,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""
import sched
import time
import select
import sys
from weboob import Weboob
@ -34,18 +31,18 @@ __all__ = ['PromptApplication']
class PromptScheduler(Scheduler):
def __init__(self, prompt_cb, read_cb):
self.scheduler = sched.scheduler(time.time, self.sleep)
Scheduler.__init__(self)
self.read_cb = read_cb
self.prompt_cb = prompt_cb
def sleep(self, d):
self.prompt_cb()
def run(self):
try:
read, write, excepts = select.select([sys.stdin], [], [], d or None)
if read:
while not self.stop_event.isSet():
self.prompt_cb()
line = sys.stdin.readline()
if not line:
self.want_stop()
sys.stdout.write('\n')
else:
self.read_cb(line.strip())
except KeyboardInterrupt:
@ -55,6 +52,13 @@ class PromptApplication(ConsoleApplication):
def create_weboob(self):
return Weboob(self.APPNAME, scheduler=PromptScheduler(self.prompt, self.read_cb))
@ConsoleApplication.command("Display this notice")
def command_help(self):
print 'Available commands:'
for name, arguments, doc_string in self._commands:
command = '%s %s' % (name, arguments)
print ' %-30s %s' % (command, doc_string)
def prompt(self):
sys.stdout.write('> ')
sys.stdout.flush()
@ -64,4 +68,5 @@ class PromptApplication(ConsoleApplication):
def read_cb(self, line):
line = line.split()
self.process_command(*line)
if line:
self.process_command(*line)

View file

@ -23,13 +23,13 @@ from PyQt4.QtCore import QTimer, SIGNAL
from PyQt4.QtGui import QMainWindow, QApplication
from weboob import Weboob
from weboob.scheduler import Scheduler
from weboob.scheduler import IScheduler
from .base import BaseApplication
__all__ = ['QtApplication']
class QtScheduler(Scheduler):
class QtScheduler(IScheduler):
def __init__(self, app):
self.app = app
self.timers = {}

View file

@ -25,6 +25,7 @@ import re
import time
from logging import warning, error, debug
from copy import copy
from threading import RLock
from weboob.tools.parsers import get_parser
@ -160,12 +161,19 @@ class BaseBrowser(mechanize.Browser):
self.last_update = 0.0
self.username = username
self.password = password
self.lock = RLock()
if self.password:
try:
self.home()
except BrowserUnavailable:
pass
def __enter__(self):
self.lock.acquire()
def __exit__(self, t, v, tb):
self.lock.release()
def pageaccess(func):
def inner(self, *args, **kwargs):
if not self.page or self.password and not self.page.is_logged():