From 902f26767daf7652675453e0d1c87d2f0f229738 Mon Sep 17 00:00:00 2001 From: smurail Date: Fri, 12 Sep 2014 14:30:45 +0200 Subject: [PATCH] add a way to asynchronously handle requests and pages --- weboob/tools/browser2/browser.py | 53 ++++++++++++++-- weboob/tools/browser2/elements.py | 37 ++++++++---- weboob/tools/browser2/filters/standard.py | 50 ++++++++++++++-- weboob/tools/browser2/page.py | 32 ++++++---- weboob/tools/browser2/sessions.py | 73 +++++++++++++++++++++++ 5 files changed, 212 insertions(+), 33 deletions(-) create mode 100644 weboob/tools/browser2/sessions.py diff --git a/weboob/tools/browser2/browser.py b/weboob/tools/browser2/browser.py index 3bf52b38..120d9a59 100644 --- a/weboob/tools/browser2/browser.py +++ b/weboob/tools/browser2/browser.py @@ -38,6 +38,7 @@ from weboob.tools.log import getLogger from .cookies import WeboobCookieJar from .exceptions import HTTPNotFound, ClientError, ServerError +from .sessions import FuturesSession class Profile(object): @@ -146,6 +147,11 @@ class BaseBrowser(object): MAX_RETRIES = 2 + MAX_WORKERS = 10 + """ + Maximum of threads for asynchronous requests. + """ + def __init__(self, logger=None, proxy=None, responses_dirname=None): self.logger = getLogger('browser', logger) self.PROXIES = proxy @@ -214,7 +220,7 @@ class BaseBrowser(object): """ Set up a python-requests session for our usage. """ - session = requests.Session() + session = FuturesSession(max_workers=self.MAX_WORKERS) session.proxies = self.PROXIES @@ -247,6 +253,7 @@ class BaseBrowser(object): Other than that, has the exact same behavior of open(). """ + assert not kwargs.get('async'), "Please use open() instead of location() to make asynchronous requests." 
response = self.open(url, **kwargs) self.response = response self.url = self.response.url @@ -260,6 +267,8 @@ cert=None, proxies=None, data_encoding=None, + async=False, + callback=lambda response: response, **kwargs): """ Make an HTTP request like a browser does: @@ -279,6 +288,15 @@ Call this instead of location() if you do not want to "visit" the URL (for instance, you are downloading a file). + When `async` is True, open() returns a Future object (see + concurrent.futures for more details), which can be evaluated with its + result() method. If any exception is raised while processing the request, + it is caught and re-raised when calling result(). + + For example: + + >>> BaseBrowser().open('http://google.com', async=True).result().text # doctest: +SKIP + :param url: URL :type url: str @@ -288,6 +306,13 @@ :param referrer: Force referrer. False to disable sending it, None for guessing :type referrer: str or False or None + :param async: Process request in a non-blocking way + :type async: bool + + :param callback: Callback to be called when request has finished, + with response as its first and only argument + :type callback: function + :rtype: :class:`requests.Response` """ req = self.build_request(url, referrer, data_encoding=data_encoding, **kwargs) @@ -308,6 +333,15 @@ if timeout is None: timeout = self.TIMEOUT + # We define an inner_callback here in order to execute the same code + # regardless of async param. 
+ def inner_callback(future, response): + if allow_redirects: + response = self.handle_refresh(response) + + self.raise_for_status(response) + return callback(response) + # call python-requests response = self.session.send(preq, allow_redirects=allow_redirects, @@ -315,14 +349,21 @@ class BaseBrowser(object): timeout=timeout, verify=verify, cert=cert, - proxies=proxies) + proxies=proxies, + background_callback=async and inner_callback) + if not async: + inner_callback(self, response) - if allow_redirects: - response = self.handle_refresh(response) - - self.raise_for_status(response) return response + def async_open(self, url, **kwargs): + """ + Shortcut to open(url, async=True). + """ + if 'async' in kwargs: + del kwargs['async'] + return self.open(url, async=True, **kwargs) + def raise_for_status(self, response): """ Like Response.raise_for_status but will use other classes if needed. diff --git a/weboob/tools/browser2/elements.py b/weboob/tools/browser2/elements.py index a2e55206..e63a3b0a 100644 --- a/weboob/tools/browser2/elements.py +++ b/weboob/tools/browser2/elements.py @@ -105,9 +105,19 @@ class ListElement(AbstractElement): def __iter__(self): self.parse(self.el) + items = [] for el in self.find_elements(): - for obj in self.handle_element(el): - if not self.flush_at_end: + for attrname in dir(self): + attr = getattr(self, attrname) + if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self): + item = attr(self.page, self, el) + item.handle_loaders() + items.append(item) + + for item in items: + for obj in item: + obj = self.store(obj) + if obj and not self.flush_at_end: yield obj if self.flush_at_end: @@ -147,15 +157,6 @@ class ListElement(AbstractElement): self.objects[obj.id] = obj return obj - def handle_element(self, el): - for attrname in dir(self): - attr = getattr(self, attrname) - if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self): - for obj in attr(self.page, self, el): - obj = 
self.store(obj) - if obj: - yield obj - class SkipItem(Exception): """ @@ -186,6 +187,7 @@ class ItemElement(AbstractElement): __metaclass__ = _ItemElementMeta _attrs = None + _loaders = None klass = None condition = None validate = None @@ -197,6 +199,7 @@ class ItemElement(AbstractElement): super(ItemElement, self).__init__(*args, **kwargs) self.logger = getLogger(self.__class__.__name__.lower()) self.obj = None + self.loaders = {} def build_object(self): if self.klass is None: @@ -218,6 +221,7 @@ class ItemElement(AbstractElement): if self.obj is None: self.obj = self.build_object() self.parse(self.el) + self.handle_loaders() for attr in self._attrs: self.handle_attr(attr, getattr(self, 'obj_%s' % attr)) except SkipItem: @@ -228,6 +232,17 @@ class ItemElement(AbstractElement): yield self.obj + def handle_loaders(self): + for attrname in dir(self): + m = re.match('load_(.*)', attrname) + if not m: + continue + name = m.group(1) + if name in self.loaders: + continue + loader = getattr(self, attrname) + self.loaders[name] = self.use_selector(loader) + def handle_attr(self, key, func): try: value = self.use_selector(func) diff --git a/weboob/tools/browser2/filters/standard.py b/weboob/tools/browser2/filters/standard.py index 28b1a1fe..433841d2 100644 --- a/weboob/tools/browser2/filters/standard.py +++ b/weboob/tools/browser2/filters/standard.py @@ -29,6 +29,7 @@ from weboob.capabilities.base import empty from weboob.tools.compat import basestring from weboob.tools.exceptions import ParseError from weboob.tools.html import html2text +from weboob.tools.browser2 import URL class NoDefault(object): def __repr__(self): @@ -41,7 +42,8 @@ __all__ = ['FilterError', 'ColumnNotFound', 'RegexpError', 'ItemNotFound', 'Filter', 'Base', 'Env', 'TableCell', 'CleanHTML', 'RawText', 'CleanText', 'Lower', 'CleanDecimal', 'Field', 'Regexp', 'Map', 'DateTime', 'Date', 'Time', 'DateGuesser', 'Duration', - 'MultiFilter', 'CombineDate', 'Format', 'Join', 'Type'] + 'MultiFilter', 
'CombineDate', 'Format', 'Join', 'Type', + 'BrowserURL', 'Async', 'AsyncLoad'] class FilterError(ParseError): @@ -123,13 +125,37 @@ class Filter(_Filter): class _Selector(Filter): - def filter(self, txt): - if txt is not None: - return txt + def filter(self, elements): + if elements is not None: + return elements else: return self.default_or_raise(ParseError('Element %r not found' % self.selector)) +class AsyncLoad(Filter): + def __call__(self, item): + link = self.select(self.selector, item) + return item.page.browser.async_open(link) + + +class Async(_Filter): + def __init__(self, name, selector=None): + super(Async, self).__init__() + self.selector = selector + self.name = name + + def __and__(self, o): + if isinstance(o, type) and issubclass(o, _Filter): + o = o() + self.selector = o + return self + + def __call__(self, item): + result = item.loaders[self.name].result() + assert result.page is not None, 'The loaded url %s hasn\'t been matched by an URL object' % result.url + return self.selector(result.page.doc) + + class Base(Filter): """ Change the base element used in filters. 
@@ -533,6 +559,22 @@ class Format(MultiFilter): return self.fmt % values +class BrowserURL(MultiFilter): + def __init__(self, url_name, **kwargs): + super(BrowserURL, self).__init__(*kwargs.values()) + self.url_name = url_name + self.keys = kwargs.keys() + + def __call__(self, item): + values = super(BrowserURL, self).__call__(item) + url = getattr(item.page.browser, self.url_name) + assert isinstance(url, URL), "%s.%s must be an URL object" % (type(item.page.browser).__name__, self.url_name) + return url.build(**dict(zip(self.keys, values))) + + def filter(self, values): + return values + + class Join(Filter): def __init__(self, pattern, selector=None, textCleaner=CleanText): super(Join, self).__init__(selector) diff --git a/weboob/tools/browser2/page.py b/weboob/tools/browser2/page.py index 9de64e46..61f45096 100644 --- a/weboob/tools/browser2/page.py +++ b/weboob/tools/browser2/page.py @@ -283,20 +283,28 @@ class PagesBrowser(DomainBrowser): response contains an attribute `page` if the url matches any :class:`URL` object. """ - response = super(PagesBrowser, self).open(*args, **kwargs) - response.page = None - # Try to handle the response page with an URL instance. - for url in self._urls.itervalues(): - page = url.handle(response) - if page is not None: - self.logger.debug('Handle %s with %s' % (response.url, page.__class__.__name__)) - response.page = page - break + callback = kwargs.pop('callback', lambda response: response) - if response.page is None: - self.logger.debug('Unable to handle %s' % response.url) - return response + # Have to define a callback to seamlessly process synchronous and + # asynchronous requests, see :meth:`BaseBrowser.open` and its `async` + # and `callback` params. + def internal_callback(response): + # Try to handle the response page with an URL instance. 
+ response.page = None + for url in self._urls.itervalues(): + page = url.handle(response) + if page is not None: + self.logger.debug('Handle %s with %s' % (response.url, page.__class__.__name__)) + response.page = page + break + + if response.page is None: + self.logger.debug('Unable to handle %s' % response.url) + + return callback(response) + + return super(PagesBrowser, self).open(callback=internal_callback, *args, **kwargs) def location(self, *args, **kwargs): """ diff --git a/weboob/tools/browser2/sessions.py b/weboob/tools/browser2/sessions.py new file mode 100644 index 00000000..65f8408f --- /dev/null +++ b/weboob/tools/browser2/sessions.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# +# Copyright(C) 2014 smurail +# +# This file is part of weboob. +# +# weboob is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# weboob is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with weboob. If not, see . + +# Inspired by: https://github.com/ross/requests-futures/blob/master/requests_futures/sessions.py +# XXX Licence issues ? + +from concurrent.futures import ThreadPoolExecutor +from requests import Session +from requests.adapters import DEFAULT_POOLSIZE, HTTPAdapter + +class FuturesSession(Session): + + def __init__(self, executor=None, max_workers=2, *args, **kwargs): + """Creates a FuturesSession + + Notes + ~~~~~ + + * ProcessPoolExecutor is not supported b/c Response objects are + not picklable. 
+ + * If you provide both `executor` and `max_workers`, the latter is + ignored and provided executor is used as is. """ super(FuturesSession, self).__init__(*args, **kwargs) if executor is None: executor = ThreadPoolExecutor(max_workers=max_workers) # set connection pool size equal to max_workers if needed if max_workers > DEFAULT_POOLSIZE: adapter_kwargs = dict(pool_connections=max_workers, pool_maxsize=max_workers) self.mount('https://', HTTPAdapter(**adapter_kwargs)) self.mount('http://', HTTPAdapter(**adapter_kwargs)) self.executor = executor def send(self, *args, **kwargs): """Maintains the existing API for :meth:`Session.send` Used by :meth:`request` and thus all of the higher level methods If background_callback param is defined, request is processed in a thread, calling background_callback and returning its result when request has been processed. If background_callback is not defined, request is processed as usual, in a blocking way. """ sup = super(FuturesSession, self).send background_callback = kwargs.pop('background_callback', None) if background_callback: def func(*args, **kwargs): resp = sup(*args, **kwargs) return background_callback(self, resp) return self.executor.submit(func, *args, **kwargs) return sup(*args, **kwargs)