Add Messages support to bnporc
Private messages from the bank.
This commit is contained in:
parent
327cb4f618
commit
a837d954ee
7 changed files with 198 additions and 13 deletions
|
|
@ -21,7 +21,10 @@
|
||||||
# python2.5 compatibility
|
# python2.5 compatibility
|
||||||
from __future__ import with_statement
|
from __future__ import with_statement
|
||||||
|
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from weboob.capabilities.bank import ICapBank, AccountNotFound, Account, Recipient
|
from weboob.capabilities.bank import ICapBank, AccountNotFound, Account, Recipient
|
||||||
|
from weboob.capabilities.messages import ICapMessages, Thread
|
||||||
from weboob.tools.backend import BaseBackend, BackendConfig
|
from weboob.tools.backend import BaseBackend, BackendConfig
|
||||||
from weboob.tools.value import ValueBackendPassword
|
from weboob.tools.value import ValueBackendPassword
|
||||||
|
|
||||||
|
|
@ -31,7 +34,7 @@ from .browser import BNPorc
|
||||||
__all__ = ['BNPorcBackend']
|
__all__ = ['BNPorcBackend']
|
||||||
|
|
||||||
|
|
||||||
class BNPorcBackend(BaseBackend, ICapBank):
|
class BNPorcBackend(BaseBackend, ICapBank, ICapMessages):
|
||||||
NAME = 'bnporc'
|
NAME = 'bnporc'
|
||||||
MAINTAINER = 'Romain Bignon'
|
MAINTAINER = 'Romain Bignon'
|
||||||
EMAIL = 'romain@weboob.org'
|
EMAIL = 'romain@weboob.org'
|
||||||
|
|
@ -44,6 +47,15 @@ class BNPorcBackend(BaseBackend, ICapBank):
|
||||||
label='Password to set when the allowed uses are exhausted (6 digits)',
|
label='Password to set when the allowed uses are exhausted (6 digits)',
|
||||||
regexp='^(\d{6}|)$'))
|
regexp='^(\d{6}|)$'))
|
||||||
BROWSER = BNPorc
|
BROWSER = BNPorc
|
||||||
|
STORAGE = {'seen': []}
|
||||||
|
|
||||||
|
# Store the messages *list* for this duration
|
||||||
|
CACHE_THREADS = timedelta(seconds=3 * 60 * 60)
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
BaseBackend.__init__(self, *args, **kwargs)
|
||||||
|
self._threads = None
|
||||||
|
self._threads_age = datetime.utcnow()
|
||||||
|
|
||||||
def create_default_browser(self):
|
def create_default_browser(self):
|
||||||
if self.config['rotating_password'].get().isdigit() and len(self.config['rotating_password'].get()) == 6:
|
if self.config['rotating_password'].get().isdigit() and len(self.config['rotating_password'].get()) == 6:
|
||||||
|
|
@ -102,3 +114,48 @@ class BNPorcBackend(BaseBackend, ICapBank):
|
||||||
|
|
||||||
with self.browser:
|
with self.browser:
|
||||||
return self.browser.transfer(account, to, amount, reason)
|
return self.browser.transfer(account, to, amount, reason)
|
||||||
|
|
||||||
|
def iter_threads(self, cache=False):
|
||||||
|
"""
|
||||||
|
If cache is False, always fetch the threads from the website.
|
||||||
|
"""
|
||||||
|
old = self._threads_age < datetime.utcnow() - self.CACHE_THREADS
|
||||||
|
threads = self._threads
|
||||||
|
if not cache or threads is None or old:
|
||||||
|
with self.browser:
|
||||||
|
threads = list(self.browser.iter_threads())
|
||||||
|
# the website is stupid and does not have the messages in the proper order
|
||||||
|
threads = sorted(threads, key=lambda t: t.date, reverse=True)
|
||||||
|
self._threads = threads
|
||||||
|
for thread in threads:
|
||||||
|
if thread.id not in self.storage.get('seen', default=[]):
|
||||||
|
thread.root.flags |= thread.root.IS_UNREAD
|
||||||
|
yield thread
|
||||||
|
|
||||||
|
def fill_thread(self, thread, fields=None):
|
||||||
|
if fields is None or 'root' in fields:
|
||||||
|
return self.get_thread(thread)
|
||||||
|
|
||||||
|
def get_thread(self, _id):
|
||||||
|
if isinstance(_id, Thread):
|
||||||
|
thread = _id
|
||||||
|
_id = thread.id
|
||||||
|
else:
|
||||||
|
thread = Thread(_id)
|
||||||
|
with self.browser:
|
||||||
|
thread = self.browser.get_thread(thread)
|
||||||
|
return thread
|
||||||
|
|
||||||
|
def iter_unread_messages(self, thread=None):
|
||||||
|
threads = list(self.iter_threads(cache=True))
|
||||||
|
for thread in threads:
|
||||||
|
thread = self.fillobj(thread) or thread
|
||||||
|
for m in thread.iter_all_messages():
|
||||||
|
if m.flags & m.IS_UNREAD:
|
||||||
|
yield m
|
||||||
|
|
||||||
|
def set_message_read(self, message):
|
||||||
|
self.storage.get('seen', default=[]).append(message.thread.id)
|
||||||
|
self.storage.save()
|
||||||
|
|
||||||
|
OBJECTS = {Thread: fill_thread}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,8 @@ from weboob.capabilities.bank import TransferError, Transfer
|
||||||
from .pages import AccountsList, AccountHistory, ChangePasswordPage, \
|
from .pages import AccountsList, AccountHistory, ChangePasswordPage, \
|
||||||
AccountComing, AccountPrelevement, TransferPage, \
|
AccountComing, AccountPrelevement, TransferPage, \
|
||||||
TransferConfirmPage, TransferCompletePage, \
|
TransferConfirmPage, TransferCompletePage, \
|
||||||
LoginPage, ConfirmPage, MessagePage
|
LoginPage, ConfirmPage, InfoMessagePage, \
|
||||||
|
MessagePage, MessagesPage
|
||||||
from .errors import PasswordExpired
|
from .errors import PasswordExpired
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -48,7 +49,9 @@ class BNPorc(BaseBrowser):
|
||||||
'.*type=homeconnex.*': LoginPage,
|
'.*type=homeconnex.*': LoginPage,
|
||||||
'.*layout=HomeConnexion.*': ConfirmPage,
|
'.*layout=HomeConnexion.*': ConfirmPage,
|
||||||
'.*SAF_CHM_VALID.*': ConfirmPage,
|
'.*SAF_CHM_VALID.*': ConfirmPage,
|
||||||
'.*Action=DSP_MSG.*': MessagePage,
|
'.*Action=DSP_MSG.*': InfoMessagePage,
|
||||||
|
'.*MessagesRecus.*': MessagesPage,
|
||||||
|
'.*BmmFicheLireMessage.*': MessagePage,
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
|
|
@ -68,7 +71,7 @@ class BNPorc(BaseBrowser):
|
||||||
assert self.password.isdigit()
|
assert self.password.isdigit()
|
||||||
|
|
||||||
if not self.is_on_page(LoginPage):
|
if not self.is_on_page(LoginPage):
|
||||||
self.location('https://www.secure.bnpparibas.net/banque/portail/particulier/HomeConnexion?type=homeconnex')
|
self.home()
|
||||||
|
|
||||||
self.page.login(self.username, self.password)
|
self.page.login(self.username, self.password)
|
||||||
self.location('/NSFR?Action=DSP_VGLOBALE', no_login=True)
|
self.location('/NSFR?Action=DSP_VGLOBALE', no_login=True)
|
||||||
|
|
@ -93,7 +96,8 @@ class BNPorc(BaseBrowser):
|
||||||
self.page.change_password(self.password, new_password)
|
self.page.change_password(self.password, new_password)
|
||||||
|
|
||||||
if not self.is_on_page(ConfirmPage) or self.page.get_error() is not None:
|
if not self.is_on_page(ConfirmPage) or self.page.get_error() is not None:
|
||||||
self.logger.error('Oops, unable to change password (%s)' % (self.page.get_error() if self.is_on_page(ConfirmPage) else 'unknown'))
|
self.logger.error('Oops, unable to change password (%s)'
|
||||||
|
% (self.page.get_error() if self.is_on_page(ConfirmPage) else 'unknown'))
|
||||||
return
|
return
|
||||||
|
|
||||||
self.password, self.rotating_password = (new_password, self.password)
|
self.password, self.rotating_password = (new_password, self.password)
|
||||||
|
|
@ -158,7 +162,6 @@ class BNPorc(BaseBrowser):
|
||||||
# self.location('/banque/portail/particulier/FicheA#pageId=mouvementsavenir', urllib.urlencode(data))
|
# self.location('/banque/portail/particulier/FicheA#pageId=mouvementsavenir', urllib.urlencode(data))
|
||||||
# return self.page.get_operations()
|
# return self.page.get_operations()
|
||||||
|
|
||||||
|
|
||||||
def iter_history(self, id):
|
def iter_history(self, id):
|
||||||
self.location('/banque/portail/particulier/FicheA?contractId=%d&pageId=releveoperations&_eventId=changeOperationsPerPage&operationsPerPage=200' % int(id))
|
self.location('/banque/portail/particulier/FicheA?contractId=%d&pageId=releveoperations&_eventId=changeOperationsPerPage&operationsPerPage=200' % int(id))
|
||||||
return self.page.iter_operations()
|
return self.page.iter_operations()
|
||||||
|
|
@ -195,3 +198,38 @@ class BNPorc(BaseBrowser):
|
||||||
transfer.recipient = accounts[to_id].label
|
transfer.recipient = accounts[to_id].label
|
||||||
transfer.date = datetime.now()
|
transfer.date = datetime.now()
|
||||||
return transfer
|
return transfer
|
||||||
|
|
||||||
|
def messages_page(self):
|
||||||
|
if not self.is_on_page(MessagesPage):
|
||||||
|
if not self.is_on_page(AccountsList):
|
||||||
|
self.location('/NSFR?Action=DSP_VGLOBALE')
|
||||||
|
self.location(self.page.get_messages_link())
|
||||||
|
assert self.is_on_page(MessagesPage)
|
||||||
|
|
||||||
|
def iter_threads(self):
|
||||||
|
self.messages_page()
|
||||||
|
for thread in self.page.iter_threads():
|
||||||
|
yield thread
|
||||||
|
|
||||||
|
def get_thread(self, thread):
|
||||||
|
self.messages_page()
|
||||||
|
if not hasattr(thread, '_link_id') or not thread._link_id:
|
||||||
|
for t in self.iter_threads():
|
||||||
|
if t.id == thread.id:
|
||||||
|
thread = t
|
||||||
|
break
|
||||||
|
# mimic validerFormulaire() javascript
|
||||||
|
# yes, it makes no sense
|
||||||
|
page_id, unread = thread._link_id
|
||||||
|
self.select_form('listerMessages')
|
||||||
|
self.form.set_all_readonly(False)
|
||||||
|
self['identifiant'] = page_id
|
||||||
|
if len(thread.id):
|
||||||
|
self['idMessage'] = thread.id
|
||||||
|
# the JS does this, but it makes us unable to read unread messages
|
||||||
|
#if unread:
|
||||||
|
# self['newMsg'] = thread.id
|
||||||
|
self.submit()
|
||||||
|
assert self.is_on_page(MessagePage)
|
||||||
|
thread.root.content = self.page.get_content()
|
||||||
|
return thread
|
||||||
|
|
|
||||||
|
|
@ -21,10 +21,12 @@
|
||||||
from .accounts_list import AccountsList
|
from .accounts_list import AccountsList
|
||||||
from .transactions import AccountHistory, AccountComing
|
from .transactions import AccountHistory, AccountComing
|
||||||
from .transfer import TransferPage, TransferConfirmPage, TransferCompletePage
|
from .transfer import TransferPage, TransferConfirmPage, TransferCompletePage
|
||||||
from .login import LoginPage, ConfirmPage, ChangePasswordPage, MessagePage
|
from .login import LoginPage, ConfirmPage, ChangePasswordPage, InfoMessagePage
|
||||||
|
from .messages import MessagePage, MessagesPage
|
||||||
|
|
||||||
class AccountPrelevement(AccountsList): pass
|
class AccountPrelevement(AccountsList): pass
|
||||||
|
|
||||||
__all__ = ['AccountsList', 'AccountComing', 'AccountHistory', 'LoginPage',
|
__all__ = ['AccountsList', 'AccountComing', 'AccountHistory', 'LoginPage',
|
||||||
'ConfirmPage', 'MessagePage', 'AccountPrelevement', 'ChangePasswordPage',
|
'ConfirmPage', 'InfoMessagePage', 'AccountPrelevement', 'ChangePasswordPage',
|
||||||
'TransferPage', 'TransferConfirmPage', 'TransferCompletePage']
|
'TransferPage', 'TransferConfirmPage', 'TransferCompletePage',
|
||||||
|
'MessagePage', 'MessagesPage']
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
from weboob.capabilities.bank import Account
|
from weboob.capabilities.bank import Account
|
||||||
from weboob.capabilities.base import NotAvailable
|
from weboob.capabilities.base import NotAvailable
|
||||||
from weboob.tools.browser import BasePage
|
from weboob.tools.browser import BasePage, BrokenPageError
|
||||||
|
|
||||||
from ..errors import PasswordExpired
|
from ..errors import PasswordExpired
|
||||||
|
|
||||||
|
|
@ -86,3 +86,12 @@ class AccountsList(BasePage):
|
||||||
|
|
||||||
def get_execution_id(self):
|
def get_execution_id(self):
|
||||||
return self.document.xpath('//input[@name="execution"]')[0].attrib['value']
|
return self.document.xpath('//input[@name="execution"]')[0].attrib['value']
|
||||||
|
|
||||||
|
def get_messages_link(self):
|
||||||
|
"""
|
||||||
|
Get the link to the messages page, which seems to have an identifier in it.
|
||||||
|
"""
|
||||||
|
for link in self.parser.select(self.document.getroot(), 'div#pantalon div.interieur a'):
|
||||||
|
if 'MessagesRecus' in link.attrib.get('href', ''):
|
||||||
|
return link.attrib['href']
|
||||||
|
raise BrokenPageError('Unable to find the link to the messages page')
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,7 @@ class ConfirmPage(BasePage):
|
||||||
if m:
|
if m:
|
||||||
return m.group(1)
|
return m.group(1)
|
||||||
|
|
||||||
class MessagePage(BasePage):
|
class InfoMessagePage(BasePage):
|
||||||
def on_loaded(self):
|
def on_loaded(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
||||||
72
modules/bnporc/pages/messages.py
Normal file
72
modules/bnporc/pages/messages.py
Normal file
|
|
@ -0,0 +1,72 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright(C) 2012 Laurent Bachelier
|
||||||
|
#
|
||||||
|
# This file is part of weboob.
|
||||||
|
#
|
||||||
|
# weboob is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# weboob is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
|
||||||
|
from weboob.tools.browser import BasePage
|
||||||
|
from weboob.capabilities.messages import Message, Thread
|
||||||
|
from weboob.capabilities.base import NotLoaded
|
||||||
|
|
||||||
|
import re
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
__all__ = ['MessagesPage', 'MessagePage']
|
||||||
|
|
||||||
|
|
||||||
|
class MessagesPage(BasePage):
|
||||||
|
def iter_threads(self):
|
||||||
|
table = self.parser.select(self.document.getroot(), 'table#listeMessages', 1)
|
||||||
|
for tr in table.xpath('./tr'):
|
||||||
|
if tr.attrib.get('class', '') not in ('msgLu', 'msgNonLu'):
|
||||||
|
continue
|
||||||
|
author = unicode(self.parser.select(tr, 'td.colEmetteur', 1).text)
|
||||||
|
link = self.parser.select(tr, 'td.colObjet a', 1)
|
||||||
|
date_raw = self.parser.select(tr, 'td.colDate1', 1).attrib['data']
|
||||||
|
jsparams = re.search('\((.+)\)', link.attrib['onclick']).groups()[0]
|
||||||
|
jsparams = [i.strip('\'" ') for i in jsparams.split(',')]
|
||||||
|
page_id, _id, unread = jsparams
|
||||||
|
# this means unread on the website
|
||||||
|
unread = False if unread == "false" else True
|
||||||
|
# 2012/02/29:01h30min45sec
|
||||||
|
dt_match = re.match('(\d+)/(\d+)/(\d+):(\d+)h(\d+)min(\d+)sec', date_raw).groups()
|
||||||
|
dt_match = [int(d) for d in dt_match]
|
||||||
|
thread = Thread(_id)
|
||||||
|
thread._link_id = (page_id, unread)
|
||||||
|
thread.date = datetime(*dt_match)
|
||||||
|
thread.title = unicode(link.text)
|
||||||
|
message = Message(thread, 0)
|
||||||
|
message.set_empty_fields(None)
|
||||||
|
message.flags = message.IS_HTML
|
||||||
|
message.title = thread.title
|
||||||
|
message.date = thread.date
|
||||||
|
message.sender = author
|
||||||
|
message.content = NotLoaded # This is the only thing we are missing
|
||||||
|
thread.root = message
|
||||||
|
yield thread
|
||||||
|
|
||||||
|
|
||||||
|
class MessagePage(BasePage):
|
||||||
|
def get_content(self):
|
||||||
|
"""
|
||||||
|
Get the message content.
|
||||||
|
This page has a date, but it is less precise than the main list page,
|
||||||
|
so we only use it for the message content.
|
||||||
|
"""
|
||||||
|
content = self.parser.select(self.document.getroot(),
|
||||||
|
'div.txtMessage div.contenu', 1)
|
||||||
|
return self.parser.tostring(content)
|
||||||
|
|
@ -19,13 +19,20 @@
|
||||||
|
|
||||||
|
|
||||||
from weboob.tools.test import BackendTest
|
from weboob.tools.test import BackendTest
|
||||||
|
from random import choice
|
||||||
|
|
||||||
|
|
||||||
class BNPorcTest(BackendTest):
|
class BNPorcTest(BackendTest):
|
||||||
BACKEND = 'bnporc'
|
BACKEND = 'bnporc'
|
||||||
|
|
||||||
def test_bnporc(self):
|
def test_bank(self):
|
||||||
l = list(self.backend.iter_accounts())
|
l = list(self.backend.iter_accounts())
|
||||||
if len(l) > 0:
|
if len(l) > 0:
|
||||||
a = l[0]
|
a = l[0]
|
||||||
list(self.backend.iter_operations(a))
|
list(self.backend.iter_operations(a))
|
||||||
list(self.backend.iter_history(a))
|
list(self.backend.iter_history(a))
|
||||||
|
|
||||||
|
def test_msgs(self):
|
||||||
|
threads = list(self.backend.iter_threads())
|
||||||
|
thread = self.backend.fillobj(choice(threads), ['root'])
|
||||||
|
assert len(thread.root.content)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue