From ad4fc17a894c9e65cbe663b5955877688c332b95 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Tue, 27 Apr 2010 00:59:13 +0200 Subject: [PATCH] new BackendsCall class to do asynchronous calls to backends methods --- weboob/bcall.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 weboob/bcall.py diff --git a/weboob/bcall.py b/weboob/bcall.py new file mode 100644 index 00000000..27a2afd0 --- /dev/null +++ b/weboob/bcall.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- + +""" +Copyright(C) 2010 Romain Bignon + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + +""" + +from __future__ import with_statement + +from logging import debug +from threading import Thread, Event, RLock, Timer + +__all__ = ['BackendsCall'] + +class Result(object): + def __init__(self, backend, result): + self.backend = backend + self.result = result + + def __iter__(self): + """ + To allow unpack. + + For example: + >>> for backend, result in self.weboob.do(blabla) + """ + yield self.backend + yield self.result + +class BackendsCall(object): + def __init__(self, backends, function, *args, **kwargs): + # Store if a backend is finished + self.backends = {} + for backend in backends: + self.backends[backend.name] = False + # Global mutex on object + self.mutex = RLock() + # Event set when every backends have give their data + self.finish_event = Event() + # Event set when there are new responses + self.response_event = Event() + # Waiting responses + self.responses = [] + # Timers + self.threads = [] + + # Create jobs for each backend + with self.mutex: + for b in backends: + debug('New timer for %s' % b) + self.threads.append(Timer(0, self.caller, (b, function, args, kwargs)).start()) + + def caller(self, b, function, args, kwargs): + debug('Hello from timer %s' % b) + with b: + try: + # Call method on backend + r = getattr(b, function)(*args, **kwargs) + debug('%s: Got answer! %s' % (b, r)) + + if hasattr(r, '__iter__'): + # Loop on iterator + for e in r: + # Lock mutex only in loop in case the iterator is slow + # (for example if backend do some parsing operations) + with self.mutex: + self.responses.append((b,e)) + self.response_event.set() + else: + with self.mutex: + self.responses.append((b,r)) + self.response_event.set() + finally: + with self.mutex: + # This backend is now finished + self.backends[b.name] = True + for finished in self.backends.itervalues(): + if not finished: + return + self.finish_event.set() + self.response_event.set() + + def _callback_thread_run(self, function): + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + function(*responses.pop()) + + def callback_thread(self, function): + """ + Call this method to create a thread which will callback a + specified function everytimes a new result comes. + + When the process is over, the function will be called with + both arguments set to None. + + The function prototype is: + def function(backend, result) + + """ + return Thread(target=self._callback_thread_run, args=(function,)) + + def __iter__(self): + # Don't know how to factorize with _callback_thread_run + responses = [] + while not self.finish_event.isSet(): + self.response_event.wait() + with self.mutex: + responses = self.responses + self.responses = [] + + # Reset event + self.response_event.clear() + + # Consume responses + while responses: + yield Result(*responses.pop())