support pro accounts

This commit is contained in:
Romain Bignon 2013-03-12 13:04:25 +01:00
commit a8d8147452
10 changed files with 179 additions and 101 deletions

View file

View file

@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2009-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
from decimal import Decimal
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
from weboob.capabilities.bank import Account
from weboob.capabilities.base import NotAvailable
from weboob.tools.browser import BasePage, BrokenPageError, BrowserPasswordExpired
__all__ = ['AccountsList', 'AccountPrelevement']
class AccountsList(BasePage):
ACCOUNT_TYPES = {
u'Liquidités': Account.TYPE_CHECKING,
u'Epargne disponible': Account.TYPE_SAVINGS,
u'Titres': Account.TYPE_MARKET,
u'Assurance vie': Account.TYPE_DEPOSIT,
u'Crédit immobilier': Account.TYPE_LOAN,
}
def on_loaded(self):
pass
def _parse_account_group(self, table):
typename = unicode(table.attrib.get('summary', '').replace('Liste des contrats/comptes ', ''))
typeid = self.ACCOUNT_TYPES.get(typename, Account.TYPE_UNKNOWN)
for tr in table.xpath('.//tr[not(@class)]'):
if tr.find('td') is not None and tr.find('td').attrib.get('class', '') == 'typeTitulaire':
account = self._parse_account(tr)
account.type = typeid
yield account
def _parse_account(self, tr):
account = Account()
# for pro usage
account._stp = None
account.id = tr.xpath('.//td[@class="libelleCompte"]/input')[0].attrib['id'][len('libelleCompte'):]
if len(str(account.id)) == 23:
account.id = str(account.id)[5:21]
a = tr.xpath('.//td[@class="libelleCompte"]/a')[0]
m = re.match(r'javascript:goToStatements\(\'(\d+)\'', a.get('onclick', ''))
if m:
account._link_id = m.group(1)
else:
# Can't get history for this account.
account._link_id = None
# To prevent multiple-IDs for CIF (for example), add an arbitrary char in ID.
account.id += 'C'
account.label = u''+a.text.strip()
tds = tr.findall('td')
account.currency = account.get_currency(tds[3].find('a').text)
account.balance = self._parse_amount(tds[3].find('a'))
if tds[4].find('a') is not None:
account.coming = self._parse_amount(tds[4].find('a'))
else:
account.coming = NotAvailable
return account
def _parse_amount(self, elem):
return Decimal(FrenchTransaction.clean_amount(elem.text))
def get_list(self):
l = []
for table in self.document.xpath('//table[@class="tableCompte"]'):
for account in self._parse_account_group(table):
l.append(account)
if len(l) == 0:
# oops, no accounts? check if we have not exhausted the allowed use
# of this password
for img in self.document.getroot().cssselect('img[align="middle"]'):
if img.attrib.get('alt', '') == 'Changez votre code secret':
raise BrowserPasswordExpired('Your password has expired')
return l
def get_execution_id(self):
return self.document.xpath('//input[@name="_flowExecutionKey"]')[0].attrib['value']
def get_messages_link(self):
"""
Get the link to the messages page, which seems to have an identifier in it.
"""
for link in self.parser.select(self.document.getroot(), 'div#pantalon div.interieur a'):
if 'MessagesRecus' in link.attrib.get('href', ''):
return link.attrib['href']
raise BrokenPageError('Unable to find the link to the messages page')
class AccountPrelevement(AccountsList):
pass

View file

