Wells Fargo module (www.wellsfargo.com).

Signed-off-by: Oleg Plakhotniuk <olegus8@gmail.com>
This commit is contained in:
Oleg Plakhotniuk 2014-08-29 13:01:33 -05:00 committed by Romain Bignon
commit 8cd95ea49b
6 changed files with 1120 additions and 0 deletions

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .backend import WellsFargoBackend
__all__ = ['WellsFargoBackend']

View file

@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import CapBank
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.value import ValueBackendPassword
from .browser import WellsFargo
__all__ = ['WellsFargoBackend']
class WellsFargoBackend(BaseBackend, CapBank):
    """
    Bank backend for Wells Fargo online banking.

    Declares the backend metadata and its two configuration values
    (login and password), and forwards every call to the WellsFargo
    browser while holding the browser as a context manager.
    """
    NAME = 'wellsfargo'
    MAINTAINER = u'Oleg Plakhotniuk'
    EMAIL = 'olegus8@gmail.com'
    VERSION = '0.j'
    LICENSE = 'AGPLv3+'
    DESCRIPTION = u'Wells Fargo'
    CONFIG = BackendConfig(
        ValueBackendPassword('login', label='Username', masked=False),
        ValueBackendPassword('password', label='Password'))
    BROWSER = WellsFargo

    def create_default_browser(self):
        """Create the browser from the configured login and password."""
        login = self.config['login'].get()
        password = self.config['password'].get()
        return self.create_browser(login, password)

    def iter_accounts(self):
        """Return the accounts reported by the browser."""
        # NOTE(review): the value is returned after the "with" block has
        # exited; if get_accounts() is lazy, iteration happens outside
        # of it -- confirm this is intended.
        with self.browser:
            accounts = self.browser.get_accounts()
        return accounts

    def get_account(self, id_):
        """Return the account with the given id, as found by the browser."""
        with self.browser:
            account = self.browser.get_account(id_)
        return account

    def iter_history(self, account):
        """Return the transactions history of the given account."""
        with self.browser:
            history = self.browser.iter_history(account)
        return history

View file

