weboob-devel/weboob/tools/browser2/elements.py
Florent 76cb004eb4 Move ItemListTable-Element outside of page.py
One of the goal is to not import all modules needed by filters by
loading the page file.

In the same goal, move the import of parsers in the class definition.
2014-07-09 10:23:20 +02:00

241 lines
6.9 KiB
Python

# -*- coding: utf-8 -*-
# Copyright(C) 2014 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
import sys
from copy import deepcopy
from .filters import _Filter, CleanText, AttributeNotFound, XPathNotFound
from weboob.tools.log import getLogger
from weboob.tools.browser2.page import NextPage
class DataError(Exception):
"""
Returned data from pages are incoherent.
"""
class AbstractElement(object):
def __init__(self, page, parent=None, el=None):
self.page = page
self.parent = parent
if el is not None:
self.el = el
elif parent is not None:
self.el = parent.el
else:
self.el = page.doc
if parent is not None:
self.env = deepcopy(parent.env)
else:
self.env = deepcopy(page.params)
def use_selector(self, func):
if isinstance(func, _Filter):
value = func(self)
elif callable(func):
value = func()
else:
value = deepcopy(func)
return value
def parse(self, obj):
pass
def cssselect(self, *args, **kwargs):
return self.el.cssselect(*args, **kwargs)
def xpath(self, *args, **kwargs):
return self.el.xpath(*args, **kwargs)
class SkipItem(Exception):
"""
Raise this exception in an :class:`ItemElement` subclass to skip an item.
"""
class _ItemElementMeta(type):
"""
Private meta-class used to keep order of obj_* attributes in :class:`ItemElement`.
"""
def __new__(mcs, name, bases, attrs):
_attrs = []
for base in bases:
if hasattr(base, '_attrs'):
_attrs += base._attrs
filters = [(re.sub('^obj_', '', attr_name), attrs[attr_name]) for attr_name, obj in attrs.items() if attr_name.startswith('obj_')]
# constants first, then filters, then methods
filters.sort(key=lambda x: x[1]._creation_counter if hasattr(x[1], '_creation_counter') else (sys.maxint if callable(x[1]) else 0))
new_class = super(_ItemElementMeta, mcs).__new__(mcs, name, bases, attrs)
new_class._attrs = _attrs + [f[0] for f in filters]
return new_class
class ItemElement(AbstractElement):
__metaclass__ = _ItemElementMeta
_attrs = None
klass = None
condition = None
validate = None
class Index(object):
pass
def __init__(self, *args, **kwargs):
super(ItemElement, self).__init__(*args, **kwargs)
self.obj = None
def build_object(self):
if self.klass is None:
return
return self.klass()
def __call__(self, obj=None):
if obj is not None:
self.obj = obj
for obj in self:
return obj
def __iter__(self):
if self.condition is not None and not self.condition():
return
try:
if self.obj is None:
self.obj = self.build_object()
self.parse(self.el)
for attr in self._attrs:
self.handle_attr(attr, getattr(self, 'obj_%s' % attr))
except SkipItem:
return
if self.validate is not None and not self.validate(self.obj):
return
yield self.obj
def handle_attr(self, key, func):
value = self.use_selector(func)
setattr(self.obj, key, value)
class ListElement(AbstractElement):
item_xpath = None
flush_at_end = False
ignore_duplicate = False
def __init__(self, *args, **kwargs):
super(ListElement, self).__init__(*args, **kwargs)
self.logger = getLogger(self.__class__.__name__.lower())
self.objects = {}
def __call__(self, *args, **kwargs):
for key, value in kwargs.iteritems():
self.env[key] = value
return self.__iter__()
def __iter__(self):
self.parse(self.el)
if self.item_xpath is not None:
for el in self.el.xpath(self.item_xpath):
for obj in self.handle_element(el):
if not self.flush_at_end:
yield obj
else:
for obj in self.handle_element(self.el):
if not self.flush_at_end:
yield obj
if self.flush_at_end:
for obj in self.objects.itervalues():
yield obj
self.check_next_page()
def check_next_page(self):
if not hasattr(self, 'next_page'):
return
next_page = getattr(self, 'next_page')
try:
value = self.use_selector(next_page)
except (AttributeNotFound, XPathNotFound):
return
if value is None:
return
raise NextPage(value)
def store(self, obj):
if obj.id:
if obj.id in self.objects:
if self.ignore_duplicate:
self.logger.warning('There are two objects with the same ID! %s' % obj.id)
return
else:
raise DataError('There are two objects with the same ID! %s' % obj.id)
self.objects[obj.id] = obj
return obj
def handle_element(self, el):
for attrname in dir(self):
attr = getattr(self, attrname)
if isinstance(attr, type) and issubclass(attr, AbstractElement) and attr != type(self):
for obj in attr(self.page, self, el):
obj = self.store(obj)
if obj:
yield obj
class TableElement(ListElement):
head_xpath = None
cleaner = CleanText
def __init__(self, *args, **kwargs):
super(TableElement, self).__init__(*args, **kwargs)
self._cols = {}
columns = {}
for attrname in dir(self):
m = re.match('col_(.*)', attrname)
if m:
cols = getattr(self, attrname)
if not isinstance(cols, (list,tuple)):
cols = [cols]
columns[m.group(1)] = [s.lower() for s in cols]
for colnum, el in enumerate(self.el.xpath(self.head_xpath)):
title = self.cleaner.clean(el).lower()
for name, titles in columns.iteritems():
if title in titles:
self._cols[name] = colnum
def get_colnum(self, name):
return self._cols.get(name, None)