rewrite BackendCalls with ThreadPool and queues

This commit is contained in:
Romain Bignon 2014-05-01 17:04:27 +02:00
commit 07f5500cce

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon, Christophe Benz # Copyright(C) 2010-2014 Romain Bignon, Christophe Benz
# #
# This file is part of weboob. # This file is part of weboob.
# #
@ -18,10 +18,10 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>. # along with weboob. If not, see <http://www.gnu.org/licenses/>.
from multiprocessing.pool import ThreadPool
from copy import copy from copy import copy
from threading import Thread, Event, RLock, Timer from threading import Thread
import Queue
from weboob.capabilities.base import CapBaseObject from weboob.capabilities.base import CapBaseObject
from weboob.tools.misc import get_backtrace from weboob.tools.misc import get_backtrace
@ -53,43 +53,21 @@ class BackendsCall(object):
:type function: :class:`str` or :class:`callable` :type function: :class:`str` or :class:`callable`
""" """
self.logger = getLogger('bcall') self.logger = getLogger('bcall')
# Store if a backend is finished
self.backends = {}
for backend in backends:
self.backends[backend.name] = False
# Global mutex on object
self.mutex = RLock()
# Event set when every backends have give their data
self.finish_event = Event()
# Event set when there are new responses
self.response_event = Event()
# Waiting responses # Waiting responses
self.responses = [] self.responses = Queue.Queue()
# Errors
self.errors = []
# Threads # Threads
self.threads = [] self.threads = ThreadPool(len(backends))
self.backends = [(backend, function, args, kwargs) for backend in backends]
# Create jobs for each backend def store_result(self, backend, result):
with self.mutex: if isinstance(result, CapBaseObject):
for backend in backends: result.backend = backend.name
self.threads.append(Timer(0, self._caller, (backend, function, args, kwargs)).start()) self.responses.put((backend, result))
if not backends:
self.finish_event.set()
def _store_error(self, backend, error): def _caller(self, args):
with self.mutex: return self.backend_thread(*args)
backtrace = get_backtrace(error)
self.errors.append((backend, error, backtrace))
def _store_result(self, backend, result): def backend_thread(self, backend, function, args, kwargs):
with self.mutex:
if isinstance(result, CapBaseObject):
result.backend = backend.name
self.responses.append((backend, result))
self.response_event.set()
def _caller(self, backend, function, args, kwargs):
with backend: with backend:
try: try:
# Call method on backend # Call method on backend
@ -101,7 +79,7 @@ class BackendsCall(object):
result = getattr(backend, function)(*args, **kwargs) result = getattr(backend, function)(*args, **kwargs)
except Exception as error: except Exception as error:
self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error)) self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error))
self._store_error(backend, error) return backend, error, get_backtrace(error)
else: else:
self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result)) self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result))
@ -111,40 +89,30 @@ class BackendsCall(object):
for subresult in result: for subresult in result:
# Lock mutex only in loop in case the iterator is slow # Lock mutex only in loop in case the iterator is slow
# (for example if backend do some parsing operations) # (for example if backend do some parsing operations)
self._store_result(backend, subresult) self.store_result(backend, subresult)
except Exception as error: except Exception as error:
self._store_error(backend, error) return backend, error, get_backtrace(error)
else: else:
self._store_result(backend, result) self.store_result(backend, result)
finally: finally:
with self.mutex: # This backend is now finished
# This backend is now finished #self.response_event.set()
self.backends[backend.name] = True #self.finish_event.set()
for finished in self.backends.itervalues(): pass
if not finished:
return
self.response_event.set()
self.finish_event.set()
def _callback_thread_run(self, callback, errback): def _callback_thread_run(self, callback, errback):
responses = [] r = self.threads.map_async(self._caller, self.backends)
while not self.finish_event.isSet() or self.response_event.isSet():
self.response_event.wait()
with self.mutex:
responses = self.responses
self.responses = []
# Reset event while not r.ready() or not self.responses.empty():
self.response_event.clear() try:
callback(*self.responses.get(timeout=0.1))
except Queue.Empty:
continue
# Consume responses # Raise errors
while responses: errors = get_errors(r)
callback(*responses.pop(0)) while errors:
errback(*errors.pop(0))
if errback:
with self.mutex:
while self.errors:
errback(*self.errors.pop(0))
callback(None, None) callback(None, None)
@ -166,29 +134,24 @@ class BackendsCall(object):
return thread return thread
def wait(self): def wait(self):
self.finish_event.wait() r = self.threads.map(self._caller, self.backends)
with self.mutex: errors = get_errors(r)
if self.errors: if errors:
raise CallErrors(self.errors) raise CallErrors(errors)
def __iter__(self): def __iter__(self):
# Don't know how to factorize with _callback_thread_run r = self.threads.map_async(self._caller, self.backends)
responses = []
while not self.finish_event.isSet() or self.response_event.isSet():
self.response_event.wait()
with self.mutex:
responses = self.responses
self.responses = []
# Reset event while not r.ready() or not self.responses.empty():
self.response_event.clear() try:
yield self.responses.get(timeout=0.1)
except Queue.Empty:
continue
# Consume responses errors = get_errors(r)
while responses: if errors:
yield responses.pop(0) raise CallErrors(errors)
# Raise errors def get_errors(r):
with self.mutex: return [e for e in r.get() if e is not None]
if self.errors:
raise CallErrors(self.errors)