@ -0,0 +1,179 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import AccountNotFound
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from .pages import LoginPage, LoginRedirectPage, LoggedInPage, SummaryPage, \
DynamicPage, DynamicParser
from time import sleep
from mechanize import ItemNotFoundError
__all__ = ['WellsFargo']
class WellsFargo(BaseBrowser):
    """
    Browser for the Wells Fargo online banking site.

    Knows how to sign on, and how to navigate between the account
    summary, the account activity pages, the statements lists and
    the individual statements.
    """
    DOMAIN = 'online.wellsfargo.com'
    PROTOCOL = 'https'
    CERTHASH = ['04ee8bb37799ee3d15174c767bb453f5'
                '7b17735fdfafd38cbea0b78979bdacd9']
    ENCODING = 'UTF-8'
    # URL pattern -> page class (optionally paired with a parser instance).
    PAGES = {
        '/$': LoginPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON$': LoginRedirectPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON_PORTAL_PAUSE$':
            LoggedInPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON&LOB=CONS$':
            LoggedInPage,
        r'/login\?ERROR_CODE=.*LOB=CONS&$': LoggedInPage,
        '/das/channel/accountSummary$': SummaryPage,
        r'/das/cgi-bin/session.cgi\?sessargs=.+$':
            (DynamicPage, DynamicParser()),
        r'/das/channel/accountActivityDDA\?action=doSetPage&page=.*$':
            DynamicPage
    }

    def __init__(self, *args, **kwargs):
        # Seconds to wait before following the sign-on redirect.
        # Set before the base constructor runs.
        self._pause = 1
        BaseBrowser.__init__(self, *args, **kwargs)

    def home(self):
        """Go to the account summary."""
        self.location('/das/channel/accountSummary')

    def is_logged(self):
        """
        Ask the current page whether we are logged in.
        A page that cannot answer counts as "not logged in".
        """
        try:
            logged = self.page.is_logged()
        except AttributeError:
            logged = False
        return logged

    def login(self):
        """
        Sign on with the configured credentials.
        Raises BrowserIncorrectPassword if the sign-on form does not
        lead to the redirect page.
        """
        assert isinstance(self.username, basestring)
        assert isinstance(self.password, basestring)

        if not self.is_on_page(LoginPage):
            self.location('/', no_login=True)
            assert self.is_on_page(LoginPage)

        self.page.login(self.username, self.password)
        if not self.is_on_page(LoginRedirectPage):
            raise BrowserIncorrectPassword()

        # Sometimes Wells Fargo server returns "Session time out" error
        # right after login if we don't make a pause here.
        sleep(self._pause)
        # Double the pause (capped at 30 seconds) in case the redirect
        # fails; it is reset once the redirect has gone through.
        self._pause = min(30, self._pause*2)
        self.page.redirect()
        self._pause = 1

    def get_account(self, id_):
        """
        Return the account with the given id.
        Raises AccountNotFound if the activity page doesn't list it.
        """
        self.to_activity()
        if id_ not in self.page.sub_page().accounts_ids():
            raise AccountNotFound()
        self.to_activity(id_)
        return self.page.sub_page().get_account()

    def get_accounts(self):
        """Yield every account listed on the activity page."""
        self.to_activity()
        for id_ in self.page.sub_page().accounts_ids():
            self.to_activity(id_)
            yield self.page.sub_page().get_account()

    def to_summary(self):
        """Make sure we are on the account summary page."""
        if self.is_on_page(SummaryPage):
            return
        self.location('/das/channel/accountSummary')
        assert self.is_on_page(SummaryPage)

    def _sub_page_is(self, check):
        """
        Call the named "is_*" check on the current sub page.
        Any AttributeError on the way means the answer is "no".
        """
        try:
            return getattr(self.page.sub_page(), check)()
        except AttributeError:
            return False

    def is_activity(self):
        """Whether we are on an account activity page."""
        return self._sub_page_is('is_activity')

    def to_activity(self, id_=None):
        """
        Go to the account activity page; if an account id is given,
        also switch that page to the given account.
        """
        if not self.is_activity():
            self.to_summary()
            self.page.to_activity()
            assert self.is_activity()
        if id_ and self.page.sub_page().account_id() != id_:
            self.page.sub_page().to_account(id_)
            assert self.is_activity()
            assert self.page.sub_page().account_id() == id_

    def is_statements(self):
        """Whether we are on a statements list page."""
        return self._sub_page_is('is_statements')

    def to_statements(self, id_=None, year=None):
        """
        Go to the statements list page; optionally switch it to
        the given account id and then to the given year.
        """
        if not self.is_statements():
            self.to_summary()
            self.page.to_statements()
            assert self.is_statements()
        if id_ and self.page.sub_page().account_id() != id_:
            self.page.sub_page().to_account(id_)
            assert self.is_statements()
            assert self.page.sub_page().account_id() == id_
        if year and self.page.sub_page().year() != year:
            self.page.sub_page().to_year(year)
            assert self.is_statements()
            assert self.page.sub_page().year() == year

    def is_statement(self):
        """Whether we are on a single statement."""
        return self._sub_page_is('is_statement')

    def to_statement(self, uri):
        """Open the statement located at the given URI."""
        self.location(uri)
        assert self.is_statement()

    def iter_history(self, account):
        """
        Yield transactions of the given account: first the ones shown
        on the activity web pages since the last statement, then the
        ones read from every statement of every available year.
        """
        self.to_activity(account.id)
        try:
            self.page.sub_page().since_last_statement()
        except ItemNotFoundError:
            # Skip transactions on web page if we cannot apply
            # "since last statement" filter.
            # This might be the case, for example, if Wells Fargo
            # is processing the current statement:
            # "Since your credit card account statement is being processed,
            # transactions grouped by statement period will not be available
            # for up to seven days."
            # (www.wellsfargo.com, 2014-07-20)
            pass
        else:
            assert self.page.sub_page().account_id() == account.id
            more_pages = True
            while more_pages:
                for trans in self.page.sub_page().iter_transactions():
                    yield trans
                more_pages = self.page.sub_page().next_()

        self.to_statements(account.id)
        for year in self.page.sub_page().years():
            self.to_statements(account.id, year)
            for stmt in self.page.sub_page().statements():
                self.to_statement(stmt)
                for trans in self.page.sub_page().iter_transactions():
                    yield trans

426
modules/wellsfargo/pages.py Normal file
View file

@ -0,0 +1,426 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Account, Transaction
from weboob.tools.browser import BasePage
from weboob.tools.parsers import get_parser
from weboob.tools.parsers.iparser import IParser
from mechanize import ControlNotFoundError
from urllib import unquote
from StringIO import StringIO
from .parsers import StatementParser, clean_amount, clean_label
import itertools
import re
import datetime
__all__ = ['LoginPage', 'LoggedInPage', 'SummaryPage']
def form_with_control(control_name):
    """
    Form search helper.
    Builds a predicate which takes a form and returns whether
    that form has a control with the specified name.
    """
    def has_control(form):
        try:
            form.find_control(name=control_name)
        except ControlNotFoundError:
            return False
        return True
    return has_control
class LoginPage(BasePage):
    """Sign-on page."""

    def login(self, login, password):
        """
        Fill the "Signon" form with the given credentials, encoded
        with the browser's encoding, and submit it.
        """
        browser = self.browser
        encoding = browser.ENCODING
        browser.select_form(name='Signon')
        browser['userid'] = login.encode(encoding)
        browser['password'] = password.encode(encoding)
        browser.submit(nologin=True)
class LoginRedirectPage(BasePage):
    """Page shown right after the sign-on form has been submitted."""

    def is_logged(self):
        return True

    def redirect(self):
        """Follow the URL given by the page's "Refresh" meta tag."""
        contents = self.document.xpath(
            '//meta[@http-equiv="Refresh"]/@content')
        # The target is whatever follows "URL=" in the tag content.
        target = re.match(r'^.*URL=(.*)$', contents[0]).group(1)
        self.browser.location(target)
class LoggedInPage(BasePage):
    """Base of the pages which are only reachable after the sign-on."""

    def is_logged(self):
        """
        A document which is a plain string is considered logged in
        (presumably a raw, non-HTML document -- confirm against the
        parser in use). Otherwise the page must have either
        a "Sign Off" link or the "Splash Page" title.
        """
        document = self.document
        if type(document) is str:
            return True
        if document.xpath(u'//a[text()="Sign Off"]'):
            return True
        return bool(document.xpath(u'//title[text()="Splash Page"]'))
class SummaryPage(LoggedInPage):
    """Account summary page, the hub leading to the other views."""

    def to_activity(self):
        """Follow the "Account Activity" link."""
        links = self.document.xpath(u'//a[text()="Account Activity"]/@href')
        self.browser.location(links[0])

    def to_statements(self):
        """Follow the "Statements & Documents" link."""
        links = self.document.xpath(
            '//a[text()="Statements & Documents"]/@href')
        self.browser.location(links[0])
class DynamicPage(LoggedInPage):
    """
    Most of Wells Fargo pages have the same URI pattern.
    Some of these pages are HTML, some are PDF.
    The actual kind of the page is figured out from its content,
    and the matching "sub page" object does the real work.
    """

    def sub_page(self):
        """
        Return a new sub page object matching the current content.
        Fails an assertion if the content is not recognized.
        """
        page = self._sub_page_class()
        assert page
        return page(self)

    def _sub_page_class(self):
        """Pick the sub page class, or None if nothing matches."""
        if type(self.document) is str:
            return StatementSubPage
        title = self._title()
        if u'Account Activity' in title:
            name = self._account_name()
            if u'CHECKING' in name or u'SAVINGS' in name:
                return ActivityCashSubPage
            if u'CARD' in name:
                return ActivityCardSubPage
            return None
        if u'Statements & Documents' in title:
            return StatementsSubPage
        return None

    def _title(self):
        """Text of the first "title" node."""
        return self.document.xpath(u'//title/text()')[0]

    def _account_name(self):
        """Text of the selected option of the accounts drop-down."""
        query = (u'//select[@name="selectedAccountUID"]'
                 u'/option[@selected="selected"]/text()')
        return self.document.xpath(query)[0]
class SubPage(object):
    """
    Base of the objects returned by DynamicPage.sub_page().
    Keeps a reference to the page it was created for.
    """

    def __init__(self, page):
        # The page whose document and browser the sub page works with.
        self.page = page
class AccountSubPage(SubPage):
    """Sub page which shows one selected account."""

    def account_id(self, name=None):
        """
        Return the account id taken from the given account name,
        or from the name of the currently selected account
        if no name is given.
        """
        if not name:
            return self.account_id(self.account_name())
        # Last 4 digits of "BLAH XXXXXXX1234"
        return name[-4:]
class ActivitySubPage(AccountSubPage):
    """
    Common part of the account activity views.
    Subclasses provide the account type, the balance
    and the transactions handling.
    """

    def __init__(self, *args, **kwargs):
        AccountSubPage.__init__(self, *args, **kwargs)
        # As of 2014-07-03, there are a few nested "optgroup" nodes on
        # the account activity pages, which is a violation of the HTML
        # standard and cannot be parsed by mechanize's Browser.select_form.
        # Strip them from the response held by the browser.
        browser = self.page.browser
        response = browser.response()
        cleaned = re.sub('</?optgroup[^>]*>', '', response.get_data())
        response.set_data(cleaned)
        browser.set_response(response)

    def is_activity(self):
        return True

    def accounts_names(self):
        """Names of all the accounts in the accounts drop-down."""
        query = u'//select[@name="selectedAccountUID"]/option/text()'
        return self.page.document.xpath(query)

    def accounts_ids(self):
        """Ids of all the accounts in the accounts drop-down."""
        return list(map(self.account_id, self.accounts_names()))

    def account_uid(self, id_=None):
        """
        Value of the drop-down option whose text contains the given id,
        or of the selected option if no id is given.
        """
        if id_:
            option = u'/option[contains(text(),"%s")]/@value' % id_
        else:
            option = u'/option[@selected="selected"]/@value'
        query = u'//select[@name="selectedAccountUID"]' + option
        return self.page.document.xpath(query)[0]

    def account_name(self):
        """Name of the currently selected account."""
        query = (u'//select[@name="selectedAccountUID"]'
                 u'/option[@selected="selected"]/text()')
        return self.page.document.xpath(query)[0]

    def account_type(self, name=None):
        raise NotImplementedError()

    def account_balance(self):
        raise NotImplementedError()

    def to_account(self, id_):
        """Switch the page to the account with the given id."""
        browser = self.page.browser
        browser.select_form(name='AccountActivityForm')
        browser['selectedAccountUID'] = [self.account_uid(id_)]
        browser.submit()

    def get_account(self):
        """Build an Account object out of the currently shown account."""
        label = self.account_name()
        balance_text = self.account_balance()
        currency = Account.get_currency(balance_text)
        id_ = self.account_id()
        type_ = self.account_type()

        account = Account()
        account.id = id_
        account.label = label
        account.currency = currency
        account.balance = clean_amount(balance_text)
        account.type = type_
        return account

    def since_last_statement(self):
        raise NotImplementedError()

    def iter_transactions(self):
        raise NotImplementedError()

    def next_(self):
        raise NotImplementedError()
