diff --git a/weboob/core/bcall.py b/weboob/core/bcall.py index 344dbe1e..32fe59bd 100644 --- a/weboob/core/bcall.py +++ b/weboob/core/bcall.py @@ -18,7 +18,6 @@ # along with weboob. If not, see . -from multiprocessing.pool import ThreadPool from copy import copy from threading import Thread import Queue @@ -53,21 +52,22 @@ class BackendsCall(object): :type function: :class:`str` or :class:`callable` """ self.logger = getLogger('bcall') - # Waiting responses + self.responses = Queue.Queue() - # Threads - self.threads = ThreadPool(len(backends)) - self.backends = [(backend, function, args, kwargs) for backend in backends] + self.errors = [] + self.tasks = Queue.Queue() + + for backend in backends: + Thread(target=self.backend_process, args=(function, args, kwargs)).start() + self.tasks.put(backend) def store_result(self, backend, result): if isinstance(result, CapBaseObject): result.backend = backend.name self.responses.put((backend, result)) - def _caller(self, args): - return self.backend_thread(*args) - - def backend_thread(self, backend, function, args, kwargs): + def backend_process(self, function, args, kwargs): + backend = self.tasks.get() with backend: try: # Call method on backend @@ -79,7 +79,7 @@ class BackendsCall(object): result = getattr(backend, function)(*args, **kwargs) except Exception as error: self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error)) - return backend, error, get_backtrace(error) + self.errors.append((backend, error, get_backtrace(error))) else: self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result)) @@ -87,32 +87,24 @@ class BackendsCall(object): # Loop on iterator try: for subresult in result: - # Lock mutex only in loop in case the iterator is slow - # (for example if backend do some parsing operations) self.store_result(backend, subresult) except Exception as error: - return backend, error, get_backtrace(error) + self.errors.append((backend, error, get_backtrace(error))) else: self.store_result(backend, result) finally: - # This backend is now finished - #self.response_event.set() - #self.finish_event.set() - pass + self.tasks.task_done() def _callback_thread_run(self, callback, errback): - r = self.threads.map_async(self._caller, self.backends) - - while not r.ready() or not self.responses.empty(): + while self.tasks.unfinished_tasks or not self.responses.empty(): try: callback(*self.responses.get(timeout=0.1)) except Queue.Empty: continue # Raise errors - errors = get_errors(r) - while errors: - errback(*errors.pop(0)) + while self.errors: + errback(*self.errors.pop(0)) callback(None, None) @@ -134,24 +126,17 @@ class BackendsCall(object): return thread def wait(self): - r = self.threads.map(self._caller, self.backends) + self.tasks.join() - errors = get_errors(r) - if errors: - raise CallErrors(errors) + if self.errors: + raise CallErrors(self.errors) def __iter__(self): - r = self.threads.map_async(self._caller, self.backends) - - while not r.ready() or not self.responses.empty(): + while self.tasks.unfinished_tasks or not self.responses.empty(): try: yield self.responses.get(timeout=0.1) except Queue.Empty: continue - errors = get_errors(r) - if errors: - raise CallErrors(errors) - -def get_errors(r): - return [e for e in r.get() if e is not None] + if self.errors: + raise CallErrors(self.errors)