Rewrite Wells Fargo with browser2 (closes #1624)

Signed-off-by: Oleg Plakhotniuk <olegus8@gmail.com>
This commit is contained in:
Oleg Plakhotniuk 2014-09-19 22:26:15 -05:00 committed by Romain Bignon
commit 8f2b3acce6
3 changed files with 142 additions and 230 deletions

View file

@ -44,13 +44,10 @@ class WellsFargoBackend(BaseBackend, CapBank):
self.config['password'].get())
def iter_accounts(self):
with self.browser:
return self.browser.get_accounts()
return self.browser.get_accounts()
def get_account(self, id_):
with self.browser:
return self.browser.get_account(id_)
return self.browser.get_account(id_)
def iter_history(self, account):
with self.browser:
return self.browser.iter_history(account)
return self.browser.iter_history(account)

View file

@ -21,60 +21,36 @@
from time import sleep
from weboob.capabilities.bank import AccountNotFound
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from weboob.tools.mech import ClientForm
from weboob.tools.browser2 import LoginBrowser, URL, need_login
from weboob.tools.exceptions import BrowserIncorrectPassword
from .pages import LoginPage, LoginRedirectPage, LoggedInPage, SummaryPage, \
DynamicPage, DynamicParser
DynamicPage
__all__ = ['WellsFargo']
class WellsFargo(BaseBrowser):
DOMAIN = 'online.wellsfargo.com'
PROTOCOL = 'https'
CERTHASH = ['04ee8bb37799ee3d15174c767bb453f5'
'7b17735fdfafd38cbea0b78979bdacd9']
ENCODING = 'UTF-8'
PAGES = {
'/$': LoginPage,
'/das/cgi-bin/session.cgi\?screenid=SIGNON$': LoginRedirectPage,
'/das/cgi-bin/session.cgi\?screenid=SIGNON_PORTAL_PAUSE$':
LoggedInPage,
'/das/cgi-bin/session.cgi\?screenid=SIGNON&LOB=CONS$':
LoggedInPage,
'/login\?ERROR_CODE=.*LOB=CONS&$': LoggedInPage,
'/das/channel/accountSummary$': SummaryPage,
'/das/cgi-bin/session.cgi\?sessargs=.+$':
(DynamicPage, DynamicParser()),
'/das/channel/accountActivityDDA\?action=doSetPage&page=.*$':
DynamicPage
}
class WellsFargo(LoginBrowser):
BASEURL = 'https://online.wellsfargo.com'
login = URL('/$', LoginPage)
loginRedirect = URL('/das/cgi-bin/session.cgi\?screenid=SIGNON$',
LoginRedirectPage)
loggedIn = URL('/das/cgi-bin/session.cgi\?screenid=SIGNON_PORTAL_PAUSE$',
'/das/cgi-bin/session.cgi\?screenid=SIGNON&LOB=CONS$',
'/login\?ERROR_CODE=.*LOB=CONS&$',
LoggedInPage)
summary = URL('/das/channel/accountSummary$', SummaryPage)
dynamic = URL('/das/cgi-bin/session.cgi\?sessargs=.+$',
'/das/channel/accountActivityDDA\?action=doSetPage&page=.*$',
DynamicPage)
def __init__(self, *args, **kwargs):
self._pause = 1
BaseBrowser.__init__(self, *args, **kwargs)
def home(self):
self.location('/das/channel/accountSummary')
def is_logged(self):
try:
return self.page.is_logged()
except AttributeError:
return False
def login(self):
assert isinstance(self.username, basestring)
assert isinstance(self.password, basestring)
if not self.is_on_page(LoginPage):
self.location('/', no_login=True)
assert self.is_on_page(LoginPage)
_pause = 1
def do_login(self):
self.login.go()
self.page.login(self.username, self.password)
if not self.is_on_page(LoginRedirectPage):
if not self.loginRedirect.is_here():
raise BrowserIncorrectPassword()
# Sometimes Wells Fargo server returns "Session time out" error
@ -86,96 +62,95 @@ class WellsFargo(BaseBrowser):
def get_account(self, id_):
self.to_activity()
if id_ not in self.page.sub_page().accounts_ids():
if id_ not in self.page.subpage.accounts_ids():
raise AccountNotFound()
else:
self.to_activity(id_)
return self.page.sub_page().get_account()
return self.page.subpage.get_account()
def get_accounts(self):
self.to_activity()
for id_ in self.page.sub_page().accounts_ids():
for id_ in self.page.subpage.accounts_ids():
self.to_activity(id_)
yield self.page.sub_page().get_account()
yield self.page.subpage.get_account()
@need_login
def to_summary(self):
if not self.is_on_page(SummaryPage):
self.location('/das/channel/accountSummary')
assert self.is_on_page(SummaryPage)
self.summary.stay_or_go()
assert self.summary.is_here()
def is_activity(self):
try:
return self.page.sub_page().is_activity()
return self.page.subpage.is_activity()
except AttributeError:
return False
@need_login
def to_activity(self, id_=None):
if not self.is_activity():
self.to_summary()
self.page.to_activity()
assert self.is_activity()
if id_ and self.page.sub_page().account_id() != id_:
self.page.sub_page().to_account(id_)
if id_ and self.page.subpage.account_id() != id_:
self.page.subpage.to_account(id_)
assert self.is_activity()
assert self.page.sub_page().account_id() == id_
assert self.page.subpage.account_id() == id_
def is_statements(self):
try:
return self.page.sub_page().is_statements()
return self.page.subpage.is_statements()
except AttributeError:
return False
@need_login
def to_statements(self, id_=None, year=None):
if not self.is_statements():
self.to_summary()
self.page.to_statements()
assert self.is_statements()
if id_ and self.page.sub_page().account_id() != id_:
self.page.sub_page().to_account(id_)
if id_ and self.page.subpage.account_id() != id_:
self.page.subpage.to_account(id_)
assert self.is_statements()
assert self.page.sub_page().account_id() == id_
if year and self.page.sub_page().year() != year:
self.page.sub_page().to_year(year)
assert self.page.subpage.account_id() == id_
if year and self.page.subpage.year() != year:
self.page.subpage.to_year(year)
assert self.is_statements()
assert self.page.sub_page().year() == year
assert self.page.subpage.year() == year
def is_statement(self):
try:
return self.page.sub_page().is_statement()
return self.page.subpage.is_statement()
except AttributeError:
return False
@need_login
def to_statement(self, uri):
self.location(uri)
assert self.is_statement()
def iter_history(self, account):
self.to_activity(account.id)
try:
self.page.sub_page().since_last_statement()
except ClientForm.ItemNotFoundError:
# Skip transactions on web page if we cannot apply
# "since last statement" filter.
# This might be the case, for example, if Wells Fargo
# is processing the current statement:
# "Since your credit card account statement is being processed,
# transactions grouped by statement period will not be available
# for up to seven days."
# (www.wellsfargo.com, 2014-07-20)
pass
else:
assert self.page.sub_page().account_id() == account.id
# Skip transactions on web page if we cannot apply
# "since last statement" filter.
# This might be the case, for example, if Wells Fargo
# is processing the current statement:
# "Since your credit card account statement is being processed,
# transactions grouped by statement period will not be available
# for up to seven days."
# (www.wellsfargo.com, 2014-07-20)
if self.page.subpage.since_last_statement():
assert self.page.subpage.account_id() == account.id
while True:
for trans in self.page.sub_page().iter_transactions():
for trans in self.page.subpage.iter_transactions():
yield trans
if not self.page.sub_page().next_():
if not self.page.subpage.next_():
break
self.to_statements(account.id)
for year in self.page.sub_page().years():
for year in self.page.subpage.years():
self.to_statements(account.id, year)
for stmt in self.page.sub_page().statements():
for stmt in self.page.subpage.statements():
self.to_statement(stmt)
for trans in self.page.sub_page().iter_transactions():
for trans in self.page.subpage.iter_transactions():
yield trans