class ActivityCashSubPage(ActivitySubPage):
    """Account activity view of a checking or savings account."""

    def account_type(self, name=None):
        """Account type inferred from the (given or current) account name."""
        if not name:
            name = self.account_name()
        if u'CHECKING' in name:
            return Account.TYPE_CHECKING
        if u'SAVINGS' in name:
            return Account.TYPE_SAVINGS
        return Account.TYPE_UNKNOWN

    def account_balance(self):
        """Raw text of the "currentPostedBalance" cell."""
        cells = self.page.document.xpath(
            u'//td[@headers="currentPostedBalance"]/span/text()')
        return cells[0]

    def since_last_statement(self):
        """Submit the filter form asking for all transactions, time filter "8"."""
        browser = self.page.browser
        browser.select_form(predicate=form_with_control(
            'showTabDDACommand.transactionTypeFilterValue'))
        browser['showTabDDACommand.transactionTypeFilterValue'] = [
            u'All Transactions']
        browser['showTabDDACommand.timeFilterValue'] = ['8']
        browser.submit()

    def iter_transactions(self):
        """Yield a Transaction for every posted row of the current page."""
        rows = self.page.document.xpath(
            '//tr/th[@headers="postedHeader dateHeader"]/..')
        for row in rows:
            date_text = row.xpath(
                'th[@headers="postedHeader dateHeader"]/text()')[0]
            desc_text = row.xpath(
                'td[@headers="postedHeader descriptionHeader"]'
                '/div/text()')[0]
            deposit_text = row.xpath(
                'td[@headers="postedHeader depositsConsumerHeader"]'
                '/span/text()')[0]
            withdraw_text = row.xpath(
                'td[@headers="postedHeader withdrawalsConsumerHeader"]'
                '/span/text()')[0]

            when = datetime.datetime.strptime(date_text, '%m/%d/%y')
            label = clean_label(desc_text)
            # An empty cell counts as zero.
            deposit = clean_amount(deposit_text.strip() or '0')
            withdraw = clean_amount(withdraw_text.strip() or '0')

            trans = Transaction(u'')
            trans.date = when
            trans.rdate = when
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = label
            trans.label = label
            trans.amount = deposit - withdraw
            yield trans

    def next_(self):
        """
        Go to the next page of transactions.
        Returns whether there was a next page to go to.
        """
        links = self.page.document.xpath('//a[@title="Go To Next Page"]/@href')
        if not links:
            return False
        self.page.browser.location(links[0])
        return True
class ActivityCardSubPage(ActivitySubPage):
    """Account activity view of a card account."""

    def account_type(self, name=None):
        return Account.TYPE_CARD

    def account_balance(self):
        """Raw text of the "outstandingBalance" cell."""
        cells = self.page.document.xpath(
            u'//td[@headers="outstandingBalance"]/text()')
        return cells[0]

    def get_account(self):
        card = ActivitySubPage.get_account(self)
        # Credit card is essentially a liability.
        # Negative amount means there's a payment due.
        card.balance = -card.balance
        return card

    def since_last_statement(self):
        """Submit the filter form asking for the "sincelastStmt" period."""
        browser = self.page.browser
        browser.select_form(predicate=form_with_control(
            'showTabCommand.transactionTypeFilterValue'))
        browser['showTabCommand.transactionTypeFilterValue'] = [
            'sincelastStmt']
        browser.submit()

    def iter_transactions(self):
        """Yield a Transaction for every posted row of the page."""
        rows = self.page.document.xpath(
            '//tr/th[@headers="postedHeader transactionDateHeader"]/..')
        for row in rows:
            tdate_text = row.xpath(
                'th[@headers="postedHeader transactionDateHeader"]'
                '/text()')[0]
            pdate_text = row.xpath(
                'td[@headers="postedHeader postingDateHeader"]/text()')[0]
            desc_text = row.xpath(
                'td[@headers="postedHeader descriptionHeader"]'
                '/span/text()')[0]
            ref_text = row.xpath(
                'td[@headers="postedHeader descriptionHeader"]/text()')[0]
            amount_text = row.xpath(
                'td[@headers="postedHeader amountHeader"]/text()')[0]

            tdate = datetime.datetime.strptime(tdate_text, '%m/%d/%y')
            pdate = datetime.datetime.strptime(pdate_text, '%m/%d/%y')
            label = clean_label(desc_text)
            ref = re.match('.*<REFERENCE ([^>]+)>.*', ref_text).group(1)
            # Amounts with a leading "+" are kept positive,
            # all the others are negated.
            if amount_text.startswith('+'):
                amount = clean_amount(amount_text[1:])
            else:
                amount = -clean_amount(amount_text)

            trans = Transaction(ref)
            trans.date = tdate
            trans.rdate = pdate
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = label
            trans.label = label
            trans.amount = amount
            yield trans

    def next_(self):
        # As of 2014-07-05, there's only one page for cards history.
        return False
