Move browser2 from weboob/tools/b2 to weboob/browser2

This commit is contained in:
Florent 2014-10-02 11:01:34 +02:00
commit a019819f9d
90 changed files with 211 additions and 211 deletions

View file

@ -1,25 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .browser import Browser, DomainBrowser, Wget, Firefox, UrlNotAllowed, Profile
from .page import PagesBrowser, Page, URL, HTMLPage, LoginBrowser, need_login, JsonPage, LoggedPage, XMLPage
__all__ = ['Browser', 'DomainBrowser', 'Wget', 'Firefox', 'UrlNotAllowed', 'Profile', 'XMLPage',
'PagesBrowser', 'Page', 'URL', 'HTMLPage', 'LoginBrowser', 'need_login', 'JsonPage', 'LoggedPage']

View file

@ -1,581 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2012-2014 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import, print_function
import re
try:
from urllib.parse import urlparse, urljoin
except ImportError:
from urlparse import urlparse, urljoin
import os
import sys
try:
import requests
if int(requests.__version__.split('.')[0]) < 2:
raise ImportError()
except ImportError:
raise ImportError('Please install python-requests >= 2.0')
from weboob.tools.log import getLogger
from .cookies import WeboobCookieJar
from .exceptions import HTTPNotFound, ClientError, ServerError
from .sessions import FuturesSession
class Profile(object):
"""
A profile represents the way Browser should act.
Usually it is to mimic a real browser.
"""
def setup_session(self, session):
"""
Change default headers, set up hooks, etc.
Warning: Do not enable lzma, bzip or bzip2, sdch encodings
as python-requests does not support it yet.
Supported as of 2.2: gzip, deflate, compress.
In doubt, do not change the default Accept-Encoding header
of python-requests.
"""
raise NotImplementedError()
class Weboob(Profile):
"""
It's us!
Recommended for Weboob-friendly websites only.
"""
def __init__(self, version):
self.version = version
def setup_session(self, session):
session.headers['User-Agent'] = 'weboob/%s' % self.version
class Firefox(Profile):
"""
Try to mimic a specific version of Firefox.
Ideally, it should follow the current ESR Firefox:
https://www.mozilla.org/en-US/firefox/organizations/all.html
Do not change the Firefox version without checking the Gecko one!
"""
def setup_session(self, session):
"""
Set up headers for a standard Firefox request
(except for DNT which isn't on by default but is a good idea).
The goal is to be unidentifiable.
"""
# Replace all base requests headers
# https://developer.mozilla.org/en/Gecko_user_agent_string_reference
# https://bugzilla.mozilla.org/show_bug.cgi?id=572650
session.headers = {
'Accept-Language': 'en-us,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:31.0) Gecko/20100101 Firefox/31.0',
'DNT': '1'}
class Wget(Profile):
"""
Common alternative user agent.
Some websites will give you a version with less JavaScript.
Some others could ban you (after all, wget is not a real browser).
"""
def __init__(self, version='1.11.4'):
self.version = version
def setup_session(self, session):
# Don't remove base headers, if websites want to block fake browsers,
# they will probably block any wget user agent anyway.
session.headers.update({
'Accept': '*/*',
'User-Agent': 'Wget/%s' % self.version})
class Browser(object):
"""
Simple browser class.
Act like a browser, and don't try to do too much.
"""
PROFILE = Firefox()
"""
Default profile used by browser to navigate on websites.
"""
TIMEOUT = 10.0
"""
Default timeout during requests.
"""
REFRESH_MAX = 0.0
"""
When handling a Refresh header, the browsers considers it only if the sleep
time in lesser than this value.
"""
VERIFY = True
"""
Check SSL certificates.
"""
PROXIES = None
MAX_RETRIES = 2
MAX_WORKERS = 10
"""
Maximum of threads for asynchronous requests.
"""
def __init__(self, logger=None, proxy=None, responses_dirname=None):
self.logger = getLogger('browser', logger)
self.PROXIES = proxy
self._setup_session(self.PROFILE)
self.url = None
self.response = None
self.responses_dirname = responses_dirname
self.responses_count = 1
def _save(self, response, warning=False, **kwargs):
if self.responses_dirname is None:
import tempfile
self.responses_dirname = tempfile.mkdtemp(prefix='weboob_session_')
print('Debug data will be saved in this directory: %s' % self.responses_dirname, file=sys.stderr)
elif not os.path.isdir(self.responses_dirname):
os.makedirs(self.responses_dirname)
import mimetypes
# get the content-type, remove optionnal charset part
mimetype = response.headers.get('Content-Type', '').split(';')[0]
# due to http://bugs.python.org/issue1043134
if mimetype == 'text/plain':
ext = '.txt'
else:
# try to get an extension (and avoid adding 'None')
ext = mimetypes.guess_extension(mimetype, False) or ''
path = re.sub(r'[^A-z0-9\.-_]+', '_', urlparse(response.url).path.rpartition('/')[2])[-10:]
if path.endswith(ext):
ext = ''
filename = '%02d-%d%s%s%s' % \
(self.responses_count, response.status_code, '-' if path else '', path, ext)
response_filepath = os.path.join(self.responses_dirname, filename)
with open(response_filepath, 'w') as f:
f.write(response.content)
request = response.request
with open(response_filepath + '-request.txt', 'w') as f:
f.write('%s %s\n\n\n' % (request.method, request.url))
for key, value in request.headers.iteritems():
f.write('%s: %s\n' % (key, value))
if request.body is not None: # separate '' from None
f.write('\n\n\n%s' % request.body)
with open(response_filepath + '-response.txt', 'w') as f:
if hasattr(response.elapsed, 'total_seconds'):
f.write('Time: %3.3fs\n' % response.elapsed.total_seconds())
f.write('%s %s\n\n\n' % (response.status_code, response.reason))
for key, value in response.headers.iteritems():
f.write('%s: %s\n' % (key, value))
match_filepath = os.path.join(self.responses_dirname, 'url_response_match.txt')
with open(match_filepath, 'a') as f:
f.write('# %d %s %s\n' % (response.status_code, response.reason, response.headers.get('Content-Type', '')))
f.write('%s\t%s\n' % (response.url, filename))
self.responses_count += 1
msg = u'Response saved to %s' % response_filepath
if warning:
self.logger.warning(msg)
else:
self.logger.info(msg)
def _setup_session(self, profile):
"""
Set up a python-requests session for our usage.
"""
session = FuturesSession(max_workers=self.MAX_WORKERS)
session.proxies = self.PROXIES
session.verify = self.VERIFY and not self.logger.settings['ssl_insecure']
# defines a max_retries. It's mandatory in case a server is not
# handling keep alive correctly, like the proxy burp
a = requests.adapters.HTTPAdapter(max_retries=self.MAX_RETRIES)
session.mount('http://', a)
session.mount('https://', a)
if self.TIMEOUT:
session.timeout = self.TIMEOUT
## weboob only can provide proxy and HTTP auth options
session.trust_env = False
profile.setup_session(session)
if self.logger.settings['save_responses']:
session.hooks['response'].append(self._save)
self.session = session
session.cookies = WeboobCookieJar()
def location(self, url, **kwargs):
"""
Like :meth:`open` but also changes the current URL and response.
This is the most common method to request web pages.
Other than that, has the exact same behavior of open().
"""
assert not kwargs.get('async'), "Please use open() instead of location() to make asynchronous requests."
response = self.open(url, **kwargs)
self.response = response
self.url = self.response.url
return response
def open(self, url, referrer=None,
allow_redirects=True,
stream=None,
timeout=None,
verify=None,
cert=None,
proxies=None,
data_encoding=None,
async=False,
callback=lambda response: response,
**kwargs):
"""
Make an HTTP request like a browser does:
* follow redirects (unless disabled)
* provide referrers (unless disabled)
Unless a `method` is explicitly provided, it makes a GET request,
or a POST if data is not None,
An empty `data` (not None, like '' or {}) *will* make a POST.
It is a wrapper around session.request().
All session.request() options are available.
You should use location() or open() and not session.request(),
since it has some interesting additions, which are easily
individually disabled through the arguments.
Call this instead of location() if you do not want to "visit" the URL
(for instance, you are downloading a file).
When `async` is True, open() returns a Future objet (see
concurrent.futures for more details), which can be evaluated with its
result() method. If any exception is raised while processing request,
it is catched and re-raised when calling result().
For example:
>>> Browser().open('http://google.com', async=True).result().text # doctest: +SKIP
:param url: URL
:type url: str
:param data: POST data
:type url: str or dict or None
:param referrer: Force referrer. False to disable sending it, None for guessing
:type referrer: str or False or None
:param async: Process request in a non-blocking way
:type async: bool
:param callback: Callback to be called when request has finished,
with response as its first and only argument
:type callback: function
:rtype: :class:`requests.Response`
"""
req = self.build_request(url, referrer, data_encoding=data_encoding, **kwargs)
preq = self.prepare_request(req)
if hasattr(preq, '_cookies'):
# The _cookies attribute is not present in requests < 2.2. As in
# previous version it doesn't calls extract_cookies_to_jar(), it is
# not a problem as we keep our own cookiejar instance.
preq._cookies = WeboobCookieJar.from_cookiejar(preq._cookies)
if proxies is None:
proxies = self.PROXIES
if verify is None:
verify = self.VERIFY and not self.logger.settings['ssl_insecure']
if timeout is None:
timeout = self.TIMEOUT
# We define an inner_callback here in order to execute the same code
# regardless of async param.
def inner_callback(future, response):
if allow_redirects:
response = self.handle_refresh(response)
self.raise_for_status(response)
return callback(response)
# call python-requests
response = self.session.send(preq,
allow_redirects=allow_redirects,
stream=stream,
timeout=timeout,
verify=verify,
cert=cert,
proxies=proxies,
background_callback=async and inner_callback)
if not async:
inner_callback(self, response)
return response
def async_open(self, url, **kwargs):
"""
Shortcut to open(url, async=True).
"""
if 'async' in kwargs:
del kwargs['async']
return self.open(url, async=True, **kwargs)
def raise_for_status(self, response):
"""
Like Response.raise_for_status but will use other classes if needed.
"""
http_error_msg = None
if 400 <= response.status_code < 500:
http_error_msg = '%s Client Error: %s' % (response.status_code, response.reason)
cls = ClientError
if response.status_code == 404:
cls = HTTPNotFound
elif 500 <= response.status_code < 600:
http_error_msg = '%s Server Error: %s' % (response.status_code, response.reason)
cls = ServerError
if http_error_msg:
raise cls(http_error_msg, response=response)
# in case we did not catch something that should be
response.raise_for_status()
def build_request(self, url, referrer=None, data_encoding=None, **kwargs):
"""
Does the same job as open(), but returns a Request without
submitting it.
This allows further customization to the Request.
"""
if isinstance(url, requests.Request):
req = url
url = req.url
else:
req = requests.Request(url=url, **kwargs)
# guess method
if req.method is None:
if req.data:
req.method = 'POST'
else:
req.method = 'GET'
# convert unicode strings to proper encoding
if isinstance(req.data, unicode) and data_encoding:
req.data = req.data.encode(data_encoding)
if isinstance(req.data, dict) and data_encoding:
req.data = dict([(k, v.encode(data_encoding) if isinstance(v, unicode) else v)
for k, v in req.data.iteritems()])
if referrer is None:
referrer = self.get_referrer(self.url, url)
if referrer:
# Yes, it is a misspelling.
req.headers.setdefault('Referer', referrer)
return req
def prepare_request(self, req):
"""
Get a prepared request from a Request object.
This method aims to be overloaded by children classes.
"""
return self.session.prepare_request(req)
REFRESH_RE = re.compile(r"^(?P<sleep>[\d\.]+)(; url=[\"']?(?P<url>.*?)[\"']?)?$", re.IGNORECASE)
def handle_refresh(self, response):
"""
Called by open, to handle Refresh HTTP header.
It only redirect to the refresh URL if the sleep time is inferior to
REFRESH_MAX.
"""
if not 'Refresh' in response.headers:
return response
m = self.REFRESH_RE.match(response.headers['Refresh'])
if m:
# XXX perhaps we should not redirect if the refresh url is equal to the current url.
url = m.groupdict().get('url', None) or response.request.url
sleep = float(m.groupdict()['sleep'])
if sleep <= self.REFRESH_MAX:
self.logger.debug('Refresh to %s' % url)
return self.open(url)
else:
self.logger.debug('Do not refresh to %s because %s > REFRESH_MAX(%s)' % (url, sleep, self.REFRESH_MAX))
return response
self.logger.warning('Unable to handle refresh "%s"' % response.headers['Refresh'])
return response
def get_referrer(self, oldurl, newurl):
"""
Get the referrer to send when doing a request.
If we should not send a referrer, it will return None.
Reference: https://en.wikipedia.org/wiki/HTTP_referer
:param oldurl: Current absolute URL
:type oldurl: str or None
:param newurl: Target absolute URL
:type newurl: str
:rtype: str or None
"""
if oldurl is None:
return None
old = urlparse(oldurl)
new = urlparse(newurl)
# Do not leak secure URLs to insecure URLs
if old.scheme == 'https' and new.scheme != 'https':
return None
# Reloading the page. Usually no referrer.
if oldurl == newurl:
return None
# TODO maybe implement some *optional* privacy features:
# * do not leak referrer to other domains (often breaks websites)
# * send a fake referrer (root of the current domain)
# * never send the referrer
# Inspired by the RefControl Firefox addon.
return oldurl
class UrlNotAllowed(Exception):
"""
Raises by :class:`DomainBrowser` when `RESTRICT_URL` is set and trying to go
on an url not matching `BASEURL`.
"""
class DomainBrowser(Browser):
"""
A browser that handles relative URLs and can have a base URL (usually a domain).
For instance self.location('/hello') will get http://weboob.org/hello
if BASEURL is 'http://weboob.org/'.
"""
BASEURL = None
"""
Base URL, e.g. 'http://weboob.org/' or 'https://weboob.org/'
See absurl().
"""
RESTRICT_URL = False
"""
URLs allowed to load.
This can be used to force SSL (if the BASEURL is SSL) or any other leakage.
Set to True to allow only URLs starting by the BASEURL.
Set it to a list of allowed URLs if you have multiple allowed URLs.
More complex behavior is possible by overloading url_allowed()
"""
def url_allowed(self, url):
"""
Checks if we are allowed to visit an URL.
See RESTRICT_URL.
:param url: Absolute URL
:type url: str
:rtype: bool
"""
if self.BASEURL is None or self.RESTRICT_URL is False:
return True
if self.RESTRICT_URL is True:
return url.startswith(self.BASEURL)
for restrict_url in self.RESTRICT_URL:
if url.startswith(restrict_url):
return True
return False
def absurl(self, uri, base=None):
"""
Get the absolute URL, relative to the base URL.
If BASEURL is None, it will try to use the current URL.
If base is False, it will always try to use the current URL.
:param uri: URI to make absolute. It can be already absolute.
:type uri: str
:param base: Base absolute URL.
:type base: str or None or False
:rtype: str
"""
if not base:
base = self.url
if base is None or base is True:
base = self.BASEURL
return urljoin(base, uri)
def open(self, req, *args, **kwargs):
"""
Like :meth:`Browser.open` but hanldes urls without domains, using
the :attr:`BASEURL` attribute.
"""
uri = req.url if isinstance(req, requests.Request) else req
url = self.absurl(uri)
if not self.url_allowed(url):
raise UrlNotAllowed(url)
if isinstance(req, requests.Request):
req.url = url
else:
req = url
return super(DomainBrowser, self).open(req, *args, **kwargs)
def go_home(self):
"""
Go to the "home" page, usually the BASEURL.
"""
return self.location(self.BASEURL or self.absurl('/'))

