fix compatibility with python2.6 (don't use ThreadPool anymore)

This commit is contained in:
Romain Bignon 2014-05-01 18:35:11 +02:00
commit 711cba9c65

View file

@ -18,7 +18,6 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>. # along with weboob. If not, see <http://www.gnu.org/licenses/>.
from multiprocessing.pool import ThreadPool
from copy import copy from copy import copy
from threading import Thread from threading import Thread
import Queue import Queue
@ -53,21 +52,22 @@ class BackendsCall(object):
:type function: :class:`str` or :class:`callable` :type function: :class:`str` or :class:`callable`
""" """
self.logger = getLogger('bcall') self.logger = getLogger('bcall')
# Waiting responses
self.responses = Queue.Queue() self.responses = Queue.Queue()
# Threads self.errors = []
self.threads = ThreadPool(len(backends)) self.tasks = Queue.Queue()
self.backends = [(backend, function, args, kwargs) for backend in backends]
for backend in backends:
Thread(target=self.backend_process, args=(function, args, kwargs)).start()
self.tasks.put(backend)
def store_result(self, backend, result): def store_result(self, backend, result):
if isinstance(result, CapBaseObject): if isinstance(result, CapBaseObject):
result.backend = backend.name result.backend = backend.name
self.responses.put((backend, result)) self.responses.put((backend, result))
def _caller(self, args): def backend_process(self, function, args, kwargs):
return self.backend_thread(*args) backend = self.tasks.get()
def backend_thread(self, backend, function, args, kwargs):
with backend: with backend:
try: try:
# Call method on backend # Call method on backend
@ -79,7 +79,7 @@ class BackendsCall(object):
result = getattr(backend, function)(*args, **kwargs) result = getattr(backend, function)(*args, **kwargs)
except Exception as error: except Exception as error:
self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error)) self.logger.debug('%s: Called function %s raised an error: %r' % (backend, function, error))
return backend, error, get_backtrace(error) self.errors.append((backend, error, get_backtrace(error)))
else: else:
self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result)) self.logger.debug('%s: Called function %s returned: %r' % (backend, function, result))
@ -87,32 +87,24 @@ class BackendsCall(object):
# Loop on iterator # Loop on iterator
try: try:
for subresult in result: for subresult in result:
# Lock mutex only in loop in case the iterator is slow
# (for example if backend do some parsing operations)
self.store_result(backend, subresult) self.store_result(backend, subresult)
except Exception as error: except Exception as error:
return backend, error, get_backtrace(error) self.errors.append((backend, error, get_backtrace(error)))
else: else:
self.store_result(backend, result) self.store_result(backend, result)
finally: finally:
# This backend is now finished self.tasks.task_done()
#self.response_event.set()
#self.finish_event.set()
pass
def _callback_thread_run(self, callback, errback): def _callback_thread_run(self, callback, errback):
r = self.threads.map_async(self._caller, self.backends) while self.tasks.unfinished_tasks or not self.responses.empty():
while not r.ready() or not self.responses.empty():
try: try:
callback(*self.responses.get(timeout=0.1)) callback(*self.responses.get(timeout=0.1))
except Queue.Empty: except Queue.Empty:
continue continue
# Raise errors # Raise errors
errors = get_errors(r) while self.errors:
while errors: errback(*self.errors.pop(0))
errback(*errors.pop(0))
callback(None, None) callback(None, None)
@ -134,24 +126,17 @@ class BackendsCall(object):
return thread return thread
def wait(self): def wait(self):
r = self.threads.map(self._caller, self.backends) self.tasks.join()
errors = get_errors(r) if self.errors:
if errors: raise CallErrors(self.errors)
raise CallErrors(errors)
def __iter__(self): def __iter__(self):
r = self.threads.map_async(self._caller, self.backends) while self.tasks.unfinished_tasks or not self.responses.empty():
while not r.ready() or not self.responses.empty():
try: try:
yield self.responses.get(timeout=0.1) yield self.responses.get(timeout=0.1)
except Queue.Empty: except Queue.Empty:
continue continue
errors = get_errors(r) if self.errors:
if errors: raise CallErrors(self.errors)
raise CallErrors(errors)
def get_errors(r):
return [e for e in r.get() if e is not None]