@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2009-2011 Romain Bignon, Pierre Mazière
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import time
import re
from weboob.tools.mech import ClientForm
import urllib
from weboob.tools.browser import BasePage, BrowserUnavailable
from weboob.tools.captcha.virtkeyboard import MappedVirtKeyboard,VirtKeyboardError
__all__ = ['LoginPage', 'ConfirmPage', 'ChangePasswordPage']
class BNPVirtKeyboard(MappedVirtKeyboard):
symbols={'0':'9cc4789a2cb223e8f2d5e676e90264b5',
'1':'e10b58fc085f9683052d5a63c96fc912',
'2':'04ec647e7b3414bcc069f0c54eb55a4c',
'3':'fde84fd9bac725db8463554448f1e469',
'4':'2359eea8671bf112b58264bec0294f71',
'5':'82b55b63480114f04fad8c5c4fa5673a',
'6':'e074864faeaeabb3be3d118192cd8879',
'7':'af5740e4ca71fadc6f4ae1412d864a1c',
'8':'cab759c574038ad89a0e35cc76ab7214',
'9':'828cf0faf86ac78e7f43208907620527'
}
url="/NSImgGrille?timestamp=%d"
color=27
def __init__(self,basepage):
img=basepage.document.find("//img[@usemap='#MapGril']")
MappedVirtKeyboard.__init__(self,basepage.browser.openurl(self.url % time.time()),basepage.document,img,self.color)
self.check_symbols(self.symbols,basepage.browser.responses_dirname)
def get_symbol_code(self,md5sum):
code=MappedVirtKeyboard.get_symbol_code(self,md5sum)
return code[-4:-2]
def get_string_code(self,string):
code=''
for c in string:
code+=self.get_symbol_code(self.symbols[c])
return code
class LoginPage(BasePage):
def on_loaded(self):
for td in self.document.getroot().cssselect('td.LibelleErreur'):
if td.text is None:
continue
msg = td.text.strip()
if 'indisponible' in msg:
raise BrowserUnavailable(msg)
def login(self, login, password):
try:
vk=BNPVirtKeyboard(self)
except VirtKeyboardError,err:
self.logger.error("Error: %s"%err)
return False
self.browser.select_form('logincanalnet')
# HACK because of fucking malformed HTML, the field isn't recognized by mechanize.
self.browser.controls.append(ClientForm.TextControl('text', 'ch1', {'value': ''}))
self.browser.set_all_readonly(False)
self.browser['ch1'] = login
self.browser['ch5'] = vk.get_string_code(password)
self.browser.submit()
class ConfirmPage(BasePage):
def get_error(self):
for td in self.document.xpath('//td[@class="hdvon1"]'):
if td.text:
return td.text.strip()
return None
def get_relocate_url(self):
script = self.document.xpath('//script')[0]
m = re.match('document.location.replace\("(.*)"\)', script.text[script.text.find('document.location.replace'):])
if m:
return m.group(1)
class InfoMessagePage(BasePage):
def on_loaded(self):
pass
class ChangePasswordPage(BasePage):
def change_password(self, current, new):
try:
vk=BNPVirtKeyboard(self)
except VirtKeyboardError,err:
self.logger.error("Error: %s"%err)
return False
from mechanize import Cookie
c = Cookie(0, 'wbo_segment_369721', 'AA%7CAB%7C%7C%7C',
None, False,
'.secure.bnpparibas.net', True, True,
'/', False,
False,
None,
False,
None,
None,
{})
cookiejar = self.browser._ua_handlers["_cookies"].cookiejar
cookiejar.set_cookie(c)
code_current=vk.get_string_code(current)
code_new=vk.get_string_code(new)
data = (('ch1', code_current),
('ch2', code_new),
('radiobutton3', 'radiobutton'),
('ch3', code_new),
('x', 23),
('y', 13),
)
headers = {'Referer': self.url}
#headers = {'Referer': "https://www.secure.bnpparibas.net/SAF_CHM?Action=SAF_CHM&Origine=SAF_CHM&stp=%s" % (int(datetime.now().strftime('%Y%m%d%H%M%S')))}
#import time
#time.sleep(10)
request = self.browser.request_class('https://www.secure.bnpparibas.net/SAF_CHM_VALID', urllib.urlencode(data), headers)
self.browser.location(request)

View file

