add a way to asynchronously handle requests and pages

This commit is contained in:
smurail 2014-09-12 14:30:45 +02:00 committed by Romain Bignon
commit 902f26767d
5 changed files with 213 additions and 34 deletions

View file

@ -38,6 +38,7 @@ from weboob.tools.log import getLogger
from .cookies import WeboobCookieJar
from .exceptions import HTTPNotFound, ClientError, ServerError
from .sessions import FuturesSession
class Profile(object):
@ -146,6 +147,11 @@ class BaseBrowser(object):
MAX_RETRIES = 2
MAX_WORKERS = 10
"""
Maximum of threads for asynchronous requests.
"""
def __init__(self, logger=None, proxy=None, responses_dirname=None):
self.logger = getLogger('browser', logger)
self.PROXIES = proxy
@ -214,7 +220,7 @@ class BaseBrowser(object):
"""
Set up a python-requests session for our usage.
"""
session = requests.Session()
session = FuturesSession(max_workers=self.MAX_WORKERS)
session.proxies = self.PROXIES
@ -247,6 +253,7 @@ class BaseBrowser(object):
Other than that, has the exact same behavior of open().
"""
assert not kwargs.get('async'), "Please use open() instead of location() to make asynchronous requests."
response = self.open(url, **kwargs)
self.response = response
self.url = self.response.url
@ -260,6 +267,8 @@ class BaseBrowser(object):
cert=None,
proxies=None,
data_encoding=None,
async=False,
callback=lambda response: response,
**kwargs):
"""
Make an HTTP request like a browser does:
@ -279,6 +288,15 @@ class BaseBrowser(object):
Call this instead of location() if you do not want to "visit" the URL
(for instance, you are downloading a file).
When `async` is True, open() returns a Future objet (see
concurrent.futures for more details), which can be evaluated with its
result() method. If any exception is raised while processing request,
it is catched and re-raised when calling result().
For example:
>>> BaseBrowser().open('http://google.com', async=True).result().text # doctest: +SKIP
:param url: URL
:type url: str
@ -288,6 +306,13 @@ class BaseBrowser(object):
:param referrer: Force referrer. False to disable sending it, None for guessing
:type referrer: str or False or None
:param async: Process request in a non-blocking way
:type async: bool
:param callback: Callback to be called when request has finished,
with response as its first and only argument
:type callback: function
:rtype: :class:`requests.Response`
"""
req = self.build_request(url, referrer, data_encoding=data_encoding, **kwargs)
@ -308,6 +333,15 @@ class BaseBrowser(object):
if timeout is None:
timeout = self.TIMEOUT
# We define an inner_callback here in order to execute the same code
# regardless of async param.
def inner_callback(future, response):
if allow_redirects:
response = self.handle_refresh(response)
self.raise_for_status(response)
return callback(response)
# call python-requests
response = self.session.send(preq,
allow_redirects=allow_redirects,
@ -315,14 +349,21 @@ class BaseBrowser(object):
timeout=timeout,
verify=verify,
cert=cert,
proxies=proxies)
proxies=proxies,
background_callback=async and inner_callback)
if not async:
inner_callback(self, response)
if allow_redirects:
response = self.handle_refresh(response)
self.raise_for_status(response)
return response
def async_open(self, url, **kwargs):
"""
Shortcut to open(url, async=True).
"""
if 'async' in kwargs:
del kwargs['async']
return self.open(url, async=True, **kwargs)
def raise_for_status(self, response):
"""
Like Response.raise_for_status but will use other classes if needed.

View file