View file

@ -1,72 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import requests.cookies
try:
import cookielib
except ImportError:
import http.cookiejar as cookielib
__all__ = ['WeboobCookieJar']
class WeboobCookieJar(requests.cookies.RequestsCookieJar):
@classmethod
def from_cookiejar(klass, cj):
"""
Create a WeboobCookieJar from another CookieJar instance.
"""
return requests.cookies.merge_cookies(klass(), cj)
def export(self, filename):
"""
Export all cookies to a file, regardless of expiration, etc.
"""
cj = requests.cookies.merge_cookies(cookielib.LWPCookieJar(), self)
cj.save(filename, ignore_discard=True, ignore_expires=True)
def _cookies_from_attrs_set(self, attrs_set, request):
for tup in self._normalized_cookie_tuples(attrs_set):
cookie = self._cookie_from_cookie_tuple(tup, request)
if cookie:
yield cookie
def make_cookies(self, response, request):
"""Return sequence of Cookie objects extracted from response object."""
# get cookie-attributes for RFC 2965 and Netscape protocols
headers = response.info()
rfc2965_hdrs = headers.getheaders("Set-Cookie2")
ns_hdrs = headers.getheaders("Set-Cookie")
rfc2965 = self._policy.rfc2965
netscape = self._policy.netscape
if netscape:
for cookie in self._cookies_from_attrs_set(cookielib.parse_ns_headers(ns_hdrs), request):
self._process_rfc2109_cookies([cookie])
yield cookie
if rfc2965:
for cookie in self._cookies_from_attrs_set(cookielib.split_header_words(rfc2965_hdrs), request):
yield cookie
def copy(self):
new_cj = type(self)()
new_cj.update(self)
return new_cj

View file