View file

@ -18,107 +18,87 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Account, Transaction
from weboob.tools.browser import BasePage
from weboob.tools.parsers import get_parser
from weboob.tools.parsers.iparser import IParser
from weboob.tools.mech import ClientForm
from weboob.tools.browser2.page import BasePage, HTMLPage, LoggedPage, RawPage
from urllib import unquote
from StringIO import StringIO
from .parsers import StatementParser, clean_amount, clean_label
import itertools
import re
import datetime
def form_with_control(control_name):
"""
Form search helper.
Returns whether the form has a control with specified name.
"""
def predicate(form):
try:
form.find_control(name=control_name)
except ClientForm.ControlNotFoundError:
return False
else:
return True
return predicate
class LoginPage(BasePage):
class LoginPage(HTMLPage):
def login(self, login, password):
self.browser.select_form(name='Signon')
self.browser['userid'] = login.encode(self.browser.ENCODING)
self.browser['password'] = password.encode(self.browser.ENCODING)
self.browser.submit(nologin=True)
form = self.get_form(xpath='//form[@name="Signon"]')
form['userid'] = login
form['password'] = password
form.submit()
class LoginRedirectPage(BasePage):
def is_logged(self):
return True
class LoginRedirectPage(LoggedPage, HTMLPage):
def redirect(self):
refresh = self.document.xpath(
refresh = self.doc.xpath(
'//meta[@http-equiv="Refresh"]/@content')[0]
url = re.match(r'^.*URL=(.*)$', refresh).group(1)
self.browser.location(url)
class LoggedInPage(BasePage):
def is_logged(self):
if type(self.document) is str:
return True
else:
return bool(self.document.xpath(u'//a[text()="Sign Off"]')) \
or bool(self.document.xpath(u'//title[text()="Splash Page"]'))
class LoggedInPage(HTMLPage):
@property
def logged(self):
return bool(self.doc.xpath(u'//a[text()="Sign Off"]')) \
or bool(self.doc.xpath(u'//title[text()="Splash Page"]'))
class SummaryPage(LoggedInPage):
def to_activity(self):
href = self.document.xpath(u'//a[text()="Account Activity"]/@href')[0]
href = self.doc.xpath(u'//a[text()="Account Activity"]/@href')[0]
self.browser.location(href)
def to_statements(self):
href = self.document.xpath('//a[text()="Statements & Documents"]'
'/@href')[0]
href = self.doc.xpath('//a[text()="Statements & Documents"]'
'/@href')[0]
self.browser.location(href)
class DynamicPage(LoggedInPage):
class DynamicPage(BasePage):
"""
Most of Wells Fargo pages have the same URI pattern.
Some of these pages are HTML, some are PDF.
"""
def sub_page(self):
page = None
if type(self.document) is str:
page = StatementSubPage
def __init__(self, browser, response, *args, **kwargs):
super(DynamicPage, self).__init__(browser, response, *args, **kwargs)
# Ugly hack to figure out the page type
klass = RawPage if response.content[:4] == '%PDF' else HTMLPage
self.doc = klass(browser, response, *args, **kwargs).doc
subclass = None
# Ugly hack to figure out the page type
if response.content[:4] == '%PDF':
subclass = StatementSubPage
elif u'Account Activity' in self._title():
name = self._account_name()
if u'CHECKING' in name or u'SAVINGS' in name:
page = ActivityCashSubPage
subclass = ActivityCashSubPage
elif u'CARD' in name:
page = ActivityCardSubPage
subclass = ActivityCardSubPage
elif u'Statements & Documents' in self._title():
page = StatementsSubPage
assert page
return page(self)
subclass = StatementsSubPage
assert subclass
self.subpage = subclass(browser, response, *args, **kwargs)
@property
def logged(self):
return self.subpage.logged
def _title(self):
return self.document.xpath(u'//title/text()')[0]
return self.doc.xpath(u'//title/text()')[0]
def _account_name(self):
return self.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountUID"]'
u'/option[@selected="selected"]/text()')[0]
class SubPage(object):
def __init__(self, page):
self.page = page
class AccountSubPage(SubPage):
class AccountSubPage(LoggedInPage):
def account_id(self, name=None):
if name:
return name[-4:] # Last 4 digits of "BLAH XXXXXXX1234"
@ -127,21 +107,11 @@ class AccountSubPage(SubPage):
class ActivitySubPage(AccountSubPage):
def __init__(self, *args, **kwargs):
AccountSubPage.__init__(self, *args, **kwargs)
# As of 2014-07-03, there are few nested "optgroup" nodes on
# the account activity pages, which is a violation of HTML
# standard and cannot be parsed by mechanize's Browser.select_form.
resp = self.page.browser.response()
resp.set_data(re.sub('</?optgroup[^>]*>', '', resp.get_data()))
self.page.browser.set_response(resp)
def is_activity(self):
return True
def accounts_names(self):
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountUID"]/option/text()')
def accounts_ids(self):
@ -149,16 +119,16 @@ class ActivitySubPage(AccountSubPage):
def account_uid(self, id_=None):
if id_:
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountUID"]'
u'/option[contains(text(),"%s")]/@value' % id_)[0]
else:
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountUID"]'
u'/option[@selected="selected"]/@value')[0]
def account_name(self):
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountUID"]'
u'/option[@selected="selected"]/text()')[0]
@ -169,9 +139,9 @@ class ActivitySubPage(AccountSubPage):
raise NotImplementedError()
def to_account(self, id_):
self.page.browser.select_form(name='AccountActivityForm')
self.page.browser['selectedAccountUID'] = [self.account_uid(id_)]
self.page.browser.submit()
form = self.get_form(xpath='//form[@name="AccountActivityForm"]')
form['selectedAccountUID'] = [self.account_uid(id_)]
form.submit()
def get_account(self):
name = self.account_name()
@ -209,25 +179,24 @@ class ActivityCashSubPage(ActivitySubPage):
return Account.TYPE_UNKNOWN
def account_balance(self):
return self.page.document.xpath(
return self.doc.xpath(
u'//td[@headers="currentPostedBalance"]/span/text()')[0]
def since_last_statement(self):
b = self.page.browser
b.select_form(predicate=form_with_control(
'showTabDDACommand.transactionTypeFilterValue'))
b['showTabDDACommand.transactionTypeFilterValue'] = [
form = self.get_form(xpath='//form[@id="ddaShowForm"]')
form['showTabDDACommand.transactionTypeFilterValue'] = [
u'All Transactions']
b['showTabDDACommand.timeFilterValue'] = ['8']
b.submit()
form['showTabDDACommand.timeFilterValue'] = ['8']
form.submit()
return True
def iter_transactions(self):
for row in self.page.document.xpath('//tr/th[@headers='
'"postedHeader dateHeader"]/..'):
for row in self.doc.xpath('//tr/th[@headers='
'"postedHeader dateHeader"]/..'):
date = row.xpath('th[@headers="postedHeader '
'dateHeader"]/text()')[0]
desc = row.xpath('td[@headers="postedHeader '
'descriptionHeader"]/div/text()')[0]
'descriptionHeader"]/span/text()')[0]
deposit = row.xpath('td[@headers="postedHeader '
'depositsConsumerHeader"]/span/text()')[0]
withdraw = row.xpath('td[@headers="postedHeader '
@ -254,9 +223,9 @@ class ActivityCashSubPage(ActivitySubPage):
yield trans
def next_(self):
links = self.page.document.xpath('//a[@title="Go To Next Page"]/@href')
links = self.doc.xpath('//a[@title="Go To Next Page"]/@href')
if links:
self.page.browser.location(links[0])
self.browser.location(links[0])
return True
else:
return False
@ -267,7 +236,7 @@ class ActivityCardSubPage(ActivitySubPage):
return Account.TYPE_CARD
def account_balance(self):
return self.page.document.xpath(
return self.doc.xpath(
u'//td[@headers="outstandingBalance"]/text()')[0]
def get_account(self):
@ -280,15 +249,18 @@ class ActivityCardSubPage(ActivitySubPage):
return account
def since_last_statement(self):
b = self.page.browser
b.select_form(predicate=form_with_control(
'showTabCommand.transactionTypeFilterValue'))
b['showTabCommand.transactionTypeFilterValue'] = ['sincelastStmt']
b.submit()
if self.doc.xpath('//select[@name="showTabCommand.'
'transactionTypeFilterValue"]'
'/option[@value="sincelastStmt"]'):
form = self.get_form(xpath='//form[@id="creditCardShowForm"]')
form['showTabCommand.transactionTypeFilterValue'] = [
'sincelastStmt']
form.submit()
return True
def iter_transactions(self):
for row in self.page.document.xpath('//tr/th[@headers='
'"postedHeader transactionDateHeader"]/..'):
for row in self.doc.xpath('//tr/th[@headers='
'"postedHeader transactionDateHeader"]/..'):
tdate = row.xpath('th[@headers="postedHeader '
'transactionDateHeader"]/text()')[0]
pdate = row.xpath('td[@headers="postedHeader '
@ -327,56 +299,44 @@ class ActivityCardSubPage(ActivitySubPage):
class StatementsSubPage(AccountSubPage):
def __init__(self, *args, **kwargs):
AccountSubPage.__init__(self, *args, **kwargs)
# As of 2014-07-06, there are few "<br/>" nodes on
# the account statements pages, which is a violation of HTML
# standard and cannot be parsed by mechanize's Browser.select_form.
resp = self.page.browser.response()
resp.set_data(re.sub('<br */>', '', resp.get_data()))
self.page.browser.set_response(resp)
def is_statements(self):
return True
def account_name(self):
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountKey"]'
u'/option[@selected="selected"]/text()')[0]
def account_uid(self, id_):
return self.page.document.xpath(
return self.doc.xpath(
u'//select[@name="selectedAccountKey"]'
u'/option[contains(text(),"%s")]/@value' % id_)[0]
def to_account(self, id_):
self.page.browser.select_form(predicate=form_with_control(
'selectedAccountKey'))
self.page.browser['selectedAccountKey'] = [self.account_uid(id_)]
self.page.browser.submit()
form = self.get_form(xpath='//form[@id="statementsAndDocumentsModel"]')
form['selectedAccountKey'] = [self.account_uid(id_)]
form.submit()
def year(self):
for text in self.page.document.xpath('//h2/strong/text()'):
for text in self.doc.xpath('//h2/strong/text()'):
try:
return int(text)
except ValueError:
pass
def years(self):
for text in self.page.document.xpath('//h2//strong/text()'):
for text in self.doc.xpath('//h2//strong/text()'):
try:
yield int(text)
except ValueError:
pass
def to_year(self, year):
href = self.page.document.xpath('//h2/a/strong[text()="%s"]'
'/../@href' % year)[0]
self.page.browser.location(href)
href = self.doc.xpath('//h2/a/strong[text()="%s"]/../@href' % year)[0]
self.browser.location(href)
def statements(self):
for outer_uri in self.page.document.xpath(
for outer_uri in self.doc.xpath(
'//table[@id="listOfStatements"]'
'//a[contains(text(), "Statement")]/@href'):
inner_uri = re.match('.*destinationClickUrl=([^&]+)&.*',
@ -384,11 +344,10 @@ class StatementsSubPage(AccountSubPage):
yield unquote(inner_uri)
class StatementSubPage(SubPage):
class StatementSubPage(LoggedPage, RawPage):
def __init__(self, *args, **kwArgs):
SubPage.__init__(self, *args, **kwArgs)
self._parser = StatementParser(self.page.document)
RawPage.__init__(self, *args, **kwArgs)
self._parser = StatementParser(self.doc)
def is_statement(self):
return True
@ -403,22 +362,3 @@ class StatementSubPage(SubPage):
cmp(t1.label, t2.label) or
cmp(t1.amount, t2.amount))
class DynamicParser(IParser):
def __init__(self):
self._html = get_parser()()
self._raw = get_parser('raw')()
self._parser = None
def parse(self, data, encoding=None):
# Ugly hack to figure out the document type
s = data.read()
if s[:4] == '%PDF':
self._parser = self._raw
else:
self._parser = self._html
return self._parser.parse(StringIO(s), encoding)
def __getattr__(self, name):
assert self._parser
return getattr(self._parser, name)