@ -105,9 +105,19 @@ class ListElement(AbstractElement):
def __iter__(self):
self.parse(self.el)
items = []
for el in self.find_elements():
for obj in self.handle_element(el):
if not self.flush_at_end:
for attrname in dir(self):
attr = getattr(self, attrname)
if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self):
item = attr(self.page, self, el)
item.handle_loaders()
items.append(item)
for item in items:
for obj in item:
obj = self.store(obj)
if obj and not self.flush_at_end:
yield obj
if self.flush_at_end:
@ -147,15 +157,6 @@ class ListElement(AbstractElement):
self.objects[obj.id] = obj
return obj
def handle_element(self, el):
for attrname in dir(self):
attr = getattr(self, attrname)
if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self):
for obj in attr(self.page, self, el):
obj = self.store(obj)
if obj:
yield obj
class SkipItem(Exception):
"""
@ -186,6 +187,7 @@ class ItemElement(AbstractElement):
__metaclass__ = _ItemElementMeta
_attrs = None
_loaders = None
klass = None
condition = None
validate = None
@ -197,6 +199,7 @@ class ItemElement(AbstractElement):
super(ItemElement, self).__init__(*args, **kwargs)
self.logger = getLogger(self.__class__.__name__.lower())
self.obj = None
self.loaders = {}
def build_object(self):
if self.klass is None:
@ -218,6 +221,7 @@ class ItemElement(AbstractElement):
if self.obj is None:
self.obj = self.build_object()
self.parse(self.el)
self.handle_loaders()
for attr in self._attrs:
self.handle_attr(attr, getattr(self, 'obj_%s' % attr))
except SkipItem:
@ -228,6 +232,17 @@ class ItemElement(AbstractElement):
yield self.obj
def handle_loaders(self):
for attrname in dir(self):
m = re.match('load_(.*)', attrname)
if not m:
continue
name = m.group(1)
if name in self.loaders:
continue
loader = getattr(self, attrname)
self.loaders[name] = self.use_selector(loader)
def handle_attr(self, key, func):
try:
value = self.use_selector(func)

View file

@ -29,6 +29,7 @@ from weboob.capabilities.base import empty
from weboob.tools.compat import basestring
from weboob.tools.exceptions import ParseError
from weboob.tools.html import html2text
from weboob.tools.browser2 import URL
class NoDefault(object):
def __repr__(self):
@ -41,7 +42,8 @@ __all__ = ['FilterError', 'ColumnNotFound', 'RegexpError', 'ItemNotFound',
'Filter', 'Base', 'Env', 'TableCell', 'CleanHTML', 'RawText',
'CleanText', 'Lower', 'CleanDecimal', 'Field', 'Regexp', 'Map',
'DateTime', 'Date', 'Time', 'DateGuesser', 'Duration',
'MultiFilter', 'CombineDate', 'Format', 'Join', 'Type']
'MultiFilter', 'CombineDate', 'Format', 'Join', 'Type',
'BrowserURL', 'Async', 'AsyncLoad']
class FilterError(ParseError):
@ -123,13 +125,37 @@ class Filter(_Filter):
class _Selector(Filter):
def filter(self, txt):
if txt is not None:
return txt
def filter(self, elements):
if elements is not None:
return elements
else:
return self.default_or_raise(ParseError('Element %r not found' % self.selector))
class AsyncLoad(Filter):
def __call__(self, item):
link = self.select(self.selector, item)
return item.page.browser.async_open(link)
class Async(_Filter):
def __init__(self, name, selector=None):
super(Async, self).__init__()
self.selector = selector
self.name = name
def __and__(self, o):
if isinstance(o, type) and issubclass(o, _Filter):
o = o()
self.selector = o
return self
def __call__(self, item):
result = item.loaders[self.name].result()
assert result.page is not None, 'The loaded url %s hasn\'t been matched by an URL object' % result.url
return self.selector(result.page.doc)
class Base(Filter):
"""
Change the base element used in filters.
@ -533,6 +559,22 @@ class Format(MultiFilter):
return self.fmt % values
class BrowserURL(MultiFilter):
def __init__(self, url_name, **kwargs):
super(BrowserURL, self).__init__(*kwargs.values())
self.url_name = url_name
self.keys = kwargs.keys()
def __call__(self, item):
values = super(BrowserURL, self).__call__(item)
url = getattr(item.page.browser, self.url_name)
assert isinstance(url, URL), "%s.%s must be an URL object" % (type(item.page.browser).__name__, self.url_name)
return url.build(**dict(zip(self.keys, values)))
def filter(self, values):
return values
class Join(Filter):
def __init__(self, pattern, selector=None, textCleaner=CleanText):
super(Join, self).__init__(selector)

View file

@ -283,20 +283,28 @@ class PagesBrowser(DomainBrowser):
response contains an attribute `page` if the url matches any
:class:`URL` object.
"""
response = super(PagesBrowser, self).open(*args, **kwargs)
response.page = None
# Try to handle the response page with an URL instance.
for url in self._urls.itervalues():
page = url.handle(response)
if page is not None:
self.logger.debug('Handle %s with %s' % (response.url, page.__class__.__name__))
response.page = page
break
callback = kwargs.pop('callback', lambda response: response)
if response.page is None:
self.logger.debug('Unable to handle %s' % response.url)
return response
# Have to define a callback to seamlessly process synchronous and
# asynchronous requests, see :meth:`BaseBrowser.open` and its `async`
# and `callback` params.
def internal_callback(response):
# Try to handle the response page with an URL instance.
response.page = None
for url in self._urls.itervalues():
page = url.handle(response)
if page is not None:
self.logger.debug('Handle %s with %s' % (response.url, page.__class__.__name__))
response.page = page
break
if response.page is None:
self.logger.debug('Unable to handle %s' % response.url)
return callback(response)
return super(PagesBrowser, self).open(callback=internal_callback, *args, **kwargs)
def location(self, *args, **kwargs):
"""

View file

@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
#
# Copyright(C) 2014 smurail
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
# Inspired by: https://github.com/ross/requests-futures/blob/master/requests_futures/sessions.py
# XXX Licence issues ?
from concurrent.futures import ThreadPoolExecutor
from requests import Session
from requests.adapters import DEFAULT_POOLSIZE, HTTPAdapter
class FuturesSession(Session):
def __init__(self, executor=None, max_workers=2, *args, **kwargs):
"""Creates a FuturesSession
Notes
~~~~~
* ProcessPoolExecutor is not supported b/c Response objects are
not picklable.
* If you provide both `executor` and `max_workers`, the latter is
ignored and provided executor is used as is.
"""
super(FuturesSession, self).__init__(*args, **kwargs)
if executor is None:
executor = ThreadPoolExecutor(max_workers=max_workers)
# set connection pool size equal to max_workers if needed
if max_workers > DEFAULT_POOLSIZE:
adapter_kwargs = dict(pool_connections=max_workers,
pool_maxsize=max_workers)
self.mount('https://', HTTPAdapter(**adapter_kwargs))
self.mount('http://', HTTPAdapter(**adapter_kwargs))
self.executor = executor
def send(self, *args, **kwargs):
"""Maintains the existing api for :meth:`Session.send`
Used by :meth:`request` and thus all of the higher level methods
If background_callback param is defined, request is processed in a
thread, calling background_callback and returning it's result when
request has been processed. If background_callback is not defined,
request is processed as usual, in a blocking way.
"""
sup = super(FuturesSession, self).send
background_callback = kwargs.pop('background_callback', None)
if background_callback:
def func(*args, **kwargs):
resp = sup(*args, **kwargs)
return background_callback(self, resp)
return self.executor.submit(func, *args, **kwargs)
return sup(*args, **kwargs)