@ -1,292 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
import sys
from copy import deepcopy
from weboob.tools.log import getLogger
from weboob.tools.ordereddict import OrderedDict
from weboob.tools.browser2.page import NextPage
from .filters.standard import _Filter, CleanText
from .filters.html import AttributeNotFound, XPathNotFound
__all__ = ['DataError', 'AbstractElement', 'ListElement', 'ItemElement', 'TableElement', 'SkipItem']
class DataError(Exception):
"""
Returned data from pages are incoherent.
"""
class AbstractElement(object):
_creation_counter = 0
def __init__(self, page, parent=None, el=None):
self.page = page
self.parent = parent
if el is not None:
self.el = el
elif parent is not None:
self.el = parent.el
else:
self.el = page.doc
if parent is not None:
self.env = deepcopy(parent.env)
else:
self.env = deepcopy(page.params)
# Used by debug
self._random_id = AbstractElement._creation_counter
AbstractElement._creation_counter += 1
self.loaders = {}
def use_selector(self, func, key=None):
if isinstance(func, _Filter):
func._obj = self
func._key = key
value = func(self)
elif callable(func):
value = func()
else:
value = deepcopy(func)
return value
def parse(self, obj):
pass
def cssselect(self, *args, **kwargs):
return self.el.cssselect(*args, **kwargs)
def xpath(self, *args, **kwargs):
return self.el.xpath(*args, **kwargs)
def handle_loaders(self):
for attrname in dir(self):
m = re.match('load_(.*)', attrname)
if not m:
continue
name = m.group(1)
if name in self.loaders:
continue
loader = getattr(self, attrname)
self.loaders[name] = self.use_selector(loader, key=attrname)
class ListElement(AbstractElement):
item_xpath = None
flush_at_end = False
ignore_duplicate = False
def __init__(self, *args, **kwargs):
super(ListElement, self).__init__(*args, **kwargs)
self.logger = getLogger(self.__class__.__name__.lower())
self.objects = OrderedDict()
def __call__(self, *args, **kwargs):
for key, value in kwargs.iteritems():
self.env[key] = value
return self.__iter__()
def find_elements(self):
"""
Get the nodes that will have to be processed.
This method can be overridden if xpath filters are not
sufficient.
"""
if self.item_xpath is not None:
for el in self.el.xpath(self.item_xpath):
yield el
else:
yield self.el
def __iter__(self):
self.parse(self.el)
items = []
for el in self.find_elements():
for attrname in dir(self):
attr = getattr(self, attrname)
if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self):
item = attr(self.page, self, el)
item.handle_loaders()
items.append(item)
for item in items:
for obj in item:
obj = self.store(obj)
if obj and not self.flush_at_end:
yield obj
if self.flush_at_end:
for obj in self.flush():
yield obj
self.check_next_page()
def flush(self):
for obj in self.objects.itervalues():
yield obj
def check_next_page(self):
if not hasattr(self, 'next_page'):
return
next_page = getattr(self, 'next_page')
try:
value = self.use_selector(next_page)
except (AttributeNotFound, XPathNotFound):
return
if value is None:
return
raise NextPage(value)
def store(self, obj):
if obj.id:
if obj.id in self.objects:
if self.ignore_duplicate:
self.logger.warning('There are two objects with the same ID! %s' % obj.id)
return
else:
raise DataError('There are two objects with the same ID! %s' % obj.id)
self.objects[obj.id] = obj
return obj
class SkipItem(Exception):
"""
Raise this exception in an :class:`ItemElement` subclass to skip an item.
"""
class _ItemElementMeta(type):
"""
Private meta-class used to keep order of obj_* attributes in :class:`ItemElement`.
"""
def __new__(mcs, name, bases, attrs):
_attrs = []
for base in bases:
if hasattr(base, '_attrs'):
_attrs += base._attrs
filters = [(re.sub('^obj_', '', attr_name), attrs[attr_name]) for attr_name, obj in attrs.items() if attr_name.startswith('obj_')]
# constants first, then filters, then methods
filters.sort(key=lambda x: x[1]._creation_counter if hasattr(x[1], '_creation_counter') else (sys.maxint if callable(x[1]) else 0))
new_class = super(_ItemElementMeta, mcs).__new__(mcs, name, bases, attrs)
new_class._attrs = _attrs + [f[0] for f in filters]
return new_class
class ItemElement(AbstractElement):
__metaclass__ = _ItemElementMeta
_attrs = None
_loaders = None
klass = None
condition = None
validate = None
class Index(object):
pass
def __init__(self, *args, **kwargs):
super(ItemElement, self).__init__(*args, **kwargs)
self.logger = getLogger(self.__class__.__name__.lower())
self.obj = None
def build_object(self):
if self.klass is None:
return
return self.klass()
def __call__(self, obj=None):
if obj is not None:
self.obj = obj
for obj in self:
return obj
def __iter__(self):
if self.condition is not None and not self.condition():
return
try:
if self.obj is None:
self.obj = self.build_object()
self.parse(self.el)
self.handle_loaders()
for attr in self._attrs:
self.handle_attr(attr, getattr(self, 'obj_%s' % attr))
except SkipItem:
return
if self.validate is not None and not self.validate(self.obj):
return
yield self.obj
def handle_attr(self, key, func):
try:
value = self.use_selector(func, key=key)
except Exception as e:
# Help debugging as tracebacks do not give us the key
self.logger.warning('Attribute %s raises %s' % (key, repr(e)))
raise
logger = getLogger('b2filters')
logger.debug("%s.%s = %r" % (self._random_id, key, value))
setattr(self.obj, key, value)
class TableElement(ListElement):
head_xpath = None
cleaner = CleanText
def __init__(self, *args, **kwargs):
super(TableElement, self).__init__(*args, **kwargs)
self._cols = {}
columns = {}
for attrname in dir(self):
m = re.match('col_(.*)', attrname)
if m:
cols = getattr(self, attrname)
if not isinstance(cols, (list,tuple)):
cols = [cols]
columns[m.group(1)] = [s.lower() for s in cols]
for colnum, el in enumerate(self.el.xpath(self.head_xpath)):
title = self.cleaner.clean(el).lower()
for name, titles in columns.iteritems():
if title in titles:
self._cols[name] = colnum
def get_colnum(self, name):
return self._cols.get(name, None)

View file

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from requests.exceptions import HTTPError
from weboob.tools.exceptions import BrowserHTTPError, BrowserHTTPNotFound
class HTTPNotFound(HTTPError, BrowserHTTPNotFound):
pass
class ClientError(HTTPError, BrowserHTTPError):
pass
class ServerError(HTTPError, BrowserHTTPError):
pass

View file

@ -1,18 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.

View file

@ -1,82 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import lxml.html as html
from .standard import _Selector, _NO_DEFAULT, Filter, FilterError
from weboob.tools.html import html2text
__all__ = ['CSS', 'XPath', 'XPathNotFound', 'AttributeNotFound',
'Attr', 'Link', 'CleanHTML']
class XPathNotFound(FilterError):
pass
class AttributeNotFound(FilterError):
pass
class CSS(_Selector):
@classmethod
def select(cls, selector, item):
return item.cssselect(selector)
class XPath(_Selector):
pass
class Attr(Filter):
def __init__(self, selector, attr, default=_NO_DEFAULT):
super(Attr, self).__init__(selector, default=default)
self.attr = attr
def filter(self, el):
try:
return u'%s' % el[0].attrib[self.attr]
except IndexError:
return self.default_or_raise(XPathNotFound('Unable to find link %s' % self.selector))
except KeyError:
return self.default_or_raise(AttributeNotFound('Link %s does not has attribute %s' % (el[0], self.attr)))
class Link(Attr):
"""
Get the link uri of an element.
If the <a> tag is not found, an exception IndexError is raised.
"""
def __init__(self, selector=None, default=_NO_DEFAULT):
super(Link, self).__init__(selector, 'href', default=default)
class CleanHTML(Filter):
def filter(self, txt):
if isinstance(txt, (tuple, list)):
return u' '.join([self.clean(item) for item in txt])
return self.clean(txt)
@classmethod
def clean(cls, txt):
if not isinstance(txt, basestring):
txt = html.tostring(txt, encoding=unicode)
return html2text(txt)

View file