class StatementsSubPage(AccountSubPage):
    """List of the statements of one account for one year."""

    def __init__(self, *args, **kwargs):
        AccountSubPage.__init__(self, *args, **kwargs)
        # As of 2014-07-06, there are a few "<br/>" nodes on
        # the account statements pages, which is a violation of the HTML
        # standard and cannot be parsed by mechanize's Browser.select_form.
        # Strip them from the response held by the browser.
        browser = self.page.browser
        response = browser.response()
        cleaned = re.sub('<br */>', '', response.get_data())
        response.set_data(cleaned)
        browser.set_response(response)

    def is_statements(self):
        return True

    def account_name(self):
        """Name of the currently selected account."""
        query = (u'//select[@name="selectedAccountKey"]'
                 u'/option[@selected="selected"]/text()')
        return self.page.document.xpath(query)[0]

    def account_uid(self, id_):
        """Value of the drop-down option whose text contains the given id."""
        option = u'/option[contains(text(),"%s")]/@value' % id_
        query = u'//select[@name="selectedAccountKey"]' + option
        return self.page.document.xpath(query)[0]

    def to_account(self, id_):
        """Switch the page to the account with the given id."""
        browser = self.page.browser
        browser.select_form(predicate=form_with_control(
            'selectedAccountKey'))
        browser['selectedAccountKey'] = [self.account_uid(id_)]
        browser.submit()

    def year(self):
        """
        First number found among the "strong" nodes which are direct
        children of "h2" nodes; None if there is no such number.
        """
        for text in self.page.document.xpath('//h2/strong/text()'):
            try:
                return int(text)
            except ValueError:
                continue
        return None

    def years(self):
        """Yield every number found in "strong" nodes anywhere under "h2"."""
        for text in self.page.document.xpath('//h2//strong/text()'):
            try:
                number = int(text)
            except ValueError:
                continue
            yield number

    def to_year(self, year):
        """Follow the link of the given year."""
        query = '//h2/a/strong[text()="%s"]/../@href' % year
        self.page.browser.location(self.page.document.xpath(query)[0])

    def statements(self):
        """
        Yield the URIs of the statements listed on the page.
        Every link carries the real (quoted) URI in its
        "destinationClickUrl" argument.
        """
        links = self.page.document.xpath(
            '//table[@id="listOfStatements"]'
            '//a[contains(text(), "Statement")]/@href')
        for outer_uri in links:
            found = re.match('.*destinationClickUrl=([^&]+)&.*', outer_uri)
            yield unquote(found.group(1))
class StatementSubPage(SubPage):
    """A single statement, parsed by StatementParser."""

    def __init__(self, *args, **kwargs):
        SubPage.__init__(self, *args, **kwargs)
        self._parser = StatementParser(self.page.document)

    def is_statement(self):
        return True

    def iter_transactions(self):
        """
        Return the cash and card transactions of the statement
        as one sorted list.
        """
        # Maintain a nice consistent newer-to-older order of transactions;
        # transactions of the same date go by label, then by amount.
        # Both sorts are stable, so sorting by the secondary keys first
        # and by the date afterwards gives exactly that order.
        transactions = sorted(
            itertools.chain(
                self._parser.read_cash_transactions(),
                self._parser.read_card_transactions()),
            key=lambda t: (t.label, t.amount))
        transactions.sort(key=lambda t: t.date, reverse=True)
        return transactions
class DynamicParser(IParser):
    """
    Parser which picks the real parser by looking at the data:
    the "raw" parser for PDF documents, the default one for the rest.
    """

    def __init__(self):
        self._html = get_parser()()
        self._raw = get_parser('raw')()
        # The parser picked by the latest parse() call.
        self._parser = None

    def parse(self, data, encoding=None):
        # Ugly hack to figure out the document type:
        # PDF documents start with the "%PDF" signature.
        content = data.read()
        is_pdf = content[:4] == '%PDF'
        self._parser = self._raw if is_pdf else self._html
        # The stream has been consumed, so hand over a fresh one.
        return self._parser.parse(StringIO(content), encoding)

    def __getattr__(self, name):
        # Everything else is forwarded to the picked parser.
        assert self._parser
        return getattr(self._parser, name)

View file

@ -0,0 +1,401 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Transaction
from weboob.tools.capabilities.bank.transactions import AmericanTransaction
from decimal import Decimal
from tempfile import mkstemp
import subprocess
import os
import re
import datetime
__all__ = ['StatementParser', 'clean_amount', 'clean_label']
def clean_amount(text):
    """
    Turn amount text into a Decimal, using
    AmericanTransaction.clean_amount to normalize the text first.
    """
    normalized = AmericanTransaction.clean_amount(text)
    return Decimal(normalized)
def clean_label(text):
    """
    Web view and statements use different label formatting.
    User shouldn't be able to see the difference, so we
    need to make labels from both sources look the same.

    The text is stripped, upper-cased, and every run of spaces
    is collapsed into a single space.
    """
    # The flags go to re.compile: the fourth positional argument
    # of re.sub is "count", so passing re.UNICODE there would only
    # limit the number of replacements.
    spaces = re.compile(u' +', re.UNICODE)
    return spaces.sub(u' ', text.strip().upper())
