From 07f5500cce758a460cbc3f0b6f34e2447ed007ae Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Thu, 1 May 2014 17:04:27 +0200 Subject: [PATCH] rewrite BackendCalls with ThreadPool and queues --- weboob/core/bcall.py | 131 ++++++++++++++++--------------------------- 1 file changed, 47 insertions(+), 84 deletions(-) diff --git a/weboob/core/bcall.py b/weboob/core/bcall.py index 40edfce4..344dbe1e 100644 --- a/weboob/core/bcall.py +++ b/weboob/core/bcall.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright(C) 2010-2011 Romain Bignon, Christophe Benz +# Copyright(C) 2010-2014 Romain Bignon, Christophe Benz # # This file is part of weboob. # @@ -18,10 +18,10 @@ # along with weboob. If not, see <http://www.gnu.org/licenses/>. - - +from multiprocessing.pool import ThreadPool from copy import copy -from threading import Thread, Event, RLock, Timer +from threading import Thread +import Queue from weboob.capabilities.base import CapBaseObject from weboob.tools.misc import get_backtrace @@ -53,43 +53,21 @@ class BackendsCall(object): :type function: :class:`str` or :class:`callable` """ self.logger = getLogger('bcall') - # Store if a backend is finished - self.backends = {} - for backend in backends: - self.backends[backend.name] = False - # Global mutex on object - self.mutex = RLock() - # Event set when every backends have give their data - self.finish_event = Event() - # Event set when there are new responses - self.response_event = Event() # Waiting responses - self.responses = [] - # Errors - self.errors = [] + self.responses = Queue.Queue() # Threads - self.threads = [] + self.threads = ThreadPool(len(backends)) + self.backends = [(backend, function, args, kwargs) for backend in backends] - # Create jobs for each backend - with self.mutex: - for backend in backends: - self.threads.append(Timer(0, self._caller, (backend, function, args, kwargs)).start()) - if not backends: - self.finish_event.set() + def store_result(self, backend, result): + if isinstance(result, CapBaseObject): + 
result.backend = backend.name + self.responses.put((backend, result)) - def _store_error(self, backend, error): - with self.mutex: - backtrace = get_backtrace(error) - self.errors.append((backend, error, backtrace)) + def _caller(self, args): + return self.backend_thread(*args) - def _store_result(self, backend, result): - with self.mutex: - if isinstance(result, CapBaseObject): - result.backend = backend.name - self.responses.append((backend, result)) - self.response_event.set() - - def _caller(self, backend, function, args, kwargs): + def backend_thread(self, backend, function, args, kwargs): with backend: try: # Call method on backend @@ -101,7 +79,7 @@ class BackendsCall(object): result = getattr(backend, function)(*args, **kwargs) except Exception as error: self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error)) - self._store_error(backend, error) + return backend, error, get_backtrace(error) else: self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result)) @@ -111,40 +89,30 @@ class BackendsCall(object): for subresult in result: # Lock mutex only in loop in case the iterator is slow # (for example if backend do some parsing operations) - self._store_result(backend, subresult) + self.store_result(backend, subresult) except Exception as error: - self._store_error(backend, error) + return backend, error, get_backtrace(error) else: - self._store_result(backend, result) + self.store_result(backend, result) finally: - with self.mutex: - # This backend is now finished - self.backends[backend.name] = True - for finished in self.backends.itervalues(): - if not finished: - return - self.response_event.set() - self.finish_event.set() + # This backend is now finished + #self.response_event.set() + #self.finish_event.set() + pass def _callback_thread_run(self, callback, errback): - responses = [] - while not self.finish_event.isSet() or self.response_event.isSet(): - self.response_event.wait() - with 
self.mutex: - responses = self.responses - self.responses = [] + r = self.threads.map_async(self._caller, self.backends) - # Reset event - self.response_event.clear() + while not r.ready() or not self.responses.empty(): + try: + callback(*self.responses.get(timeout=0.1)) + except Queue.Empty: + continue - # Consume responses - while responses: - callback(*responses.pop(0)) - - if errback: - with self.mutex: - while self.errors: - errback(*self.errors.pop(0)) + # Raise errors + errors = get_errors(r) + while errors: + errback(*errors.pop(0)) callback(None, None) @@ -166,29 +134,24 @@ class BackendsCall(object): return thread def wait(self): - self.finish_event.wait() + r = self.threads.map(self._caller, self.backends) - with self.mutex: - if self.errors: - raise CallErrors(self.errors) + errors = get_errors(r) + if errors: + raise CallErrors(errors) def __iter__(self): - # Don't know how to factorize with _callback_thread_run - responses = [] - while not self.finish_event.isSet() or self.response_event.isSet(): - self.response_event.wait() - with self.mutex: - responses = self.responses - self.responses = [] + r = self.threads.map_async(self._caller, self.backends) - # Reset event - self.response_event.clear() + while not r.ready() or not self.responses.empty(): + try: + yield self.responses.get(timeout=0.1) + except Queue.Empty: + continue - # Consume responses - while responses: - yield responses.pop(0) + errors = get_errors(r) + if errors: + raise CallErrors(errors) - # Raise errors - with self.mutex: - if self.errors: - raise CallErrors(self.errors) +def get_errors(r): + return [e for e in r.get() if e is not None]