@ -1,134 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Simon Murail
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
from ast import literal_eval
from weboob.tools.browser2.filters.standard import Filter, Regexp, RegexpError
from weboob.tools.exceptions import ParseError
__all__ = ['JSPayload', 'JSVar']
def _quoted(q):
return r'(?<!\\){0}(?:\\{0}|[^{0}])*{0}'.format(q)
class JSPayload(Filter):
r"""
Get Javascript code from tag's text, cleaned from all comments.
It filters code in a such a way that corner cases are handled, such as
comments in string literals and comments in comments.
The following snippet is borrowed from <http://ostermiller.org/findcomment.html>:
>>> JSPayload.filter('''someString = "An example comment: /* example */";
...
... // The comment around this code has been commented out.
... // /*
... some_code();
... // */''')
'someString = "An example comment: /* example */";\n\nsome_code();\n'
"""
_single_line_comment = '[ \t\v\f]*//.*\r?(?:\n|$)'
_multi_line_comment = '/\*(?:.|[\r\n])*?\*/'
_splitter = re.compile('(?:(%s|%s)|%s|%s)' % (_quoted('"'),
_quoted("'"),
_single_line_comment,
_multi_line_comment))
@classmethod
def filter(cls, value):
return ''.join(filter(bool, cls._splitter.split(value)))
class JSVar(Regexp):
r"""
Get the init value of first found assignment value of a variable.
It only understands literal values, but should parse them well. Values
are converted in python values, quotes and slashes in strings are stripped.
>>> JSVar(var='test').filter("var test = .1;\nsomecode()")
0.1
>>> JSVar(var='test').filter("test = 42;\nsomecode()")
42
>>> JSVar(var='test').filter("test = 'Some \\'string\\' value, isn\\'t it ?';\nsomecode()")
"Some 'string' value, isn't it ?"
>>> JSVar(var='test').filter('test = "Some \\"string\\" value";\nsomecode()')
'Some "string" value'
>>> JSVar(var='test').filter("var test = false;\nsomecode()")
False
>>> JSVar(var='test', nth=1).filter("var test = false; test = true;\nsomecode()")
True
"""
pattern_template = r"""(?x)
(?:var\s+)? # optional var keyword
\b%%s # var name
\s*=\s* # equal sign
(?:(?P<float>[-+]?\s* # float ?
(?:(?:\d+\.\d*|\d*\.\d+)(?:[eE]\d+)?
|\d+[eE]\d+))
|(?P<int>[-+]?\s*(?:0[bBxXoO])?\d+) # int ?
|(?:(?:new\s+String\()?(?P<str>(?:%s|%s))) # str ?
|(?P<bool>true|false) # bool ?
|(?P<None>null)) # None ?
""" % (_quoted('"'), _quoted("'"))
_re_spaces = re.compile(r'\s+')
def to_python(self, m):
"Convert MatchObject to python value"
values = m.groupdict()
for t, v in values.iteritems():
if v is not None:
break
if self.need_type and t != self.need_type:
raise ParseError('Variable %r with type %s not found' % (self.var, self.need_type))
if t in ('int', 'float'):
return literal_eval(v)
if t == 'str':
return literal_eval(v).decode('utf-8')
if t == 'bool':
return v == 'true'
if t == 'None':
return
if self.default:
return self.default
raise ParseError('Unable to parse variable %r value' % self.var)
def __init__(self, selector=None, var=None, need_type=None, **kwargs):
assert var is not None, 'Please specify a var parameter'
assert 'pattern' not in kwargs, "It would be meaningless to define a pattern, use Regexp"
assert 'template' not in kwargs, "Can't use a template, use Regexp if you have to"
self.var = var
self.need_type = need_type.__name__ if type(need_type) == type else need_type
pattern = self.pattern_template % re.escape(var)
super(JSVar, self).__init__(selector, pattern=pattern, template=self.to_python, **kwargs)
def filter(self, txt):
try:
return super(JSVar, self).filter(txt)
except RegexpError:
raise ParseError('Variable %r not found' % self.var)

View file

@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .standard import _Selector, _NO_DEFAULT
__all__ = ['Dict']
class _DictMeta(type):
def __getitem__(cls, name):
return cls(name)
class Dict(_Selector):
__metaclass__ = _DictMeta
def __init__(self, selector=None, default=_NO_DEFAULT):
super(Dict, self).__init__(self, default=default)
self.selector = selector.split('/') if selector is not None else []
def __getitem__(self, name):
self.selector.append(name)
return self
@classmethod
def select(cls, selector, item):
if isinstance(item, dict):
content = item
else:
content = item.el
for el in selector:
if el not in content:
return None
content = content.get(el)
return content

View file

@ -1,674 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import datetime
import re
from decimal import Decimal, InvalidOperation
from itertools import islice
from dateutil.parser import parse as parse_date
from weboob.capabilities.base import empty
from weboob.tools.compat import basestring
from weboob.tools.exceptions import ParseError
from weboob.tools.browser2 import URL
from weboob.tools.log import getLogger
class NoDefault(object):
def __repr__(self):
return 'NO_DEFAULT'
_NO_DEFAULT = NoDefault()
__all__ = ['FilterError', 'ColumnNotFound', 'RegexpError', 'ItemNotFound',
'Filter', 'Base', 'Env', 'TableCell', 'RawText',
'CleanText', 'Lower', 'CleanDecimal', 'Field', 'Regexp', 'Map',
'DateTime', 'Date', 'Time', 'DateGuesser', 'Duration',
'MultiFilter', 'CombineDate', 'Format', 'Join', 'Type',
'BrowserURL', 'Async', 'AsyncLoad']
class FilterError(ParseError):
pass
class ColumnNotFound(FilterError):
pass
class RegexpError(FilterError):
pass
class ItemNotFound(FilterError):
pass
class _Filter(object):
_creation_counter = 0
def __init__(self, default=_NO_DEFAULT):
self._key = None
self._obj = None
self.default = default
self._creation_counter = _Filter._creation_counter
_Filter._creation_counter += 1
def __or__(self, o):
self.default = o
return self
def __and__(self, o):
if isinstance(o, type) and issubclass(o, _Filter):
o = o()
o.selector = self
return o
def default_or_raise(self, exception):
if self.default is not _NO_DEFAULT:
return self.default
else:
raise exception
def __str__(self):
return self.__class__.__name__
def debug(*args):
"""
A decorator function to provide some debugs informations
in Filters.
It prints by default the name of the Filter and the input value.
"""
def wraper(function):
def print_debug(self, value):
logger = getLogger('b2filters')
result = ''
outputvalue = value
if isinstance(value, list):
from lxml import etree
outputvalue = ''
first = True
for element in value:
if first:
first = False
else:
outputvalue += ', '
if isinstance(element, etree.ElementBase):
outputvalue += "%s" % etree.tostring(element, encoding=unicode)
else:
outputvalue += "%r" % element
if self._obj is not None:
result += "%s" % self._obj._random_id
if self._key is not None:
result += ".%s" % self._key
name = str(self)
result += " %s(%r" % (name, outputvalue)
for arg in self.__dict__:
if arg.startswith('_') or arg == u"selector":
continue
if arg == u'default' and getattr(self, arg) == _NO_DEFAULT:
continue
result += ", %s=%r" % (arg, getattr(self, arg))
result += u')'
logger.debug(result)
res = function(self, value)
return res
return print_debug
return wraper
class Filter(_Filter):
"""
Class used to filter on a HTML element given as call parameter to return
matching elements.
Filters can be chained, so the parameter supplied to constructor can be
either a xpath selector string, or an other filter called before.
>>> from lxml.html import etree
>>> f = CleanDecimal(CleanText('//p'), replace_dots=True)
>>> f(etree.fromstring('<html><body><p>blah: <span>229,90</span></p></body></html>'))
Decimal('229.90')
"""
def __init__(self, selector=None, default=_NO_DEFAULT):
super(Filter, self).__init__(default=default)
self.selector = selector
@classmethod
def select(cls, selector, item, obj=None, key=None):
if isinstance(selector, basestring):
return item.xpath(selector)
elif isinstance(selector, _Filter):
selector._key = key
selector._obj = obj
return selector(item)
elif callable(selector):
return selector(item)
else:
return selector
def __call__(self, item):
return self.filter(self.select(self.selector, item, key=self._key, obj=self._obj))
@debug()
def filter(self, value):
"""
This method have to be overrided by children classes.
"""
raise NotImplementedError()
class _Selector(Filter):
def filter(self, elements):
if elements is not None:
return elements
else:
return self.default_or_raise(ParseError('Element %r not found' % self.selector))
class AsyncLoad(Filter):
def __call__(self, item):
link = self.select(self.selector, item, key=self._key, obj=self._obj)
return item.page.browser.async_open(link)
class Async(_Filter):
def __init__(self, name, selector=None):
super(Async, self).__init__()
self.selector = selector
self.name = name
def __and__(self, o):
if isinstance(o, type) and issubclass(o, _Filter):
o = o()
self.selector = o
return self
def __call__(self, item):
result = item.loaders[self.name].result()
assert result.page is not None, 'The loaded url %s hasn\'t been matched by an URL object' % result.url
return self.selector(result.page.doc)
class Base(Filter):
"""
Change the base element used in filters.
>>> Base(Env('header'), CleanText('./h1')) # doctest: +SKIP
"""
def __call__(self, item):
base = self.select(self.base, item, obj=self._obj, key=self._key)
return self.selector(base)
def __init__(self, base, selector=None, default=_NO_DEFAULT):
super(Base, self).__init__(selector, default)
self.base = base
class Env(_Filter):
"""
Filter to get environment value of the item.
It is used for example to get page parameters, or when there is a parse()
method on ItemElement.
"""
def __init__(self, name, default=_NO_DEFAULT):
super(Env, self).__init__(default)
self.name = name
def __call__(self, item):
try:
return item.env[self.name]
except KeyError:
return self.default_or_raise(ParseError('Environment variable %s not found' % self.name))
class TableCell(_Filter):
"""
Used with TableElement, it get the cell value from its name.
For example:
>>> from weboob.capabilities.bank import Transaction
>>> from weboob.tools.browser2.elements import TableElement, ItemElement
>>> class table(TableElement):
... head_xpath = '//table/thead/th'
... item_xpath = '//table/tbody/tr'
... col_date = u'Date'
... col_label = [u'Name', u'Label']
... class item(ItemElement):
... klass = Transaction
... obj_date = Date(TableCell('date'))
... obj_label = CleanText(TableCell('label'))
...
"""
def __init__(self, *names, **kwargs):
super(TableCell, self).__init__(**kwargs)
self.names = names
def __call__(self, item):
for name in self.names:
idx = item.parent.get_colnum(name)
if idx is not None:
return item.xpath('./td[%s]' % (idx + 1))
return self.default_or_raise(ColumnNotFound('Unable to find column %s' % ' or '.join(self.names)))
class RawText(Filter):
@debug()
def filter(self, el):
if isinstance(el, (tuple, list)):
return u' '.join([self.filter(e) for e in el])
if el.text is None:
return self.default
else:
return unicode(el.text)
class CleanText(Filter):
"""
Get a cleaned text from an element.
It first replaces all tabs and multiple spaces
(including newlines if ``newlines`` is True)
to one space and strips the result string.
Then it replaces all symbols given in the ``symbols`` argument.
>>> CleanText().filter('coucou ')
u'coucou'
>>> CleanText().filter(u'coucou\xa0coucou')
u'coucou coucou'
>>> CleanText(newlines=True).filter(u'coucou\\r\\n coucou ')
u'coucou coucou'
>>> CleanText(newlines=False).filter(u'coucou\\r\\n coucou ')
u'coucou\\ncoucou'
"""
def __init__(self, selector=None, symbols='', replace=[], childs=True, newlines=True, **kwargs):
super(CleanText, self).__init__(selector, **kwargs)
self.symbols = symbols
self.toreplace = replace
self.childs = childs
self.newlines = newlines
@debug()
def filter(self, txt):
if isinstance(txt, (tuple, list)):
txt = u' '.join([self.clean(item, childs=self.childs) for item in txt])
txt = self.clean(txt, childs=self.childs, newlines=self.newlines)
txt = self.remove(txt, self.symbols)
txt = self.replace(txt, self.toreplace)
# lxml under Python 2 returns str instead of unicode if it is pure ASCII
return unicode(txt)
@classmethod
def clean(cls, txt, childs=True, newlines=True):
if not isinstance(txt, basestring):
if childs:
txt = [t.strip() for t in txt.itertext()]
else:
txt = [txt.text.strip()]
txt = u' '.join(txt) # 'foo bar'
if newlines:
txt = re.compile(u'\s+', flags=re.UNICODE).sub(u' ', txt) # 'foo bar'
else:
# normalize newlines and clean what is inside
txt = '\n'.join([cls.clean(l) for l in txt.splitlines()])
return txt.strip()
@classmethod
def remove(cls, txt, symbols):
for symbol in symbols:
txt = txt.replace(symbol, '')
return txt.strip()
@classmethod
def replace(cls, txt, replace):
for before, after in replace:
txt = txt.replace(before, after)
return txt
class Lower(CleanText):
@debug()
def filter(self, txt):
txt = super(Lower, self).filter(txt)
return txt.lower()
class CleanDecimal(CleanText):
"""
Get a cleaned Decimal value from an element.
replace_dots is False by default. A dot is interpreted as a decimal separator.
If replace_dots is set to True, we remove all the dots. The ',' is used as decimal
separator (often useful for French values)
If replace_dots is a tuple, the first element will be used as the thousands separator,
and the second as the decimal separator.
See http://en.wikipedia.org/wiki/Thousands_separator#Examples_of_use
For example, for the UK style (as in 1,234,567.89):
>>> CleanDecimal('./td[1]', replace_dots=(',', '.')) # doctest: +SKIP
"""
def __init__(self, selector=None, replace_dots=False, sign=None, default=_NO_DEFAULT):
super(CleanDecimal, self).__init__(selector, default=default)
self.replace_dots = replace_dots
self.sign = sign
@debug()
def filter(self, text):
if empty(text):
return self.default_or_raise(ParseError('Unable to parse %r' % text))
original_text = text = super(CleanDecimal, self).filter(text)
if self.replace_dots:
if type(self.replace_dots) is tuple:
thousands_sep, decimal_sep = self.replace_dots
else:
thousands_sep, decimal_sep = '.', ','
text = text.replace(thousands_sep, '').replace(decimal_sep, '.')
try:
v = Decimal(re.sub(r'[^\d\-\.]', '', text))
if self.sign:
v *= self.sign(original_text)
return v
except InvalidOperation as e:
return self.default_or_raise(e)
class Type(Filter):
"""
Get a cleaned value of any type from an element text.
The type_func can be any callable (class, function, etc.).
By default an empty string will not be parsed but it can be changed
by specifying minlen=False. Otherwise, a minimal length can be specified.
>>> Type(CleanText('./td[1]'), type=int) # doctest: +SKIP
>>> Type(type=int).filter('42')
42
>>> Type(type=int, default='NaN').filter('')
'NaN'
>>> Type(type=str, minlen=False, default='a').filter('')
''
>>> Type(type=str, minlen=0, default='a').filter('')
'a'
"""
def __init__(self, selector=None, type=None, minlen=0, default=_NO_DEFAULT):
super(Type, self).__init__(selector, default=default)
self.type_func = type
self.minlen = minlen
@debug()
def filter(self, txt):
if empty(txt):
return self.default_or_raise(ParseError('Unable to parse %r' % txt))
if self.minlen is not False and len(txt) <= self.minlen:
return self.default_or_raise(ParseError('Unable to parse %r' % txt))
try:
return self.type_func(txt)
except ValueError as e:
return self.default_or_raise(ParseError('Unable to parse %r: %s' % (txt, e)))
class Field(_Filter):
"""
Get the attribute of object.
"""
def __init__(self, name):
super(Field, self).__init__()
self.name = name
def __call__(self, item):
return item.use_selector(getattr(item, 'obj_%s' % self.name), key=self._key)
# Based on nth from https://docs.python.org/2/library/itertools.html
def nth(iterable, n, default=None):
"Returns the nth item or a default value, n can be negative"
if n < 0:
iterable = reversed(list(iterable))
n = abs(n) - 1
return next(islice(iterable, n, None), default)
def ordinal(n):
"To have some readable debug information: 0 => 1st, 1 => 2nd..."
i = abs(n)
n = n - 1 if n < 0 else n + 1
return str(n) + ('th' if i > 2 else ['st', 'nd', 'rd'][i])
class Regexp(Filter):
r"""
Apply a regex.
>>> from lxml.html import etree
>>> doc = etree.fromstring('<html><body><p>Date: <span>13/08/1988</span></p></body></html>')
>>> Regexp(CleanText('//p'), r'Date: (\d+)/(\d+)/(\d+)', '\\3-\\2-\\1')(doc)
u'1988-08-13'
>>> (Regexp(CleanText('//body'), r'(\d+)', nth=1))(doc)
u'08'
>>> (Regexp(CleanText('//body'), r'(\d+)', nth=-1))(doc)
u'1988'
"""
def __init__(self, selector=None, pattern=None, template=None, nth=0, flags=0, default=_NO_DEFAULT):
super(Regexp, self).__init__(selector, default=default)
assert pattern is not None
self.pattern = pattern
self._regex = re.compile(pattern, flags)
self.template = template
self.nth = nth
@debug()
def filter(self, txt):
if isinstance(txt, (tuple, list)):
txt = u' '.join([t.strip() for t in txt.itertext()])
mobj = self._regex.search(txt) if self.nth == 0 else \
nth(self._regex.finditer(txt), self.nth)
if not mobj:
msg = 'Unable to match %s %s in %r' % (ordinal(self.nth), self.pattern, txt)
return self.default_or_raise(RegexpError(msg))
if self.template is None:
return next(g for g in mobj.groups() if g is not None)
else:
return self.template(mobj) if callable(self.template) else mobj.expand(self.template)
class Map(Filter):
def __init__(self, selector, map_dict, default=_NO_DEFAULT):
super(Map, self).__init__(selector, default=default)
self.map_dict = map_dict
@debug()
def filter(self, txt):
try:
return self.map_dict[txt]
except KeyError:
return self.default_or_raise(ItemNotFound('Unable to handle %r on %r' % (txt, self.map_dict)))
class DateTime(Filter):
def __init__(self, selector=None, default=_NO_DEFAULT, dayfirst=False, translations=None):
super(DateTime, self).__init__(selector, default=default)
self.dayfirst = dayfirst
self.translations = translations
@debug()
def filter(self, txt):
if empty(txt) or txt == '':
return self.default_or_raise(ParseError('Unable to parse %r' % txt))
try:
if self.translations:
for search, repl in self.translations:
txt = search.sub(repl, txt)
return parse_date(txt, dayfirst=self.dayfirst)
except ValueError as e:
return self.default_or_raise(ParseError('Unable to parse %r: %s' % (txt, e)))
class Date(DateTime):
def __init__(self, selector=None, default=_NO_DEFAULT, dayfirst=False, translations=None):
super(Date, self).__init__(selector, default=default, dayfirst=dayfirst, translations=translations)
@debug()
def filter(self, txt):
datetime = super(Date, self).filter(txt)
if hasattr(datetime, 'date'):
return datetime.date()
else:
return datetime
class DateGuesser(Filter):
def __init__(self, selector, date_guesser, **kwargs):
super(DateGuesser, self).__init__(selector)
self.date_guesser = date_guesser
self.kwargs = kwargs
def __call__(self, item):
values = self.select(self.selector, item, obj=self._obj, key=self._key)
date_guesser = self.date_guesser
# In case Env() is used to kive date_guesser.
if isinstance(date_guesser, _Filter):
date_guesser = self.select(date_guesser, item, obj=self._obj, key=self._key)
if isinstance(values, basestring):
values = re.split('[/-]', values)
if len(values) == 2:
day, month = map(int, values)
else:
raise ParseError('Unable to take (day, month) tuple from %r' % values)
return date_guesser.guess_date(day, month, **self.kwargs)
class Time(Filter):
klass = datetime.time
_regexp = re.compile(r'(?P<hh>\d+):?(?P<mm>\d+)(:(?P<ss>\d+))?')
kwargs = {'hour': 'hh', 'minute': 'mm', 'second': 'ss'}
def __init__(self, selector=None, default=_NO_DEFAULT):
super(Time, self).__init__(selector, default=default)
@debug()
def filter(self, txt):
m = self._regexp.search(txt)
if m:
kwargs = {}
for key, index in self.kwargs.iteritems():
kwargs[key] = int(m.groupdict()[index] or 0)
return self.klass(**kwargs)
return self.default_or_raise(ParseError('Unable to find time in %r' % txt))
class Duration(Time):
klass = datetime.timedelta
regexp = re.compile(r'((?P<hh>\d+)[:;])?(?P<mm>\d+)[;:](?P<ss>\d+)')
kwargs = {'hours': 'hh', 'minutes': 'mm', 'seconds': 'ss'}
class MultiFilter(Filter):
def __init__(self, *args, **kwargs):
default = kwargs.pop('default', _NO_DEFAULT)
super(MultiFilter, self).__init__(args, default)
def __call__(self, item):
values = [self.select(selector, item, obj=self._obj, key=self._key) for selector in self.selector]
return self.filter(tuple(values))
def filter(self, values):
raise NotImplementedError()
class CombineDate(MultiFilter):
def __init__(self, date, time):
super(CombineDate, self).__init__(date, time)
@debug()
def filter(self, values):
return datetime.datetime.combine(values[0], values[1])
class Format(MultiFilter):
def __init__(self, fmt, *args):
super(Format, self).__init__(*args)
self.fmt = fmt
@debug()
def filter(self, values):
return self.fmt % values
class BrowserURL(MultiFilter):
def __init__(self, url_name, **kwargs):
super(BrowserURL, self).__init__(*kwargs.values())
self.url_name = url_name
self.keys = kwargs.keys()
def __call__(self, item):
values = super(BrowserURL, self).__call__(item)
url = getattr(item.page.browser, self.url_name)
assert isinstance(url, URL), "%s.%s must be an URL object" % (type(item.page.browser).__name__, self.url_name)
return url.build(**dict(zip(self.keys, values)))
@debug()
def filter(self, values):
return values
class Join(Filter):
def __init__(self, pattern, selector=None, textCleaner=CleanText):
super(Join, self).__init__(selector)
self.pattern = pattern
self.textCleaner = textCleaner
@debug()
def filter(self, el):
res = u''
for li in el:
res += self.pattern % self.textCleaner.clean(li)
return res
def test():
# This test works poorly under a doctest, or would be hard to read
assert CleanText().filter(u' coucou  \n\théhé') == u'coucou héhé'
assert CleanText().filter('coucou\xa0coucou') == CleanText().filter(u'coucou\xa0coucou') == u'coucou coucou'