def full_date(date, date_from, date_to):
    """
    Makes sure that date is close to the given range.
    Transactions dates in a statement contain only day and month.
    Statement dates range have a year though.
    Merge them all together to get a full transaction date.

    Only the month and the day of "date" are used. The year is taken
    from "date_from" or "date_to". Raises ValueError if the month/day
    pair exists in neither of these years.
    """
    candidates = []
    for year in (date_from.year, date_to.year):
        try:
            candidates.append(datetime.datetime(year, date.month, date.day))
        except ValueError:
            # February 29 doesn't exist in this year;
            # the other year may still be fine.
            continue
    if not candidates:
        raise ValueError('%02d/%02d exists neither in %d nor in %d'
                         % (date.month, date.day,
                            date_from.year, date_to.year))
    # Ideally, pick the date within given range.
    for candidate in candidates:
        if date_from <= candidate <= date_to:
            return candidate
    # Otherwise, return the date which is the closest
    # to the beginning of the range.
    return min(candidates, key=lambda d: abs(d - date_from))
def decompress_pdf(inpdf):
    """
    Run the given PDF data through "mutool clean -d" and return
    the content of the file produced by it.

    The data goes through two temporary files, which are removed
    even if writing, running mutool or reading fails. The exit status
    of mutool is not checked.
    """
    inh, inname = mkstemp(suffix='.pdf')
    try:
        with os.fdopen(inh, 'wb') as f:
            f.write(inpdf)
        outh, outname = mkstemp(suffix='.pdf')
        os.close(outh)
        try:
            # mutool is a part of MuPDF (http://www.mupdf.com).
            subprocess.call(['mutool', 'clean', '-d', inname, outname])
            # PDF is binary data, so no newline translation on reading.
            with open(outname, 'rb') as f:
                return f.read()
        finally:
            os.remove(outname)
    finally:
        os.remove(inname)
