From 2766983a3333c7450ed331db4dc0688ffe5f93cc Mon Sep 17 00:00:00 2001 From: Vincent Paredes Date: Thu, 20 Mar 2014 18:26:45 +0100 Subject: [PATCH] hsbc to browser2! --- modules/hsbc/backend.py | 13 +-- modules/hsbc/browser.py | 143 ++++++++-------------------- modules/hsbc/pages.py | 169 +++++++++++++++++++++++++++++++++ modules/hsbc/pages/__init__.py | 0 modules/hsbc/pages/accounts.py | 152 ----------------------------- modules/hsbc/pages/login.py | 76 --------------- 6 files changed, 213 insertions(+), 340 deletions(-) create mode 100644 modules/hsbc/pages.py delete mode 100644 modules/hsbc/pages/__init__.py delete mode 100644 modules/hsbc/pages/accounts.py delete mode 100644 modules/hsbc/pages/login.py diff --git a/modules/hsbc/backend.py b/modules/hsbc/backend.py index dcfe4b72..4900dd84 100644 --- a/modules/hsbc/backend.py +++ b/modules/hsbc/backend.py @@ -59,14 +59,5 @@ class HSBCBackend(BaseBackend, ICapBank): raise AccountNotFound() def iter_history(self, account): - with self.browser: - for tr in self.browser.get_history(account): - # If there are deferred cards, strip CB invoices. - if not tr._coming and (not tr.raw.startswith('FACTURES CB') or len(account._card_links) == 0): - yield tr - - def iter_coming(self, account): - with self.browser: - for tr in self.browser.get_history(account): - if tr._coming: - yield tr + for tr in self.browser.get_history(account): + yield tr diff --git a/modules/hsbc/browser.py b/modules/hsbc/browser.py index f453d6e5..256ceeed 100644 --- a/modules/hsbc/browser.py +++ b/modules/hsbc/browser.py @@ -19,111 +19,60 @@ from datetime import timedelta -import urllib -import re from weboob.tools.date import LinearDateGuesser -from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword, BasePage, BrokenPageError -from weboob.tools.decorators import retry -from .pages.accounts import AccountsListPage, CPTHistoryPage, CardHistoryPage -from .pages.login import LoginPage +from weboob.tools.browser import BrowserIncorrectPassword +from weboob.tools.browser2 import LoginBrowser, URL, need_login +from .pages import AccountsPage, CBOperationPage, CPTOperationPage, LoginPage __all__ = ['HSBC'] -class NotLoggedPage(BasePage): - pass - - -class HSBC(BaseBrowser): - DOMAIN = 'client.hsbc.fr' - PROTOCOL = 'https' +class HSBC(LoginBrowser): + VERIFY = False + BASEURL = 'https://client.hsbc.fr' CERTHASH = '48d84a782728eeeb622e9ff721688365e24f555ae1aec49b3be33831c7fe24e6' - ENCODING = None # refer to the HTML encoding - PAGES = {'https://client.hsbc.fr/session_absente.html': NotLoggedPage, - 'https://client.hsbc.fr/cgi-bin/emcgi.*\?.*debr=COMPTES_PAN': AccountsListPage, - 'https://client.hsbc.fr/cgi-bin/emcgi.*\?.*CPT_IdPrestation=.*': CPTHistoryPage, - 'https://client.hsbc.fr/cgi-bin/emcgi.*\?.*CB_IdPrestation=.*': CardHistoryPage, - 'https://www.hsbc.fr/.*': LoginPage, - 'https://client.hsbc.fr/cgi-bin/emcgi': LoginPage, - } - - _session = None + connection = URL('https://www.hsbc.fr/1/2/hsbc-france/particuliers/connexion', LoginPage) + login = URL('https://www.hsbc.fr/1/*', LoginPage) + cptPage = URL('/cgi-bin/emcgi.*\&CPT_IdPrestation.*', + '/cgi-bin/emcgi.*\&Ass_IdPrestation.*', + CPTOperationPage) + cbPage = URL('/cgi-bin/emcgi.*\&CB_IdPrestation.*', + CBOperationPage) + accounts = URL('/cgi-bin/emcgi', AccountsPage) def __init__(self, username, password, secret, *args, **kwargs): self.secret = secret - BaseBrowser.__init__(self, username, password, *args, **kwargs) + LoginBrowser.__init__(self, username, password, *args, **kwargs) def home(self): - self.login() + return self.login.go() - def is_logged(self): - return self._session is not None and not self.is_on_page((NotLoggedPage,LoginPage)) + def do_login(self): + self.connection.stay_or_go() + self.page.login(self.username) - @retry(BrokenPageError, tries=2) - def login(self): - assert isinstance(self.username, basestring) - assert isinstance(self.password, basestring) - - self._ua_handlers['_cookies'].cookiejar.clear() - - if len(self.username) == 11 and self.username.isdigit(): - self.login_france() - else: - self.login_world() - - def login_france(self): - data = {'Ident': self.username} - r = self.readurl('https://client.hsbc.fr/cgi-bin/emcgi?Appl=WEBACC', urllib.urlencode(data), if_fail='raise') - m = re.search('sessionid=([^ "]+)', r, flags=re.MULTILINE) - if not m: + no_secure_key_link = self.page.get_no_secure_key() + if not no_secure_key_link: raise BrowserIncorrectPassword() + self.location(no_secure_key_link) - self._session = m.group(1) + self.page.login_w_secure(self.login, self.password, self.secret) + self.page.useless_form() - data = {'Secret': self.password} - r = self.readurl('https://client.hsbc.fr/cgi-bin/emcgi?sessionid=%s' % self._session, urllib.urlencode(data), if_fail='raise') - if r.find('Erreur Identification') >= 0: + home_url = self.page.get_frame() + if not home_url: raise BrowserIncorrectPassword() + self.location(home_url) - m = re.search('url = "/cgi-bin/emcgi\?sessionid=([^& "]+)&debr="', r, flags=re.MULTILINE) - if not m: - raise BrokenPageError('Unable to find session token') - - self._session = m.group(1) - - def login_world(self): - data = {'Appl': 'WEBACC', - 'CODE_ABONNE': self.username, - 'Ident': self.username, - 'ifr': 0, - 'nextPage': 'localsso.hbfr.Redirect', - 'secret': '', - 'userid': self.username, - } - self.location('https://www.hsbc.fr/1/2/?idv_cmd=idv.Authentication', urllib.urlencode(data), no_login=True) - - self.page.login(self.username, self.secret, self.password) - - error = self.page.get_error() - if error is not None: - raise BrowserIncorrectPassword(error) - - self._session = self.page.get_session() - + @need_login def get_accounts_list(self): - self.location(self.buildurl('/cgi-bin/emcgi', sessionid=self._session, debr='COMPTES_PAN')) - - return self.page.get_list() + return self.accounts.stay_or_go().iter_accounts() def get_account(self, id): assert isinstance(id, basestring) - - if not self.is_on_page(AccountsListPage): - l = self.get_accounts_list() - else: - l = self.page.get_list() + l = self.get_accounts_list() for a in l: if a.id == id: @@ -134,25 +83,17 @@ class HSBC(BaseBrowser): def get_history(self, account): if account._link_id is None: return + self.location(account._link_id) - for tr in self._get_history(account._link_id): + if self.page is None: + return + + if self.cbPage.is_here(): + guesser = LinearDateGuesser(date_max_bump=timedelta(45)) + return self.pagination(lambda: self.page.get_history(date_guesser=guesser)) + else: + return self._get_history() + + def _get_history(self): + for tr in self.page.get_history(): yield tr - - for card in account._card_links: - for tr in self._get_history(card): - yield tr - - def _get_history(self, link): - num_page = 0 - guesser = LinearDateGuesser(date_max_bump=timedelta(45)) - while link is not None: - self.location(link) - - if self.page is None: - return - - for tr in self.page.get_operations(num_page, guesser): - yield tr - - link = self.page.get_next_link() - num_page += 1 diff --git a/modules/hsbc/pages.py b/modules/hsbc/pages.py new file mode 100644 index 00000000..459b9e36 --- /dev/null +++ b/modules/hsbc/pages.py @@ -0,0 +1,169 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2010-2012 Julien Veyssier +# +# This file is part of weboob. +# +# weboob is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# weboob is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with weboob. If not, see . + +from urlparse import urlparse, parse_qs +import re + +from weboob.tools.browser2.page import HTMLPage, method, ListElement, ItemElement, SkipItem, LoggedPage +from weboob.tools.browser2.filters import Filter, Env, CleanText, CleanDecimal, Link, Field, DateGuesser, TableCell +from weboob.capabilities import NotAvailable +from weboob.capabilities.bank import Account +from weboob.tools.capabilities.bank.transactions import FrenchTransaction + +class Transaction(FrenchTransaction): + PATTERNS = [(re.compile('^VIR(EMENT)? (?P.*)'), FrenchTransaction.TYPE_TRANSFER), + (re.compile('^PRLV (?P.*)'), FrenchTransaction.TYPE_ORDER), + (re.compile('^(?P.*) CARTE \d+ PAIEMENT CB\s+(?P
\d{2})(?P\d{2}) ?(.*)$'), + FrenchTransaction.TYPE_CARD), + (re.compile('^RETRAIT DAB (?P
\d{2})(?P\d{2}) (?P.*) CARTE [\*\d]+'), + FrenchTransaction.TYPE_WITHDRAWAL), + (re.compile('^CHEQUE( (?P.*))?$'), FrenchTransaction.TYPE_CHECK), + (re.compile('^(F )?COTIS\.? (?P.*)'),FrenchTransaction.TYPE_BANK), + (re.compile('^(REMISE|REM CHQ) (?P.*)'),FrenchTransaction.TYPE_DEPOSIT), + ] + + _is_coming = False + +class AccountsPage(LoggedPage, HTMLPage): + def get_frame(self): + try: + a = self.doc.xpath(u'//frame["@name=FrameWork"]')[0] + except IndexError: + return None + else: + return a.attrib['src'] + + @method + class iter_accounts(ListElement): + item_xpath = '//tr' + flush_at_end = True + + class item(ItemElement): + klass = Account + + def condition(self): + return len(self.el.xpath('./td')) > 2 + + class Label(Filter): + def filter(self, text): + return text.lstrip(' 0123456789').title() + + class Type(Filter): + def filter(self, label): + return Account.TYPE_UNKNOWN + + obj_id = Env('id') + obj_label = Label(CleanText('./td[1]/a')) + obj_coming = Env('coming') + obj_balance = Env('balance') + obj_currency = FrenchTransaction.Currency('./td[2] | ./td[3]') + obj__link_id = Link('./td[1]/a') + obj_type = Type(Field('label')) + + def parse(self, el): + link = el.xpath('./td[1]/a')[0].get('href', '') + url = urlparse(link) + p = parse_qs(url.query) + + if 'CPT_IdPrestation' in p: + id = p['CPT_IdPrestation'][0] + elif 'Ass_IdPrestation' in p: + id = p['Ass_IdPrestation'][0] + elif 'CB_IdPrestation' in p: + id = p['CB_IdPrestation'][0] + else: + raise SkipItem() + + balance = CleanDecimal('./td[3]')(self) + + self.env['id'] = id + self.env['balance'] = balance + self.env['coming'] = NotAvailable + + +class Pagination(object): + def next_page(self): + links = self.page.doc.xpath('//a[@class="fleche"]') + if len(links) == 0: + return + current_page_found= False + for link in links: + l = link.attrib.get('href') + if current_page_found and "#op" not in l: + #Adding CB_IdPrestation so browser2 use CBOperationPage + return l + "&CB_IdPrestation" + elif "#op" in l: + current_page_found = True + return + + +class CBOperationPage(LoggedPage, HTMLPage): + @method + class get_history(Pagination, Transaction.TransactionsElement): + head_xpath = '//table//tr/th' + item_xpath = '//table//tr' + + class item(Transaction.TransactionElement): + condition = lambda self: len(self.el.xpath('./td')) >= 4 + + obj_date = DateGuesser(CleanText(TableCell("date")), Env("date_guesser")) + obj_vdate = DateGuesser(CleanText(TableCell("date")), Env("date_guesser")) + +class CPTOperationPage(LoggedPage, HTMLPage): + def get_history(self): + for script in self.doc.xpath('//script'): + if script.text is None or script.text.find('\nCL(0') < 0: + continue + + for m in re.finditer(r"CL\((\d+),'(.+)','(.+)','(.+)','([\d -\.,]+)',('([\d -\.,]+)',)?'\d+','\d+','[\w\s]+'\);", script.text, flags=re.MULTILINE): + op = Transaction(m.group(1)) + op.parse(date=m.group(3), raw=re.sub(u'[ ]+', u' ', m.group(4).replace(u'\n', u' '))) + op.set_amount(m.group(5)) + op._coming = (re.match('\d+/\d+/\d+', m.group(2)) is None) + yield op + +class LoginPage(HTMLPage): + def login(self, login): + form = self.get_form(nr=2) + form['userid'] = login + form.submit() + + def get_no_secure_key(self): + try: + a = self.doc.xpath(u'//a[contains(text(), "Without HSBC Secure Key")]')[0] + except IndexError: + return None + else: + return a.attrib['href'] + + def login_w_secure(self, login, password, secret): + form = self.get_form(nr=0) + form['memorableAnswer'] = secret + inputs = self.doc.xpath(u'//input[starts-with(@id, "keyrcc_password_first")]') + split_pass = u'' + for i,inpu in enumerate(inputs): + #The good field are 1,2,3 and the bad one are 11,12,21,23,24,31 and so one + if int(inpu.attrib['id'].split('first')[1]) < 10: + split_pass += password[i] + form['password'] = split_pass + form.submit() + + def useless_form(self): + form = self.get_form(nr=0) + form.submit() diff --git a/modules/hsbc/pages/__init__.py b/modules/hsbc/pages/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/modules/hsbc/pages/accounts.py b/modules/hsbc/pages/accounts.py deleted file mode 100644 index 84d9d823..00000000 --- a/modules/hsbc/pages/accounts.py +++ /dev/null @@ -1,152 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2013 Romain Bignon -# -# This file is part of weboob. -# -# weboob is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# weboob is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with weboob. If not, see . - - -from decimal import Decimal -import re - -from weboob.tools.browser import BasePage -from weboob.capabilities.bank import Account -from weboob.tools.capabilities.bank.transactions import FrenchTransaction - - -__all__ = ['AccountsListPage', 'CPTHistoryPage', 'CardHistoryPage'] - - -class AccountsListPage(BasePage): - def get_list(self): - accounts = [] - for tr in self.document.getiterator('tr'): - tds = tr.findall('td') - if len(tds) != 3 or tds[0].find('a') is None or tds[0].find('a').attrib.get('class', '') != 'flecheM': - continue - - account = Account() - account.id = tds[1].text.strip() - - a = tds[0].findall('a')[-1] - account.label = unicode(a.text.strip()) - account._link_id = a.attrib['href'] - - balance = u''.join([txt.strip() for txt in tds[2].itertext()]) - account.balance = Decimal(FrenchTransaction.clean_amount(balance)) - - # check account type - m = re.search('(\w+)_IdPrestation', account._link_id) - account_type = None - if m: - account_type = m.group(1) - if account_type != 'CPT': - account.id += '.%s' % account_type - - if account_type == 'CB': - accounts[0]._card_links.append(account._link_id) - if not accounts[0].coming: - accounts[0].coming = Decimal('0.0') - accounts[0].coming += account.balance - continue - - if account_type != 'CPT': - # Don't support other kind of account histories. - account._link_id = None - - account.currency = account.get_currency(tds[1].text) - account._card_links = [] - - accounts.append(account) - - return iter(accounts) - - -class Transaction(FrenchTransaction): - PATTERNS = [(re.compile('^VIR(EMENT)? (?P.*)'), FrenchTransaction.TYPE_TRANSFER), - (re.compile('^PRLV (?P.*)'), FrenchTransaction.TYPE_ORDER), - (re.compile('^CB (?P.*)\s+(?P
\d+)/(?P\d+)\s*(?P.*)'), - FrenchTransaction.TYPE_CARD), - (re.compile('^DAB (?P
\d{2})/(?P\d{2}) ((?P\d{2})H(?P\d{2}) )?(?P.*?)( CB N°.*)?$'), - FrenchTransaction.TYPE_WITHDRAWAL), - (re.compile('^CHEQUE$'), FrenchTransaction.TYPE_CHECK), - (re.compile('^COTIS\.? (?P.*)'), FrenchTransaction.TYPE_BANK), - (re.compile('^REMISE (?P.*)'), FrenchTransaction.TYPE_DEPOSIT), - ] - - -class HistoryPage(BasePage): - def get_next_link(self): - return None - - def get_operations(self, num_page, date_guesser): - raise NotImplementedError() - - -class CPTHistoryPage(HistoryPage): - def get_operations(self, num_page, date_guesser): - for script in self.document.getiterator('script'): - if script.text is None or script.text.find('\nCL(0') < 0: - continue - - for m in re.finditer(r"CL\((\d+),'(.+)','(.+)','(.+)','([\d -\.,]+)',('([\d -\.,]+)',)?'\d+','\d+','[\w\s]+'\);", script.text, flags=re.MULTILINE): - op = Transaction(m.group(1)) - op.parse(date=m.group(3), raw=re.sub(u'[ ]+', u' ', m.group(4).replace(u'\n', u' '))) - op.set_amount(m.group(5)) - op._coming = (re.match('\d+/\d+/\d+', m.group(2)) is None) - yield op - - -class CardHistoryPage(HistoryPage): - def get_next_link(self): - ok = False - for link in self.document.xpath('//form[@name="FORM_LIB_CARTE"]/a[@class="fleche"]'): - if link.attrib['href'].startswith('#'): - ok = True - elif ok: - # add CB_IdPrestation to handle the correct page on browser. - return link.attrib['href'] + '&CB_IdPrestation=' - - def parse_date(self, guesser, string, store=False): - day, month = map(int, string.split('/')) - return guesser.guess_date(day, month, store) - - def get_operations(self, num_page, date_guesser): - debit_date = None - for tr in self.document.xpath('//div[@id="tabs-1"]/table//tr'): - cols = tr.findall('td') - if len(cols) == 1: - text = self.parser.tocleanstring(cols[0]) - m = re.search('(\d+/\d+)', text) - if m: - # if there are several months on the same page, the second - # one's operations are already debited. - if debit_date is not None: - num_page += 1 - debit_date = self.parse_date(date_guesser, m.group(1), True) - continue - - if len(cols) < 4: - continue - - op = Transaction('') - op.parse(date=debit_date, - raw=self.parser.tocleanstring(cols[1])) - op.rdate = self.parse_date(date_guesser, self.parser.tocleanstring(cols[0])) - op.type = op.TYPE_CARD - op._coming = (num_page == 0) - op.set_amount(self.parser.tocleanstring(cols[-1]), - self.parser.tocleanstring(cols[-2])) - yield op diff --git a/modules/hsbc/pages/login.py b/modules/hsbc/pages/login.py deleted file mode 100644 index 55bf18a0..00000000 --- a/modules/hsbc/pages/login.py +++ /dev/null @@ -1,76 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright(C) 2012-2013 Romain Bignon -# -# This file is part of weboob. -# -# weboob is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# weboob is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with weboob. If not, see . - - -import re -import urllib -from mechanize import FormNotFoundError - -from weboob.tools.browser import BasePage, BrokenPageError - - -__all__ = ['LoginPage'] - - -class LoginPage(BasePage): - def on_loaded(self): - try: - self.browser.select_form(name="form1") - except FormNotFoundError: - pass - else: - self.browser.submit(nologin=True) - - def login(self, username, secret, password): - form_world = self.document.xpath('//form[@name="auth04"]') - url = form_world[0].attrib['action'] - datastr = "TF1;015;;;;;;;;;;;;;;;;;;;;;;Mozilla;Netscape;5.0%20%28X11%29;20100101;undefined;true;Linux%20x86_64;true;Linux%20x86_64;undefined;Mozilla/5.0%20%28X11%3B%20Linux%20x86_64%3B%20rv%3A19.0%29%20Gecko/20100101%20Firefox/19.0%20Iceweasel/19.0.2;en-US;undefined;www.hsbc.fr;undefined;undefined;undefined;undefined;true;true;1365177015380;1;Tue%2007%20Jun%202005%2009%3A33%3A44%20PM%20CEST;1280;1024;;11.2;;;;;123;-60;-120;Fri%2005%20Apr%202013%2005%3A50%3A15%20PM%20CEST;24;1280;1024;0;0;;;;;;Shockwave%20Flash%7CShockwave%20Flash%2011.2%20r202;;;;;;;;;;;;;17;" - data = {'FMNUserId': username, - 'memorableAnswer': secret, - 'password': '', - '__data': datastr, - '__custtype': 'GLOBAL', - - } - for i, field in enumerate(form_world[0].xpath('.//div[@class="csLabel"]/nobr/input[@type="password"]')): - if field.attrib['name'].startswith('keyrcc_password_first') and not 'disabled' in field.attrib: - data[field.attrib['name']] = password[i] - data['password'] += password[i] - - if url.startswith('/'): - url = 'https://www.hsbc.fr%s' % url - - self.browser.location(url, urllib.urlencode(data), no_login=True) - - def get_error(self): - try: - return self.parser.tocleanstring(self.document.xpath('//font[@color="red"]')[0]) - except IndexError: - return None - - def get_session(self): - try: - frame = self.document.xpath('//frame[@name="FrameWork"]')[0] - except IndexError: - raise BrokenPageError('Unable to find session token') - - m = re.search('sessionid=([^& "]+)', frame.attrib['src']) - if not m: - raise BrokenPageError('Unable to find session token') - return m.group(1)