View file

@ -1,716 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
try:
from urllib.parse import unquote
except ImportError:
from urllib import unquote
import re
import warnings
from copy import deepcopy
from io import BytesIO
import requests
from weboob.tools.ordereddict import OrderedDict
from weboob.tools.regex_helper import normalize
from weboob.tools.compat import basestring
from weboob.tools.log import getLogger
from .browser import DomainBrowser
class UrlNotResolvable(Exception):
"""
Raised when trying to locate on an URL instance which url pattern is not resolvable as a real url.
"""
class URL(object):
"""
A description of an URL on the PagesBrowser website.
It takes one or several regexps to match urls, and an optional Page
class which is instancied by PagesBrowser.open if the page matches a regex.
"""
_creation_counter = 0
def __init__(self, *args):
self.urls = []
self.klass = None
self.browser = None
for arg in args:
if isinstance(arg, basestring):
self.urls.append(arg)
if isinstance(arg, type):
self.klass = arg
self._creation_counter = URL._creation_counter
URL._creation_counter += 1
def is_here(self, **kwargs):
"""
Returns True if the current page of browser matches this URL.
If arguments are provided, and only then, they are checked against the arguments
that were used to build the current page URL.
"""
assert self.klass is not None, "You can use this method only if there is a Page class handler."
if len(kwargs):
params = self.match(self.build(**kwargs)).groupdict()
else:
params = None
# XXX use unquote on current params values because if there are spaces
# or special characters in them, it is encoded only in but not in kwargs.
return self.browser.page and isinstance(self.browser.page, self.klass) \
and (params is None or params == dict([(k,unquote(v)) for k,v in self.browser.page.params.iteritems()]))
def stay_or_go(self, **kwargs):
"""
Request to go on this url only if we aren't already here.
Arguments are optional parameters for url.
>>> url = URL('http://exawple.org/(?P<pagename>).html')
>>> url.stay_or_go(pagename='index')
"""
if self.is_here(**kwargs):
return self.browser.page
return self.go(**kwargs)
def go(self, params=None, data=None, **kwargs):
"""
Request to go on this url.
Arguments are optional parameters for url.
>>> url = URL('http://exawple.org/(?P<pagename>).html')
>>> url.stay_or_go(pagename='index')
"""
r = self.browser.location(self.build(**kwargs), params=params, data=data)
return r.page or r
def open(self, params=None, data=None, **kwargs):
"""
Request to open on this url.
Arguments are optional parameters for url.
:param data: POST data
:type url: str or dict or None
>>> url = URL('http://exawple.org/(?P<pagename>).html')
>>> url.open(pagename='index')
"""
r = self.browser.open(self.build(**kwargs), params=params, data=data)
return r.page or r
def build(self, **kwargs):
"""
Build an url with the given arguments from URL's regexps.
:param param: Query string parameters
:rtype: :class:`str`
:raises: :class:`UrlNotResolvable` if unable to resolve a correct url with the given arguments.
"""
browser = kwargs.pop('browser', self.browser)
params = kwargs.pop('params', None)
patterns = []
for url in self.urls:
patterns += normalize(url)
for pattern, _ in patterns:
url = pattern
# only use full-name substitutions, to allow % in URLs
for kwkey in kwargs.keys(): # need to use keys() because of pop()
search = '%%(%s)s' % kwkey
if search in pattern:
url = url.replace(search, unicode(kwargs.pop(kwkey)))
# if there are named substitutions left, ignore pattern
if re.search('%\([A-z_]+\)s', url):
continue
# if not all kwargs were used
if len(kwargs):
continue
url = browser.absurl(url, base=True)
if params:
p = requests.models.PreparedRequest()
p.prepare_url(url, params)
url = p.url
return url
raise UrlNotResolvable('Unable to resolve URL with %r. Available are %s' % (kwargs, ', '.join([pattern for pattern, _ in patterns])))
def match(self, url, base=None):
"""
Check if the given url match this object.
"""
if base is None:
assert self.browser is not None
base = self.browser.BASEURL
for regex in self.urls:
if not re.match(r'^\w+://.*', regex):
regex = re.escape(base).rstrip('/') + '/' + regex.lstrip('/')
m = re.match(regex, url)
if m:
return m
def handle(self, response):
"""
Handle a HTTP response to get an instance of the klass if it matches.
"""
if self.klass is None:
return
m = self.match(response.url)
if m:
page = self.klass(self.browser, response, m.groupdict())
if hasattr(page, 'is_here'):
if callable(page.is_here):
if page.is_here():
return page
else:
assert isinstance(page.is_here, basestring)
if page.doc.xpath(page.is_here):
return page
else:
return page
def id2url(self, func):
r"""
Helper decorator to get an URL if the given first parameter is an ID.
"""
def inner(browser, id_or_url, *args, **kwargs):
if re.match('^https?://.*', id_or_url):
if not self.match(id_or_url, browser.BASEURL):
return
else:
id_or_url = self.build(id=id_or_url, browser=browser)
return func(browser, id_or_url, *args, **kwargs)
return inner
class _PagesBrowserMeta(type):
"""
Private meta-class used to keep order of URLs instances of PagesBrowser.
"""
def __new__(mcs, name, bases, attrs):
urls = [(url_name, attrs.pop(url_name)) for url_name, obj in attrs.items() if isinstance(obj, URL)]
urls.sort(key=lambda x: x[1]._creation_counter)
new_class = super(_PagesBrowserMeta, mcs).__new__(mcs, name, bases, attrs)
if new_class._urls is None:
new_class._urls = OrderedDict()
else:
new_class._urls = deepcopy(new_class._urls)
new_class._urls.update(urls)
return new_class
class PagesBrowser(DomainBrowser):
r"""
A browser which works pages and keep state of navigation.
To use it, you have to derive it and to create URL objects as class
attributes. When open() or location() are called, if the url matches
one of URL objects, it returns a Page object. In case of location(), it
stores it in self.page.
Example:
>>> class HomePage(Page):
... pass
...
>>> class ListPage(Page):
... pass
...
>>> class MyBrowser(PagesBrowser):
... BASEURL = 'http://example.org'
... home = URL('/(index\.html)?', HomePage)
... list = URL('/list\.html', ListPage)
...
You can then use URL instances to go on pages.
"""
_urls = None
__metaclass__ = _PagesBrowserMeta
def __getattr__(self, name):
if self._urls is not None and name in self._urls:
return self._urls[name]
else:
raise AttributeError("'%s' object has no attribute '%s'" % (
self.__class__.__name__, name))
def __init__(self, *args, **kwargs):
super(PagesBrowser, self).__init__(*args, **kwargs)
self.page = None
self._urls = deepcopy(self._urls)
for url in self._urls.itervalues():
url.browser = self
def open(self, *args, **kwargs):
"""
Same method than
:meth:`weboob.tools.browser2.browser.DomainBrowser.open`, but the
response contains an attribute `page` if the url matches any
:class:`URL` object.
"""
callback = kwargs.pop('callback', lambda response: response)
# Have to define a callback to seamlessly process synchronous and
# asynchronous requests, see :meth:`Browser.open` and its `async`
# and `callback` params.
def internal_callback(response):
# Try to handle the response page with an URL instance.
response.page = None
for url in self._urls.itervalues():
page = url.handle(response)
if page is not None:
self.logger.debug('Handle %s with %s' % (response.url, page.__class__.__name__))
response.page = page
break
if response.page is None:
self.logger.debug('Unable to handle %s' % response.url)
return callback(response)
return super(PagesBrowser, self).open(callback=internal_callback, *args, **kwargs)
def location(self, *args, **kwargs):
"""
Same method than
:meth:`weboob.tools.browser2.browser.Browser.location`, but if the
url matches any :class:`URL` object, an attribute `page` is added to
response, and the attribute :attr:`PagesBrowser.page` is set.
"""
if self.page is not None:
# Call leave hook.
self.page.on_leave()
response = self.open(*args, **kwargs)
self.response = response
self.page = response.page
self.url = response.url
if self.page is not None:
# Call load hook.
self.page.on_load()
# Returns self.response in case on_load recalls location()
return self.response
def pagination(self, func, *args, **kwargs):
r"""
This helper function can be used to handle pagination pages easily.
When the called function raises an exception :class:`NextPage`, it goes
on the wanted page and recall the function.
:class:`NextPage` constructor can take an url or a Request object.
>>> class Page(HTMLPage):
... def iter_values(self):
... for el in self.doc.xpath('//li'):
... yield el.text
... for next in self.doc.xpath('//a'):
... raise NextPage(next.attrib['href'])
...
>>> class Browser(PagesBrowser):
... BASEURL = 'http://people.symlink.me'
... list = URL('/~rom1/projects/weboob/list-(?P<pagenum>\d+).html', Page)
...
>>> b = Browser()
>>> b.list.go(pagenum=1)
>>> list(b.pagination(lambda: b.page.iter_values()))
['One', 'Two', 'Three', 'Four']
"""
while True:
try:
for r in func(*args, **kwargs):
yield r
except NextPage as e:
self.location(e.request)
else:
return
def pagination(func):
r"""
This helper decorator can be used to handle pagination pages easily.
When the called function raises an exception :class:`NextPage`, it goes on
the wanted page and recall the function.
:class:`NextPage` constructor can take an url or a Request object.
>>> class Page(HTMLPage):
... @pagination
... def iter_values(self):
... for el in self.doc.xpath('//li'):
... yield el.text
... for next in self.doc.xpath('//a'):
... raise NextPage(next.attrib['href'])
...
>>> class Browser(PagesBrowser):
... BASEURL = 'http://people.symlink.me'
... list = URL('/~rom1/projects/weboob/list-(?P<pagenum>\d+).html', Page)
...
>>> b = Browser()
>>> b.list.go(pagenum=1)
>>> list(b.page.iter_values())
['One', 'Two', 'Three', 'Four']
"""
def inner(page, *args, **kwargs):
while True:
try:
for r in func(page, *args, **kwargs):
yield r
except NextPage as e:
result = page.browser.location(e.request)
page = result.page
else:
return
return inner
class NextPage(Exception):
"""
Exception used for example in a Page to tell PagesBrowser.pagination to
go on the next page.
See :meth:`PagesBrowser.pagination` or decorator :func:`pagination`.
"""
def __init__(self, request):
super(NextPage, self).__init__()
self.request = request
def need_login(func):
"""
Decorator used to require to be logged to access to this function.
"""
def inner(browser, *args, **kwargs):
if browser.page is None or not browser.page.logged:
browser.do_login()
return func(browser, *args, **kwargs)
return inner
class LoginBrowser(PagesBrowser):
"""
A browser which supports login.
"""
def __init__(self, username, password, *args, **kwargs):
super(LoginBrowser, self).__init__(*args, **kwargs)
self.username = username
self.password = password
def do_login(self):
"""
Abstract method to implement to login on website.
It is call when a login is needed.
"""
raise NotImplementedError()
class Page(object):
"""
Base page.
"""
logged = False
def __init__(self, browser, response, params=None):
self.browser = browser
self.logger = getLogger(self.__class__.__name__.lower(), browser.logger)
self.response = response
self.url = self.response.url
self.params = params
def on_load(self):
"""
Event called when browser loads this page.
"""
def on_leave(self):
"""
Event called when browser leaves this page.
"""
class FormNotFound(Exception):
"""
Raised when :meth:`HTMLPage.get_form` can't find a form.
"""
class FormSubmitWarning(UserWarning):
"""
A form has more than one submit element selected, and will likely
generate an invalid request.
"""
class Form(OrderedDict):
"""
Represents a form of an HTML page.
It is used as a dict with pre-filled values from HTML. You can set new
values as strings by setting an item value.
submit_el allows you to only consider one submit button (which is what
browsers do). If set to None, it takes all of them, and if set to False,
it takes none.
"""
def __init__(self, page, el, submit_el=None):
super(Form, self).__init__()
self.page = page
self.el = el
self.submit_el = submit_el
self.method = el.attrib.get('method', 'GET')
self.url = el.attrib.get('action', page.url)
self.name = el.attrib.get('name', '')
submits = 0
for inp in el.xpath('.//input | .//select | .//textarea'):
try:
name = inp.attrib['name']
except KeyError:
continue
try:
if inp.attrib['type'] in ('checkbox', 'radio') and 'checked' not in inp.attrib:
continue
except KeyError:
pass
try:
if inp.attrib['type'] == 'submit':
if self.submit_el is not None and inp is not self.submit_el:
continue
else:
submits += 1
except KeyError:
pass
if inp.tag == 'select':
options = inp.xpath('.//option[@selected]')
if len(options) == 0:
options = inp.xpath('.//option')
if len(options) == 0:
value = u''
else:
value = options[0].attrib.get('value', options[0].text or u'')
else:
value = inp.attrib.get('value', inp.text or u'')
self[name] = value
if submits > 1:
warnings.warn('Form has more than one submit input, you should chose the correct one', FormSubmitWarning, stacklevel=3)
if self.submit_el is not None and self.submit_el is not False and submits == 0:
warnings.warn('Form had a submit element provided, but it was not found', FormSubmitWarning, stacklevel=3)
@property
def request(self):
"""
Get the Request object from the form.
"""
if self.method.lower() == 'get':
req = requests.Request(self.method, self.url, params=self)
else:
req = requests.Request(self.method, self.url, data=self)
req.headers.setdefault('Referer', self.page.url)
return req
def submit(self, **kwargs):
"""
Submit the form and tell browser to be located to the new page.
"""
kwargs.setdefault('data_encoding', self.page.encoding)
return self.page.browser.location(self.request, **kwargs)
class CsvPage(Page):
DIALECT = 'excel'
FMTPARAMS = {}
ENCODING = 'utf-8'
NEWLINES_HACK = True
"""
If True, will consider the first line as a header.
This means the rows will be also available as dictionnaries.
"""
HEADER = None
def __init__(self, browser, response, *args, **kwargs):
super(CsvPage, self).__init__(browser, response, *args, **kwargs)
content = response.content
encoding = self.ENCODING
if encoding == 'utf-16le':
content = content.decode('utf-16le')[1:].encode('utf-8')
encoding = 'utf-8'
if self.NEWLINES_HACK:
content = content.replace('\r\n', '\n').replace('\r', '\n')
fp = BytesIO(content)
self.doc = self.parse(fp, encoding)
def parse(self, data, encoding=None):
import csv
reader = csv.reader(data, dialect=self.DIALECT, **self.FMTPARAMS)
header = None
drows = []
rows = []
for i, row in enumerate(reader):
if self.HEADER and i+1 < self.HEADER:
continue
row = self.decode_row(row, encoding)
if header is None and self.HEADER:
header = row
else:
rows.append(row)
if header:
drow = {}
for i, cell in enumerate(row):
drow[header[i]] = cell
drows.append(drow)
return drows if header is not None else row
def decode_row(self, row, encoding):
if encoding:
return [unicode(cell, encoding) for cell in row]
else:
return row
class JsonPage(Page):
def __init__(self, browser, response, *args, **kwargs):
super(JsonPage, self).__init__(browser, response, *args, **kwargs)
from weboob.tools.json import json
self.doc = json.loads(response.text)
class XMLPage(Page):
ENCODING = None
"""
Force a page encoding.
It is recommended to use None for autodetection.
"""
def __init__(self, browser, response, *args, **kwargs):
super(XMLPage, self).__init__(browser, response, *args, **kwargs)
import lxml.etree as etree
parser = etree.XMLParser(encoding=self.ENCODING or response.encoding)
self.doc = etree.parse(BytesIO(response.content), parser)
class RawPage(Page):
def __init__(self, browser, response, *args, **kwargs):
super(RawPage, self).__init__(browser, response, *args, **kwargs)
self.doc = response.content
class HTMLPage(Page):
"""
HTML page.
"""
FORM_CLASS = Form
ENCODING = None
"""
Force a page encoding.
It is recommended to use None for autodetection.
"""
def __init__(self, browser, response, *args, **kwargs):
super(HTMLPage, self).__init__(browser, response, *args, **kwargs)
self.encoding = self.ENCODING or response.encoding
import lxml.html as html
parser = html.HTMLParser(encoding=self.encoding)
self.doc = html.parse(BytesIO(response.content), parser)
def get_form(self, xpath='//form', name=None, nr=None, submit=None):
"""
Get a :class:`Form` object from a selector.
The form will be analyzed and its parameters extracted.
In the case there is more than one "submit" input, only one of
them should be chosen to generate the request.
:param xpath: xpath string to select forms
:type xpath: :class:`str`
:param name: if supplied, select a form with the given name
:type name: :class:`str`
:param nr: if supplied, take the n+1 th selected form
:type nr: :class:`int`
:param submit: if supplied, xpath string to select the submit \
element from the form
:type submit: :class:`str`
:rtype: :class:`Form`
:raises: :class:`FormNotFound` if no form is found
"""
i = 0
for el in self.doc.xpath(xpath):
if name is not None and el.attrib.get('name', '') != name:
continue
if nr is not None and i != nr:
i += 1
continue
if isinstance(submit, basestring):
submit_el = el.xpath(submit)[0]
else:
submit_el = submit
return self.FORM_CLASS(self, el, submit_el)
raise FormNotFound()
def method(klass):
"""
Class-decorator to call it as a method.
"""
def inner(self, *args, **kwargs):
return klass(self)(*args, **kwargs)
return inner
class LoggedPage(object):
"""
A page that only logged users can reach. If we did not get a redirection
for this page, we are sure that the login is still active.
Do not use this class for page we mixed content (logged/anonymous) or for
pages with a login form.
"""
logged = True