class StatementParser(object):
    """
    Reads transactions from a PDF statement.

    The PDF is decompressed and split into tokens, one per line.
    Transactions are recognized as fixed sequences of these tokens.

    Each "read_*" method which takes position as its argument
    returns a (position, value) pair: the next token position and
    the value if read was successful, and the same position and None
    if it was not.
    """

    def __init__(self, pdf):
        self._pdf = decompress_pdf(pdf)
        self._tok = StatementTokenizer(self._pdf)

    def read_card_transactions(self):
        """
        Iterate over the transactions of a card account statement.
        Gives nothing if the PDF has no '[(Transactions)] TJ' line.
        """
        return self._read_transactions('[(Transactions)] TJ',
                                       self.read_card_transaction)

    def read_cash_transactions(self):
        """
        Iterate over the transactions of a cash account statement.
        Gives nothing if the PDF has no '[(Transaction history)] TJ' line.
        """
        return self._read_transactions('[(Transaction history)] TJ',
                                       self.read_cash_transaction)

    def _read_transactions(self, marker, read_transaction):
        """
        Scan all the tokens, trying to read a transaction with the given
        reader at every position. Does nothing without the marker.
        """
        # Early check if this is a statement of the expected kind at all.
        if marker not in self._pdf:
            return
        # Read statement dates range.
        date_from, date_to = self.read_first_date_range()
        # Read transactions.
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, trans = read_transaction(pos, date_from, date_to)
            if trans:
                yield trans
            else:
                pos += 1

    def read_first_date_range(self):
        """
        Return the first dates range found in the statement as
        a [date_from, date_to] list, or None if there is no such range.
        """
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, date_range = self.read_date_range(pos)
            if date_range is not None:
                return date_range
            pos += 1
        return None

    def read_card_transaction(self, pos, date_from, date_to):
        """
        Read a card transaction: transaction date, posting date, reference,
        description lines and amount, with layout tokens in between.
        """
        INDENT_CHARGES = 520
        startPos = pos
        pos, tdate = self.read_date(pos)
        if tdate is None:
            # Most of the tokens are not dates: no need to try the rest.
            return startPos, None
        pos, pdate_layout = self.read_layout_tm(pos)
        pos, pdate = self.read_date(pos)
        pos, ref_layout = self.read_layout_tm(pos)
        pos, ref = self.read_ref(pos)
        pos, desc = self.read_multiline_desc(pos)
        pos, amount = self.read_indent_amount(
            pos,
            range_minus=(INDENT_CHARGES, 9999),
            range_plus=(0, INDENT_CHARGES))
        parts = (pdate_layout, pdate, ref_layout, ref, desc, amount)
        if any(part is None for part in parts):
            return startPos, None

        trans = Transaction(ref)
        trans.date = full_date(tdate, date_from, date_to)
        trans.rdate = full_date(pdate, date_from, date_to)
        trans.type = Transaction.TYPE_UNKNOWN
        trans.raw = desc
        trans.label = desc
        trans.amount = amount
        return pos, trans

    def read_cash_transaction(self, pos, date_from, date_to):
        """
        Read a cash transaction: date, optional "star" mark,
        description lines and amounts.
        """
        INDENT_BALANCE = 520
        INDENT_WITHDRAWAL = 470
        startPos = pos
        pos, date = self.read_date(pos)
        if date is None:
            # Most of the tokens are not dates: no need to try the rest.
            return startPos, None
        pos, _ = self.read_star(pos)
        pos, desc = self.read_multiline_desc(pos)
        pos, amount = self.read_indent_amount(
            pos,
            range_plus=(0, INDENT_WITHDRAWAL),
            range_minus=(INDENT_WITHDRAWAL, INDENT_BALANCE),
            range_skip=(INDENT_BALANCE, 9999))
        if desc is None or amount is None:
            return startPos, None

        date = full_date(date, date_from, date_to)
        trans = Transaction(u'')
        trans.date = date
        trans.rdate = date
        trans.type = Transaction.TYPE_UNKNOWN
        trans.raw = desc
        trans.label = desc
        trans.amount = amount
        return pos, trans

    def read_multiline_desc(self, pos):
        """
        Read as many (layout, text) pairs as possible and return
        all the texts joined with spaces as a cleaned label.
        """
        startPos = pos
        descs = []
        while True:
            prevPos = pos
            pos, layout = self.read_layout_tm(pos)
            pos, desc = self.read_text(pos)
            if layout is None or desc is None:
                # Incomplete pair: step back to where it started.
                pos = prevPos
                break
            descs.append(desc)
        if not descs:
            return startPos, None
        return pos, clean_label(' '.join(descs))

    @staticmethod
    def _within(indent, bounds):
        """Whether indent lies within the inclusive (min, max) bounds."""
        xmin, xmax = bounds
        return xmin <= indent <= xmax

    def read_indent_amount(self, pos, range_skip=(0,0), range_plus=(0,0),
                           range_minus=(0,0)):
        """
        Read as many (layout, amount) pairs as possible and return
        their total. Depending on the range the fifth layout value
        falls into, an amount is skipped, added or subtracted;
        amounts outside of all the ranges are ignored.
        """
        startPos = pos
        # Read layout-amount pairs.
        amounts = []
        while True:
            prevPos = pos
            pos, layout = self.read_layout_tm(pos)
            pos, amount = self.read_amount(pos)
            if layout is None or amount is None:
                # Incomplete pair: step back to where it started.
                pos = prevPos
                break
            amounts.append((layout, amount))
        if not amounts:
            return startPos, None

        # Infer amount type by its indentation in the layout.
        amount_total = clean_amount('0')
        for layout, amount in amounts:
            indent = layout[4]
            if self._within(indent, range_skip):
                continue
            elif self._within(indent, range_plus):
                amount_total += amount
            elif self._within(indent, range_minus):
                amount_total -= amount
        return pos, amount_total

    def read_star(self, pos):
        """Read the "star" mark in either of its two known layouts."""
        pos1, star1 = self.read_star_1(pos)
        if star1 is not None:
            return pos1, star1
        return self.read_star_2(pos)

    def read_star_1(self, pos):
        """
        Read the 'S' text preceded by Tz, Tc, Tw, Tf and Tm layout
        tokens and followed by Tz, Tc, Tw and Tf layout tokens.
        """
        startPos = pos
        vals = []
        for read in (self.read_layout_tz, self.read_layout_tc,
                     self.read_layout_tw, self.read_layout_tf,
                     self.read_layout_tm):
            pos, v = read(pos)
            vals.append(v)
        pos, star = self.read_text(pos)
        for read in (self.read_layout_tz, self.read_layout_tc,
                     self.read_layout_tw, self.read_layout_tf):
            pos, v = read(pos)
            vals.append(v)
        if star == 'S' and all(v is not None for v in vals):
            return pos, star
        return startPos, None

    def read_star_2(self, pos):
        """
        Read the 'S' text preceded by Tf and Tm layout tokens
        and followed by a Tf layout token.
        """
        startPos = pos
        vals = []
        for read in (self.read_layout_tf, self.read_layout_tm):
            pos, v = read(pos)
            vals.append(v)
        pos, star = self.read_text(pos)
        pos, v = self.read_layout_tf(pos)
        vals.append(v)
        if star == 'S' and all(v is not None for v in vals):
            return pos, star
        return startPos, None

    def read_date(self, pos):
        """
        Read a "month/day" date. Only the month and the day of
        the returned datetime are meaningful.
        """
        t = self._tok.tok(pos)
        if not t.is_date():
            return pos, None
        # A date without a year is parsed as a date of the year 1900,
        # which is not a leap year, so "02/29" would be rejected.
        # Parse within a leap year instead.
        date = datetime.datetime.strptime(t.value() + '/2000', '%m/%d/%Y')
        return pos + 1, date

    def read_text(self, pos):
        t = self._tok.tok(pos)
        if not t.is_text():
            return pos, None
        #TODO: handle PDF encodings properly.
        return pos + 1, unicode(t.value(), errors='ignore')

    def read_amount(self, pos):
        t = self._tok.tok(pos)
        if not t.is_amount():
            return pos, None
        return pos + 1, clean_amount(t.value())

    def read_date_range(self, pos):
        """
        Read a dates range written either as
        "Month day, year - Month day, year" or as
        "mm/dd/yyyy to mm/dd/yyyy", as a list of two datetimes.
        """
        t = self._tok.tok(pos)
        if t.is_date_range_1():
            date_format = '%B %d, %Y'
        elif t.is_date_range_2():
            date_format = '%m/%d/%Y'
        else:
            return pos, None
        return pos + 1, [datetime.datetime.strptime(v, date_format)
                         for v in t.value()]

    def _read_value(self, pos, type_):
        """Read the raw value of a token of the given type."""
        t = self._tok.tok(pos)
        if getattr(t, 'is_%s' % type_)():
            return pos + 1, t.value()
        return pos, None

    def read_ref(self, pos):
        return self._read_value(pos, 'ref')

    def read_layout_tz(self, pos):
        return self._read_value(pos, 'layout_tz')

    def read_layout_tc(self, pos):
        return self._read_value(pos, 'layout_tc')

    def read_layout_tw(self, pos):
        return self._read_value(pos, 'layout_tw')

    def read_layout_tf(self, pos):
        return self._read_value(pos, 'layout_tf')

    def read_layout_tm(self, pos):
        """Read a Tm layout token as a list of floats."""
        pos, values = self._read_value(pos, 'layout_tm')
        if values is None:
            return pos, None
        return pos, [float(v) for v in values]