@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Laurent Bachelier
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.browser import BasePage, BrokenPageError
from weboob.capabilities.messages import Message, Thread
from weboob.capabilities.base import NotLoaded
from weboob.tools.capabilities.messages.genericArticle import try_drop_tree
import re
from datetime import datetime
from lxml.html import make_links_absolute
__all__ = ['MessagesPage', 'MessagePage']
class MessagesPage(BasePage):
def iter_threads(self):
table = self.parser.select(self.document.getroot(), 'table#listeMessages', 1)
for tr in table.xpath('./tr'):
if tr.attrib.get('class', '') not in ('msgLu', 'msgNonLu'):
continue
author = unicode(self.parser.select(tr, 'td.colEmetteur', 1).text)
link = self.parser.select(tr, 'td.colObjet a', 1)
date_raw = self.parser.select(tr, 'td.colDate1', 1).attrib['data']
jsparams = re.search('\((.+)\)', link.attrib['onclick']).groups()[0]
jsparams = [i.strip('\'" ') for i in jsparams.split(',')]
page_id, _id, unread = jsparams
# this means unread on the website
unread = False if unread == "false" else True
# 2012/02/29:01h30min45sec
dt_match = re.match('(\d+)/(\d+)/(\d+):(\d+)h(\d+)min(\d+)sec', date_raw).groups()
dt_match = [int(d) for d in dt_match]
thread = Thread(_id)
thread._link_id = (page_id, unread)
thread.date = datetime(*dt_match)
thread.title = unicode(link.text)
message = Message(thread, 0)
message.set_empty_fields(None)
message.flags = message.IS_HTML
message.title = thread.title
message.date = thread.date
message.sender = author
message.content = NotLoaded # This is the only thing we are missing
thread.root = message
yield thread
class MessagePage(BasePage):
def get_content(self):
"""
Get the message content.
This page has a date, but it is less precise than the main list page,
so we only use it for the message content.
"""
try:
content = self.parser.select(self.document.getroot(),
'div.txtMessage div.contenu', 1)
except BrokenPageError:
# This happens with some old messages (2007)
content = self.parser.select(self.document.getroot(), 'div.txtMessage', 1)
content = make_links_absolute(content, self.url)
try_drop_tree(self.parser, content, 'script')
return self.parser.tostring(content)

View file

@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2009-2012 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
from weboob.tools.browser import BasePage
from weboob.tools.capabilities.bank.transactions import FrenchTransaction
__all__ = ['AccountHistory', 'AccountComing']
class Transaction(FrenchTransaction):
PATTERNS = [(re.compile(u'^(?P<category>CHEQUE)(?P<text>.*)'), FrenchTransaction.TYPE_CHECK),
(re.compile('^(?P<category>FACTURE CARTE) DU (?P<dd>\d{2})(?P<mm>\d{2})(?P<yy>\d{2}) (?P<text>.*?)( CA?R?T?E? ?\d*X*\d*)?$'),
FrenchTransaction.TYPE_CARD),
(re.compile('^(?P<category>(PRELEVEMENT|TELEREGLEMENT|TIP)) (?P<text>.*)'),
FrenchTransaction.TYPE_ORDER),
(re.compile('^(?P<category>ECHEANCEPRET)(?P<text>.*)'), FrenchTransaction.TYPE_LOAN_PAYMENT),
(re.compile('^(?P<category>RETRAIT DAB) (?P<dd>\d{2})/(?P<mm>\d{2})/(?P<yy>\d{2})( (?P<HH>\d+)H(?P<MM>\d+))? (?P<text>.*)'),
FrenchTransaction.TYPE_WITHDRAWAL),
(re.compile('^(?P<category>VIR(EMEN)?T? ((RECU|FAVEUR) TIERS|SEPA RECU)?)( /FRM)?(?P<text>.*)'),
FrenchTransaction.TYPE_TRANSFER),
(re.compile('^(?P<category>REMBOURST)(?P<text>.*)'), FrenchTransaction.TYPE_PAYBACK),
(re.compile('^(?P<category>COMMISSIONS)(?P<text>.*)'), FrenchTransaction.TYPE_BANK),
(re.compile('^(?P<text>(?P<category>REMUNERATION).*)'), FrenchTransaction.TYPE_BANK),
(re.compile('^(?P<category>REMISE CHEQUES)(?P<text>.*)'), FrenchTransaction.TYPE_DEPOSIT),
]
class AccountHistory(BasePage):
def iter_operations(self):
for tr in self.document.xpath('//table[@id="tableCompte"]//tr'):
if len(tr.xpath('td[@class="debit"]')) == 0:
continue
id = tr.find('td').find('input').attrib['id'].lstrip('_')
op = Transaction(id)
op.parse(date=tr.findall('td')[1].text,
raw=tr.findall('td')[2].text.replace(u'\xa0', u''))
debit = tr.xpath('.//td[@class="debit"]')[0].text
credit = tr.xpath('.//td[@class="credit"]')[0].text
op.set_amount(credit, debit)
yield op
def iter_coming_operations(self):
i = 0
for tr in self.document.xpath('//table[@id="tableauOperations"]//tr'):
if 'typeop' in tr.attrib:
tds = tr.findall('td')
if len(tds) != 3:
continue
text = tds[1].text or u''
text = text.replace(u'\xa0', u'')
for child in tds[1].getchildren():
if child.text: text += child.text
if child.tail: text += child.tail
i += 1
operation = Transaction(i)
operation.parse(date=tr.attrib['dateop'],
raw=text)
operation.set_amount(tds[2].text)
yield operation
class AccountComing(AccountHistory):
pass