View file

@ -1,151 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright(C) 2014 Simon Murail
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
# Inspired by: https://github.com/ross/requests-futures/blob/master/requests_futures/sessions.py
# XXX Licence issues ?
try:
from concurrent.futures import ThreadPoolExecutor
except ImportError:
ThreadPoolExecutor = None
from requests import Session
from requests.adapters import DEFAULT_POOLSIZE, HTTPAdapter
from requests.compat import cookielib, OrderedDict
from requests.cookies import cookiejar_from_dict, RequestsCookieJar
from requests.models import PreparedRequest
from requests.sessions import merge_setting
from requests.structures import CaseInsensitiveDict
from requests.utils import get_netrc_auth
def merge_hooks(request_hooks, session_hooks, dict_class=OrderedDict):
"""
Properly merges both requests and session hooks.
This is necessary because when request_hooks == {'response': []}, the
merge breaks Session hooks entirely.
Backport from request so we can use it in wheezy
"""
if session_hooks is None or session_hooks.get('response') == []:
return request_hooks
if request_hooks is None or request_hooks.get('response') == []:
return session_hooks
ret = {}
for (k, v) in request_hooks.items():
if v is not None:
ret[k] = set(v).union(session_hooks.get(k, []))
return ret
class WeboobSession(Session):
def prepare_request(self, request):
"""Constructs a :class:`PreparedRequest <PreparedRequest>` for
transmission and returns it. The :class:`PreparedRequest` has settings
merged from the :class:`Request <Request>` instance and those of the
:class:`Session`.
:param request: :class:`Request` instance to prepare with this
session's settings.
"""
cookies = request.cookies or {}
# Bootstrap CookieJar.
if not isinstance(cookies, cookielib.CookieJar):
cookies = cookiejar_from_dict(cookies)
# Merge with session cookies
merged_cookies = RequestsCookieJar()
merged_cookies.update(self.cookies)
merged_cookies.update(cookies)
# Set environment's basic authentication if not explicitly set.
auth = request.auth
if self.trust_env and not auth and not self.auth:
auth = get_netrc_auth(request.url)
p = PreparedRequest()
p.prepare(
method=request.method.upper(),
url=request.url,
files=request.files,
data=request.data,
headers=merge_setting(request.headers, self.headers, dict_class=CaseInsensitiveDict),
params=merge_setting(request.params, self.params),
auth=merge_setting(auth, self.auth),
cookies=merged_cookies,
hooks=merge_hooks(request.hooks, self.hooks),
)
return p
class FuturesSession(WeboobSession):
def __init__(self, executor=None, max_workers=2, *args, **kwargs):
"""Creates a FuturesSession
Notes
~~~~~
* ProcessPoolExecutor is not supported b/c Response objects are
not picklable.
* If you provide both `executor` and `max_workers`, the latter is
ignored and provided executor is used as is.
"""
super(FuturesSession, self).__init__(*args, **kwargs)
if executor is None and ThreadPoolExecutor is not None:
executor = ThreadPoolExecutor(max_workers=max_workers)
# set connection pool size equal to max_workers if needed
if max_workers > DEFAULT_POOLSIZE:
adapter_kwargs = dict(pool_connections=max_workers,
pool_maxsize=max_workers)
self.mount('https://', HTTPAdapter(**adapter_kwargs))
self.mount('http://', HTTPAdapter(**adapter_kwargs))
self.executor = executor
def send(self, *args, **kwargs):
"""Maintains the existing api for :meth:`Session.send`
Used by :meth:`request` and thus all of the higher level methods
If background_callback param is defined, request is processed in a
thread, calling background_callback and returning it's result when
request has been processed. If background_callback is not defined,
request is processed as usual, in a blocking way.
"""
sup = super(FuturesSession, self).send
background_callback = kwargs.pop('background_callback', None)
if background_callback:
if not self.executor:
raise ImportError('Please install python-concurrent.futures')
def func(*args, **kwargs):
resp = sup(*args, **kwargs)
return background_callback(self, resp)
return self.executor.submit(func, *args, **kwargs)
return sup(*args, **kwargs)

View file

@ -28,8 +28,8 @@ from weboob.tools.misc import to_unicode
from weboob.tools.log import getLogger
from weboob.tools.exceptions import ParseError
from weboob.tools.browser2.elements import TableElement, ItemElement
from weboob.tools.browser2.filters.standard import Filter, CleanText, CleanDecimal, TableCell
from weboob.browser2.elements import TableElement, ItemElement
from weboob.browser2.filters.standard import Filter, CleanText, CleanDecimal, TableCell
__all__ = ['FrenchTransaction', 'AmericanTransaction']