class StatementTokenizer(object):
    """Splits the PDF data into lines and makes a token of every line."""

    def __init__(self, pdf):
        self._tok = list(map(StatementToken, pdf.split('\n')))

    def tok(self, index):
        """
        Return the token at the given index,
        or an end-of-file token if the index is out of range.
        """
        if index < 0 or index >= len(self._tok):
            return StatementToken(eof=True)
        return self._tok[index]
class StatementToken(object):
    """
    Simple regex-based lexer.
    There's a lexing table consisting of type-regex tuples.
    Text line is sequentially matched against regexes and first
    successful match defines the type of the token.

    The value of the token is the only group of the regex, a tuple
    of all the groups if there are several of them, or the whole match
    if there are none. A line matching nothing has no type and no value.

    For every type in the table there's an "is_<type>" method,
    installed right after the class definition.
    """
    LEX = [
        ('amount', r'^\[\(([0-9,]+\.\d+)\)\] TJ$'),
        ('date', r'^\[\((\d+/\d+)\)\] TJ$'),
        # Month names are letters only: "[A-z]" would also accept
        # the punctuation characters between "Z" and "a".
        ('date_range_1', r'^\[\(([A-Za-z]+ \d+, \d{4})'
                         r' - ([A-Za-z]+ \d+, \d{4})\)\] TJ$'),
        ('date_range_2', r'^\[\((\d{2}/\d{2}/\d{4})'
                         r' to (\d{2}/\d{2}/\d{4})\)\] TJ$'),
        ('layout_tz', r'^(\d+\.\d{2}) Tz$'),
        ('layout_tc', r'^(\d+\.\d{2}) Tc$'),
        ('layout_tw', r'^(\d+\.\d{2}) Tw$'),
        ('layout_tf', r'^/F(\d) (\d+\.\d{2}) Tf$'),
        ('layout_tm', r'^' + (r'(\d+\.\d+ )'*6) + r'Tm$'),
        ('ref', r'^\[\(([0-9A-Z]{17})\)\] TJ$'),
        ('text', r'^\[\(([^\)]+)\)\] TJ$')
    ]

    def __init__(self, line=None, eof=False):
        self._eof = eof
        self._value = None
        self._type = None
        if line is None:
            return
        for type_, regex in self.LEX:
            m = re.match(regex, line, flags=re.UNICODE)
            if not m:
                continue
            self._type = type_
            groups = m.groups()
            if len(groups) == 1:
                self._value = groups[0]
            elif groups:
                self._value = groups
            else:
                self._value = m.group(0)
            break

    def is_eof(self):
        return self._eof

    def value(self):
        return self._value


def _make_type_check(type_):
    """Make an "is_<type>" method telling if a token is of the given type."""
    def check(self):
        return self._type == type_
    check.__name__ = 'is_%s' % type_
    return check


# Install the "is_<type>" methods. A factory function is used instead of
# eval, and it binds every type to its own closure.
for type_, _ in StatementToken.LEX:
    setattr(StatementToken, 'is_%s' % type_, _make_type_check(type_))

View file

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest
from itertools import chain
class WellsFargoTest(BackendTest):
    """Tests of the "wellsfargo" backend."""
    BACKEND = 'wellsfargo'

    def test_history(self):
        """
        Test that there's at least one transaction in the whole history.
        """
        backend = self.backend
        # The accounts are listed completely before
        # any of the histories is iterated.
        histories = [backend.iter_history(account)
                     for account in backend.iter_accounts()]
        first = next(chain(*histories), None)
        self.assertNotEqual(first, None)