View file

@ -0,0 +1,107 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
from weboob.tools.browser import BasePage, BrowserPasswordExpired
from weboob.tools.ordereddict import OrderedDict
from weboob.capabilities.bank import TransferError
__all__ = ['TransferPage', 'TransferConfirmPage', 'TransferCompletePage']
class Account(object):
def __init__(self, id, label, send_checkbox, receive_checkbox):
self.id = id
self.label = label
self.send_checkbox = send_checkbox
self.receive_checkbox = receive_checkbox
class TransferPage(BasePage):
def on_loaded(self):
for td in self.document.xpath('//td[@class="hdvon1"]'):
if td.text and 'Vous avez atteint le seuil de' in td.text:
raise BrowserPasswordExpired(td.text.strip())
def get_accounts(self):
accounts = OrderedDict()
for table in self.document.getiterator('table'):
if table.attrib.get('cellspacing') == '2':
for tr in table.cssselect('tr.hdoc1, tr.hdotc1'):
tds = tr.findall('td')
id = tds[1].text.replace(u'\xa0', u'')
label = tds[0].text
if label is None and tds[0].find('nobr') is not None:
label = tds[0].find('nobr').text
send_checkbox = tds[4].find('input').attrib['value'] if tds[4].find('input') is not None else None
receive_checkbox = tds[5].find('input').attrib['value'] if tds[5].find('input') is not None else None
account = Account(id, label, send_checkbox, receive_checkbox)
accounts[id] = account
return accounts
def transfer(self, from_id, to_id, amount, reason):
accounts = self.get_accounts()
# Transform RIBs to short IDs
if len(str(from_id)) == 23:
from_id = str(from_id)[5:21]
if len(str(to_id)) == 23:
to_id = str(to_id)[5:21]
try:
sender = accounts[from_id]
except KeyError:
raise TransferError('Account %s not found' % from_id)
try:
recipient = accounts[to_id]
except KeyError:
raise TransferError('Recipient %s not found' % to_id)
if sender.send_checkbox is None:
raise TransferError('Unable to make a transfer from %s' % sender.label)
if recipient.receive_checkbox is None:
raise TransferError('Unable to make a transfer to %s' % recipient.label)
self.browser.select_form(nr=0)
self.browser['C1'] = [sender.send_checkbox]
self.browser['C2'] = [recipient.receive_checkbox]
self.browser['T6'] = str(amount).replace('.', ',')
if reason:
self.browser['T5'] = reason.encode('utf-8')
self.browser.submit()
class TransferConfirmPage(BasePage):
def on_loaded(self):
for td in self.document.getroot().cssselect('td#size2'):
raise TransferError(td.text.strip())
for a in self.document.getiterator('a'):
m = re.match('/NSFR\?Action=VIRDA&stp=(\d+)', a.attrib['href'])
if m:
self.browser.location('/NS_VIRDA?stp=%s' % m.group(1))
return
class TransferCompletePage(BasePage):
def get_id(self):
return self.group_dict['id']