Move ItemListTable-Element outside of page.py

One of the goal is to not import all modules needed by filters by
loading the page file.

In the same goal, move the import of parsers in the class definition.
This commit is contained in:
Florent 2014-07-08 19:59:37 +02:00
commit 76cb004eb4
33 changed files with 305 additions and 265 deletions

View file

@ -24,15 +24,11 @@ try:
except ImportError:
from urllib import unquote
import re
import sys
from copy import deepcopy
from io import BytesIO
import requests
import lxml.html as html
import lxml.etree as etree
from weboob.tools.json import json
from weboob.tools.ordereddict import OrderedDict
from weboob.tools.regex_helper import normalize
from weboob.tools.compat import basestring
@ -40,7 +36,6 @@ from weboob.tools.compat import basestring
from weboob.tools.log import getLogger
from .browser import DomainBrowser
from .filters import _Filter, CleanText, AttributeNotFound, XPathNotFound
class UrlNotResolvable(Exception):
@ -49,12 +44,6 @@ class UrlNotResolvable(Exception):
"""
class DataError(Exception):
"""
Returned data from pages are incoherent.
"""
class URL(object):
"""
A description of an URL on the PagesBrowser website.
@ -538,6 +527,7 @@ class Form(OrderedDict):
class JsonPage(BasePage):
def __init__(self, browser, response, *args, **kwargs):
super(JsonPage, self).__init__(browser, response, *args, **kwargs)
from weboob.tools.json import json
self.doc = json.loads(response.text)
@ -550,6 +540,7 @@ class XMLPage(BasePage):
def __init__(self, browser, response, *args, **kwargs):
super(XMLPage, self).__init__(browser, response, *args, **kwargs)
import lxml.etree as etree
parser = etree.XMLParser(encoding=self.ENCODING or response.encoding)
self.doc = etree.parse(BytesIO(response.content), parser)
@ -575,6 +566,7 @@ class HTMLPage(BasePage):
def __init__(self, browser, response, *args, **kwargs):
super(HTMLPage, self).__init__(browser, response, *args, **kwargs)
self.encoding = self.ENCODING or response.encoding
import lxml.html as html
parser = html.HTMLParser(encoding=self.encoding)
self.doc = html.parse(BytesIO(response.content), parser)
@ -613,228 +605,6 @@ def method(klass):
return inner
class AbstractElement(object):
def __init__(self, page, parent=None, el=None):
self.page = page
self.parent = parent
if el is not None:
self.el = el
elif parent is not None:
self.el = parent.el
else:
self.el = page.doc
if parent is not None:
self.env = deepcopy(parent.env)
else:
self.env = deepcopy(page.params)
def use_selector(self, func):
if isinstance(func, _Filter):
value = func(self)
elif callable(func):
value = func()
else:
value = deepcopy(func)
return value
def parse(self, obj):
pass
def cssselect(self, *args, **kwargs):
return self.el.cssselect(*args, **kwargs)
def xpath(self, *args, **kwargs):
return self.el.xpath(*args, **kwargs)
class ListElement(AbstractElement):
item_xpath = None
flush_at_end = False
ignore_duplicate = False
def __init__(self, *args, **kwargs):
super(ListElement, self).__init__(*args, **kwargs)
self.logger = getLogger(self.__class__.__name__.lower())
self.objects = OrderedDict()
def __call__(self, *args, **kwargs):
for key, value in kwargs.iteritems():
self.env[key] = value
return self.__iter__()
def find_elements(self):
"""
Get the nodes that will have to be processed.
This method can be overridden if xpath filters are not
sufficient.
"""
if self.item_xpath is not None:
for el in self.el.xpath(self.item_xpath):
yield el
else:
yield self.el
def __iter__(self):
self.parse(self.el)
for el in self.find_elements():
for obj in self.handle_element(el):
if not self.flush_at_end:
yield obj
if self.flush_at_end:
for obj in self.flush():
yield obj
self.check_next_page()
def flush(self):
for obj in self.objects.itervalues():
yield obj
def check_next_page(self):
if not hasattr(self, 'next_page'):
return
next_page = getattr(self, 'next_page')
try:
value = self.use_selector(next_page)
except (AttributeNotFound, XPathNotFound):
return
if value is None:
return
raise NextPage(value)
def store(self, obj):
if obj.id:
if obj.id in self.objects:
if self.ignore_duplicate:
self.logger.warning('There are two objects with the same ID! %s' % obj.id)
return
else:
raise DataError('There are two objects with the same ID! %s' % obj.id)
self.objects[obj.id] = obj
return obj
def handle_element(self, el):
for attrname in dir(self):
attr = getattr(self, attrname)
if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self):
for obj in attr(self.page, self, el):
obj = self.store(obj)
if obj:
yield obj
class SkipItem(Exception):
"""
Raise this exception in an :class:`ItemElement` subclass to skip an item.
"""
class _ItemElementMeta(type):
"""
Private meta-class used to keep order of obj_* attributes in :class:`ItemElement`.
"""
def __new__(mcs, name, bases, attrs):
_attrs = []
for base in bases:
if hasattr(base, '_attrs'):
_attrs += base._attrs
filters = [(re.sub('^obj_', '', attr_name), attrs[attr_name]) for attr_name, obj in attrs.items() if attr_name.startswith('obj_')]
# constants first, then filters, then methods
filters.sort(key=lambda x: x[1]._creation_counter if hasattr(x[1], '_creation_counter') else (sys.maxint if callable(x[1]) else 0))
new_class = super(_ItemElementMeta, mcs).__new__(mcs, name, bases, attrs)
new_class._attrs = _attrs + [f[0] for f in filters]
return new_class
class ItemElement(AbstractElement):
__metaclass__ = _ItemElementMeta
_attrs = None
klass = None
condition = None
validate = None
class Index(object):
pass
def __init__(self, *args, **kwargs):
super(ItemElement, self).__init__(*args, **kwargs)
self.obj = None
def build_object(self):
if self.klass is None:
return
return self.klass()
def __call__(self, obj=None):
if obj is not None:
self.obj = obj
for obj in self:
return obj
def __iter__(self):
if self.condition is not None and not self.condition():
return
try:
if self.obj is None:
self.obj = self.build_object()
self.parse(self.el)
for attr in self._attrs:
self.handle_attr(attr, getattr(self, 'obj_%s' % attr))
except SkipItem:
return
if self.validate is not None and not self.validate(self.obj):
return
yield self.obj
def handle_attr(self, key, func):
value = self.use_selector(func)
setattr(self.obj, key, value)
class TableElement(ListElement):
head_xpath = None
cleaner = CleanText
def __init__(self, *args, **kwargs):
super(TableElement, self).__init__(*args, **kwargs)
self._cols = {}
columns = {}
for attrname in dir(self):
m = re.match('col_(.*)', attrname)
if m:
cols = getattr(self, attrname)
if not isinstance(cols, (list,tuple)):
cols = [cols]
columns[m.group(1)] = [s.lower() for s in cols]
for colnum, el in enumerate(self.el.xpath(self.head_xpath)):
title = self.cleaner.clean(el).lower()
for name, titles in columns.iteritems():
if title in titles:
self._cols[name] = colnum
def get_colnum(self, name):
return self._cols.get(name, None)
class LoggedPage(object):
"""
A page that only logged users can reach. If we did not get a redirection