diff --git a/modules/wellsfargo/__init__.py b/modules/wellsfargo/__init__.py new file mode 100644 index 00000000..23152926 --- /dev/null +++ b/modules/wellsfargo/__init__.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2014 Oleg Plakhotniuk +# +# This file is part of weboob. +# +# weboob is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# weboob is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with weboob. If not, see <http://www.gnu.org/licenses/>. + + +from .backend import WellsFargoBackend + +__all__ = ['WellsFargoBackend'] + diff --git a/modules/wellsfargo/backend.py b/modules/wellsfargo/backend.py new file mode 100644 index 00000000..c73315eb --- /dev/null +++ b/modules/wellsfargo/backend.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2014 Oleg Plakhotniuk +# +# This file is part of weboob. +# +# weboob is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# weboob is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import CapBank
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.value import ValueBackendPassword

from .browser import WellsFargo


__all__ = ['WellsFargoBackend']


class WellsFargoBackend(BaseBackend, CapBank):
    """
    Bank backend for Wells Fargo online banking.

    Every public operation delegates to the ``WellsFargo`` browser and is
    performed inside ``with self.browser:``.
    """
    NAME = 'wellsfargo'
    MAINTAINER = u'Oleg Plakhotniuk'
    EMAIL = 'olegus8@gmail.com'
    VERSION = '0.j'
    LICENSE = 'AGPLv3+'
    DESCRIPTION = u'Wells Fargo'
    CONFIG = BackendConfig(ValueBackendPassword('login', label='Username', masked=False),
                           ValueBackendPassword('password', label='Password'))
    BROWSER = WellsFargo

    def create_default_browser(self):
        """Build the browser from the configured login and password."""
        return self.create_browser(self.config['login'].get(),
                                   self.config['password'].get())

    def iter_accounts(self):
        """
        Yield every account reported by the browser.

        ``WellsFargo.get_accounts`` is a generator, so returning it directly
        would leave the ``with`` block before a single page is fetched.
        Iterating here keeps the browser context entered for the whole
        traversal.
        """
        with self.browser:
            for account in self.browser.get_accounts():
                yield account

    def get_account(self, id_):
        """Return the account whose id is ``id_`` (the browser raises
        ``AccountNotFound`` when there is none)."""
        with self.browser:
            return self.browser.get_account(id_)

    def iter_history(self, account):
        """
        Yield the transactions of ``account``.

        ``WellsFargo.iter_history`` is a generator as well; it is consumed
        inside the ``with`` block for the same reason as ``iter_accounts``.
        """
        with self.browser:
            for trans in self.browser.iter_history(account):
                yield trans


# ---------------------------------------------------------------------------
# Next file in this patch:
# diff --git a/modules/wellsfargo/browser.py b/modules/wellsfargo/browser.py
# new file mode 100644
# index 00000000..2bed2946
# --- /dev/null
# +++ b/modules/wellsfargo/browser.py
# @@ -0,0 +1,179 @@
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import AccountNotFound
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from .pages import LoginPage, LoginRedirectPage, LoggedInPage, SummaryPage, \
    DynamicPage, DynamicParser
from time import sleep
from mechanize import ItemNotFoundError


__all__ = ['WellsFargo']


class WellsFargo(BaseBrowser):
    """
    Browser driving the Wells Fargo online banking site.

    Navigation is expressed through ``to_*`` methods that move to a given
    screen only when the browser is not already there, and ``is_*`` methods
    that report which screen the current page represents.
    """
    DOMAIN = 'online.wellsfargo.com'
    PROTOCOL = 'https'
    CERTHASH = ['04ee8bb37799ee3d15174c767bb453f5'
                '7b17735fdfafd38cbea0b78979bdacd9']
    ENCODING = 'UTF-8'
    PAGES = {
        r'/$': LoginPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON$': LoginRedirectPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON_PORTAL_PAUSE$':
            LoggedInPage,
        r'/das/cgi-bin/session.cgi\?screenid=SIGNON&LOB=CONS$':
            LoggedInPage,
        r'/login\?ERROR_CODE=.*LOB=CONS&$': LoggedInPage,
        r'/das/channel/accountSummary$': SummaryPage,
        r'/das/cgi-bin/session.cgi\?sessargs=.+$':
            (DynamicPage, DynamicParser()),
        r'/das/channel/accountActivityDDA\?action=doSetPage&page=.*$':
            DynamicPage
    }

    def __init__(self, *args, **kwargs):
        # Seconds to wait before following the post-login redirect.
        self._pause = 1
        BaseBrowser.__init__(self, *args, **kwargs)

    def home(self):
        """Go to the account summary screen."""
        self.location('/das/channel/accountSummary')

    def is_logged(self):
        """Ask the current page; pages without ``is_logged`` count as
        logged out."""
        try:
            return self.page.is_logged()
        except AttributeError:
            return False

    def login(self):
        """
        Submit the credentials and follow the post-login redirect.

        Raises ``BrowserIncorrectPassword`` when the sign-on form does not
        lead to the redirect page.
        """
        assert isinstance(self.username, basestring)
        assert isinstance(self.password, basestring)

        if not self.is_on_page(LoginPage):
            self.location('/', no_login=True)
            assert self.is_on_page(LoginPage)

        self.page.login(self.username, self.password)
        if not self.is_on_page(LoginRedirectPage):
            raise BrowserIncorrectPassword()

        # Sometimes Wells Fargo server returns "Session time out" error
        # right after login if we don't make a pause here.
        sleep(self._pause)
        # Double the pause (capped at 30) before attempting the redirect;
        # it is reset to 1 only once the redirect call has returned.
        self._pause = min(30, self._pause*2)
        self.page.redirect()
        self._pause = 1

    def get_account(self, id_):
        """Return the account with id ``id_`` or raise ``AccountNotFound``."""
        self.to_activity()
        if id_ not in self.page.sub_page().accounts_ids():
            raise AccountNotFound()
        self.to_activity(id_)
        return self.page.sub_page().get_account()

    def get_accounts(self):
        """Yield one account per id listed on the activity screen."""
        self.to_activity()
        for account_id in self.page.sub_page().accounts_ids():
            self.to_activity(account_id)
            yield self.page.sub_page().get_account()

    def to_summary(self):
        """Make sure the browser is on the account summary screen."""
        if not self.is_on_page(SummaryPage):
            self.location('/das/channel/accountSummary')
            assert self.is_on_page(SummaryPage)

    def is_activity(self):
        """Tell whether the current page is an account activity screen."""
        try:
            return self.page.sub_page().is_activity()
        except AttributeError:
            return False

    def to_activity(self, id_=None):
        """Go to the activity screen, switching to account ``id_`` when
        one is given."""
        if not self.is_activity():
            self.to_summary()
            self.page.to_activity()
            assert self.is_activity()
        if id_ and self.page.sub_page().account_id() != id_:
            self.page.sub_page().to_account(id_)
            assert self.is_activity()
            assert self.page.sub_page().account_id() == id_

    def is_statements(self):
        """Tell whether the current page is the statements list screen."""
        try:
            return self.page.sub_page().is_statements()
        except AttributeError:
            return False

    def to_statements(self, id_=None, year=None):
        """Go to the statements list, optionally switching to account
        ``id_`` and then to ``year``."""
        if not self.is_statements():
            self.to_summary()
            self.page.to_statements()
            assert self.is_statements()
        if id_ and self.page.sub_page().account_id() != id_:
            self.page.sub_page().to_account(id_)
            assert self.is_statements()
            assert self.page.sub_page().account_id() == id_
        if year and self.page.sub_page().year() != year:
            self.page.sub_page().to_year(year)
            assert self.is_statements()
            assert self.page.sub_page().year() == year

    def is_statement(self):
        """Tell whether the current page is a single statement document."""
        try:
            return self.page.sub_page().is_statement()
        except AttributeError:
            return False

    def to_statement(self, uri):
        """Open the statement document located at ``uri``."""
        self.location(uri)
        assert self.is_statement()

    def iter_history(self, account):
        """
        Yield the transactions of ``account``: first the ones listed on the
        activity screen since the last statement, then the ones read from
        every statement of every available year.
        """
        self.to_activity(account.id)
        try:
            self.page.sub_page().since_last_statement()
        except ItemNotFoundError:
            # Skip transactions on web page if we cannot apply
            # "since last statement" filter.
            # This might be the case, for example, if Wells Fargo
            # is processing the current statement:
            # "Since your credit card account statement is being processed,
            # transactions grouped by statement period will not be available
            # for up to seven days."
            # (www.wellsfargo.com, 2014-07-20)
            pass
        else:
            assert self.page.sub_page().account_id() == account.id
            has_more = True
            while has_more:
                for trans in self.page.sub_page().iter_transactions():
                    yield trans
                has_more = self.page.sub_page().next_()

        self.to_statements(account.id)
        for year in self.page.sub_page().years():
            self.to_statements(account.id, year)
            for stmt in self.page.sub_page().statements():
                self.to_statement(stmt)
                for trans in self.page.sub_page().iter_transactions():
                    yield trans


# ---------------------------------------------------------------------------
# Next file in this patch:
# diff --git a/modules/wellsfargo/pages.py b/modules/wellsfargo/pages.py
# new file mode 100644
# index 00000000..0225c7d4
# --- /dev/null
# +++ b/modules/wellsfargo/pages.py
# @@ -0,0 +1,426 @@
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Account, Transaction
from weboob.tools.browser import BasePage
from weboob.tools.parsers import get_parser
from weboob.tools.parsers.iparser import IParser
from mechanize import ControlNotFoundError
from urllib import unquote
from StringIO import StringIO
from .parsers import StatementParser, clean_amount, clean_label
import itertools
import re
import datetime

__all__ = ['LoginPage', 'LoginRedirectPage', 'LoggedInPage', 'SummaryPage',
           'DynamicPage', 'DynamicParser']


# Opening and closing "optgroup" tags, stripped before mechanize parses
# the account activity forms.
_OPTGROUP_TAG_RE = re.compile(r'</?optgroup[^>]*>')

# NOTE(review): the tag named in the original pattern and comment was lost
# from the reviewed source (it had been rendered as a line break, which
# suggests a "br" element).  This matches <br>, <br/> and <br /> -- confirm
# against the live page.
_BR_TAG_RE = re.compile(r'<br\s*/?>')

# NOTE(review): the literal prefix that preceded the capture group inside
# the angle brackets was lost from the reviewed source.  This captures the
# last whitespace-free token before ">", which assumes the reference itself
# contains no whitespace -- confirm against the live page.
_CARD_REF_RE = re.compile(r'.*<(?:[^>]*\s)?([^>\s]+)>.*')


def form_with_control(control_name):
    """
    Form search helper.
    Returns a predicate telling whether a form has a control with
    the specified name.
    """
    def predicate(form):
        try:
            form.find_control(name=control_name)
        except ControlNotFoundError:
            return False
        else:
            return True
    return predicate


class LoginPage(BasePage):
    """Sign-on page holding the ``Signon`` form."""

    def login(self, login, password):
        """Fill the ``Signon`` form with the credentials and submit it."""
        self.browser.select_form(name='Signon')
        self.browser['userid'] = login.encode(self.browser.ENCODING)
        self.browser['password'] = password.encode(self.browser.ENCODING)
        self.browser.submit(nologin=True)


class LoginRedirectPage(BasePage):
    """Page shown right after sign-on; it carries a meta refresh."""

    def is_logged(self):
        return True

    def redirect(self):
        """Follow the URL given by the page's ``Refresh`` meta tag."""
        refresh = self.document.xpath(
            '//meta[@http-equiv="Refresh"]/@content')[0]
        url = re.match(r'^.*URL=(.*)$', refresh).group(1)
        self.browser.location(url)


class LoggedInPage(BasePage):
    """Base class of the pages reachable once signed on."""

    def is_logged(self):
        """
        A document that is a plain ``str`` (what ``DynamicParser`` hands
        over for PDF data) counts as logged in; an HTML document does when
        it has a "Sign Off" link or the "Splash Page" title.
        """
        if isinstance(self.document, str):
            return True
        return bool(self.document.xpath(u'//a[text()="Sign Off"]')) \
            or bool(self.document.xpath(u'//title[text()="Splash Page"]'))


class SummaryPage(LoggedInPage):
    """Account summary page: the entry point to activity and statements."""

    def to_activity(self):
        href = self.document.xpath(u'//a[text()="Account Activity"]/@href')[0]
        self.browser.location(href)

    def to_statements(self):
        href = self.document.xpath('//a[text()="Statements & Documents"]'
                                   '/@href')[0]
        self.browser.location(href)


class DynamicPage(LoggedInPage):
    """
    Most of Wells Fargo pages have the same URI pattern.
    Some of these pages are HTML, some are PDF.
    """
    def sub_page(self):
        """
        Return a sub page object wrapping this page, chosen from the
        document type, the page title and the selected account name.
        An ``AssertionError`` is raised when nothing matches.
        """
        page = None
        if isinstance(self.document, str):
            page = StatementSubPage
        elif u'Account Activity' in self._title():
            name = self._account_name()
            if u'CHECKING' in name or u'SAVINGS' in name:
                page = ActivityCashSubPage
            elif u'CARD' in name:
                page = ActivityCardSubPage
        elif u'Statements & Documents' in self._title():
            page = StatementsSubPage
        assert page
        return page(self)

    def _title(self):
        return self.document.xpath(u'//title/text()')[0]

    def _account_name(self):
        return self.document.xpath(
            u'//select[@name="selectedAccountUID"]'
            u'/option[@selected="selected"]/text()')[0]


class SubPage(object):
    """Thin wrapper giving a specialised view over a ``DynamicPage``."""

    def __init__(self, page):
        self.page = page


class AccountSubPage(SubPage):
    """Sub page that is bound to one selected account."""

    def account_id(self, name=None):
        """Return the id derived from ``name``, or from the currently
        selected account when no name is given."""
        if name:
            return name[-4:]  # Last 4 digits of "BLAH XXXXXXX1234"
        else:
            return self.account_id(self.account_name())


class ActivitySubPage(AccountSubPage):
    """Common behaviour of the cash and card account activity screens."""

    def __init__(self, *args, **kwargs):
        AccountSubPage.__init__(self, *args, **kwargs)

        # As of 2014-07-03, there are few nested "optgroup" nodes on
        # the account activity pages, which is a violation of HTML
        # standard and cannot be parsed by mechanize's Browser.select_form.
        # Only the optgroup tags themselves are removed; the previous
        # pattern in this file (']*>') deleted every ">" of the page.
        resp = self.page.browser.response()
        resp.set_data(_OPTGROUP_TAG_RE.sub('', resp.get_data()))
        self.page.browser.set_response(resp)

    def is_activity(self):
        return True

    def accounts_names(self):
        return self.page.document.xpath(
            u'//select[@name="selectedAccountUID"]/option/text()')

    def accounts_ids(self):
        return [self.account_id(name) for name in self.accounts_names()]

    def account_uid(self, id_=None):
        """Return the option value of account ``id_``, or of the selected
        account when ``id_`` is not given."""
        if id_:
            return self.page.document.xpath(
                u'//select[@name="selectedAccountUID"]'
                u'/option[contains(text(),"%s")]/@value' % id_)[0]
        else:
            return self.page.document.xpath(
                u'//select[@name="selectedAccountUID"]'
                u'/option[@selected="selected"]/@value')[0]

    def account_name(self):
        return self.page.document.xpath(
            u'//select[@name="selectedAccountUID"]'
            u'/option[@selected="selected"]/text()')[0]

    def account_type(self, name=None):
        raise NotImplementedError()

    def account_balance(self):
        raise NotImplementedError()

    def to_account(self, id_):
        """Select account ``id_`` in the activity form and submit it."""
        self.page.browser.select_form(name='AccountActivityForm')
        self.page.browser['selectedAccountUID'] = [self.account_uid(id_)]
        self.page.browser.submit()

    def get_account(self):
        """Build an ``Account`` from the currently displayed screen."""
        name = self.account_name()
        balance = self.account_balance()
        currency = Account.get_currency(balance)
        id_ = self.account_id()
        type_ = self.account_type()

        account = Account()
        account.id = id_
        account.label = name
        account.currency = currency
        account.balance = clean_amount(balance)
        account.type = type_
        return account

    def since_last_statement(self):
        raise NotImplementedError()

    def iter_transactions(self):
        raise NotImplementedError()

    def next_(self):
        raise NotImplementedError()


class ActivityCashSubPage(ActivitySubPage):
    """Activity screen of a checking or savings account."""

    def account_type(self, name=None):
        name = name or self.account_name()
        if u'CHECKING' in name:
            return Account.TYPE_CHECKING
        elif u'SAVINGS' in name:
            return Account.TYPE_SAVINGS
        else:
            return Account.TYPE_UNKNOWN

    def account_balance(self):
        return self.page.document.xpath(
            u'//td[@headers="currentPostedBalance"]/span/text()')[0]

    def since_last_statement(self):
        b = self.page.browser
        b.select_form(predicate=form_with_control(
            'showTabDDACommand.transactionTypeFilterValue'))
        b['showTabDDACommand.transactionTypeFilterValue'] = [
            u'All Transactions']
        b['showTabDDACommand.timeFilterValue'] = ['8']
        b.submit()

    def iter_transactions(self):
        """Yield one ``Transaction`` per posted row of the current page."""
        for row in self.page.document.xpath('//tr/th[@headers='
                                            '"postedHeader dateHeader"]/..'):
            date = row.xpath('th[@headers="postedHeader '
                             'dateHeader"]/text()')[0]
            desc = row.xpath('td[@headers="postedHeader '
                             'descriptionHeader"]/div/text()')[0]
            deposit = row.xpath('td[@headers="postedHeader '
                                'depositsConsumerHeader"]/span/text()')[0]
            withdraw = row.xpath('td[@headers="postedHeader '
                                 'withdrawalsConsumerHeader"]/span/text()')[0]

            date = datetime.datetime.strptime(date, '%m/%d/%y')

            desc = clean_label(desc)

            # A blank cell counts as zero.
            deposit = deposit.strip()
            deposit = clean_amount(deposit or '0')
            withdraw = withdraw.strip()
            withdraw = clean_amount(withdraw or '0')

            amount = deposit - withdraw

            trans = Transaction(u'')
            trans.date = date
            trans.rdate = date
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = desc
            trans.label = desc
            trans.amount = amount
            yield trans

    def next_(self):
        """Follow the "next page" link; return whether there was one."""
        links = self.page.document.xpath('//a[@title="Go To Next Page"]/@href')
        if links:
            self.page.browser.location(links[0])
            return True
        else:
            return False


class ActivityCardSubPage(ActivitySubPage):
    """Activity screen of a credit card account."""

    def account_type(self, name=None):
        return Account.TYPE_CARD

    def account_balance(self):
        return self.page.document.xpath(
            u'//td[@headers="outstandingBalance"]/text()')[0]

    def get_account(self):
        account = ActivitySubPage.get_account(self)

        # Credit card is essentially a liability.
        # Negative amount means there's a payment due.
        account.balance = -account.balance

        return account

    def since_last_statement(self):
        b = self.page.browser
        b.select_form(predicate=form_with_control(
            'showTabCommand.transactionTypeFilterValue'))
        b['showTabCommand.transactionTypeFilterValue'] = ['sincelastStmt']
        b.submit()

    def iter_transactions(self):
        """Yield one ``Transaction`` per posted row of the current page."""
        for row in self.page.document.xpath('//tr/th[@headers='
                '"postedHeader transactionDateHeader"]/..'):
            tdate = row.xpath('th[@headers="postedHeader '
                              'transactionDateHeader"]/text()')[0]
            pdate = row.xpath('td[@headers="postedHeader '
                              'postingDateHeader"]/text()')[0]
            desc = row.xpath('td[@headers="postedHeader '
                             'descriptionHeader"]/span/text()')[0]
            ref = row.xpath('td[@headers="postedHeader '
                            'descriptionHeader"]/text()')[0]
            amount = row.xpath('td[@headers="postedHeader '
                               'amountHeader"]/text()')[0]

            tdate = datetime.datetime.strptime(tdate, '%m/%d/%y')
            pdate = datetime.datetime.strptime(pdate, '%m/%d/%y')

            desc = clean_label(desc)

            # The previous pattern in this file ('.*]+)>.*') was not a
            # valid regular expression; see the note on _CARD_REF_RE.
            ref = _CARD_REF_RE.match(ref).group(1)

            # Amounts prefixed with "+" are credits, all others are debits.
            if amount.startswith('+'):
                amount = clean_amount(amount[1:])
            else:
                amount = -clean_amount(amount)

            trans = Transaction(ref)
            trans.date = tdate
            trans.rdate = pdate
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = desc
            trans.label = desc
            trans.amount = amount
            yield trans

    def next_(self):
        # As of 2014-07-05, there's only one page for cards history.
        return False


class StatementsSubPage(AccountSubPage):
    """Screen listing the statements of one account for one year."""

    def __init__(self, *args, **kwargs):
        AccountSubPage.__init__(self, *args, **kwargs)

        # As of 2014-07-06, there are few nodes on the account statements
        # pages which are a violation of HTML standard and cannot be parsed
        # by mechanize's Browser.select_form.  See the note on _BR_TAG_RE
        # about which tag is removed.
        resp = self.page.browser.response()
        resp.set_data(_BR_TAG_RE.sub('', resp.get_data()))
        self.page.browser.set_response(resp)

    def is_statements(self):
        return True

    def account_name(self):
        return self.page.document.xpath(
            u'//select[@name="selectedAccountKey"]'
            u'/option[@selected="selected"]/text()')[0]

    def account_uid(self, id_):
        return self.page.document.xpath(
            u'//select[@name="selectedAccountKey"]'
            u'/option[contains(text(),"%s")]/@value' % id_)[0]

    def to_account(self, id_):
        """Select account ``id_`` in the statements form and submit it."""
        self.page.browser.select_form(predicate=form_with_control(
            'selectedAccountKey'))
        self.page.browser['selectedAccountKey'] = [self.account_uid(id_)]
        self.page.browser.submit()

    def year(self):
        """Return the first integer found in a ``strong`` directly under
        an ``h2``; ``None`` when there is none."""
        for text in self.page.document.xpath('//h2/strong/text()'):
            try:
                return int(text)
            except ValueError:
                pass

    def years(self):
        """Yield every integer found in a ``strong`` anywhere under
        an ``h2``."""
        for text in self.page.document.xpath('//h2//strong/text()'):
            try:
                yield int(text)
            except ValueError:
                pass

    def to_year(self, year):
        href = self.page.document.xpath('//h2/a/strong[text()="%s"]'
                                        '/../@href' % year)[0]
        self.page.browser.location(href)

    def statements(self):
        """Yield the unquoted ``destinationClickUrl`` of every statement
        link of the ``listOfStatements`` table."""
        for outer_uri in self.page.document.xpath(
                '//table[@id="listOfStatements"]'
                '//a[contains(text(), "Statement")]/@href'):
            inner_uri = re.match('.*destinationClickUrl=([^&]+)&.*',
                                 outer_uri).group(1)
            yield unquote(inner_uri)


class StatementSubPage(SubPage):
    """A single statement; its document is handed to ``StatementParser``."""

    def __init__(self, *args, **kwArgs):
        SubPage.__init__(self, *args, **kwArgs)
        self._parser = StatementParser(self.page.document)

    def is_statement(self):
        return True

    def iter_transactions(self):
        # Maintain a nice consistent newer-to-older order of transactions.
        return sorted(
            itertools.chain(
                self._parser.read_cash_transactions(),
                self._parser.read_card_transactions()),
            cmp=lambda t1, t2: cmp(t2.date, t1.date) or
                               cmp(t1.label, t2.label) or
                               cmp(t1.amount, t2.amount))


class DynamicParser(IParser):
    """
    Parser choosing between the default parser and the raw parser
    depending on whether the data starts with the PDF signature.
    """
    def __init__(self):
        self._html = get_parser()()
        self._raw = get_parser('raw')()
        self._parser = None

    def parse(self, data, encoding=None):
        # Ugly hack to figure out the document type
        s = data.read()
        if s[:4] == '%PDF':
            self._parser = self._raw
        else:
            self._parser = self._html
        # The data has been consumed above, so hand over a fresh stream.
        return self._parser.parse(StringIO(s), encoding)

    def __getattr__(self, name):
        # Everything else is forwarded to the parser picked by the
        # last parse() call.
        assert self._parser
        return getattr(self._parser, name)


# ---------------------------------------------------------------------------
# Next file in this patch:
# diff --git a/modules/wellsfargo/parsers.py b/modules/wellsfargo/parsers.py
# new file mode 100644
# index 00000000..c3f895b6
# --- /dev/null
# +++ b/modules/wellsfargo/parsers.py
# @@ -0,0 +1,401 @@
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Transaction
from weboob.tools.capabilities.bank.transactions import AmericanTransaction
from decimal import Decimal
from tempfile import mkstemp
import subprocess
import os
import re
import datetime

__all__ = ['StatementParser', 'clean_amount', 'clean_label']


# Runs of spaces collapsed by clean_label().
_SPACES_RE = re.compile(u' +', re.UNICODE)


def clean_amount(text):
    """Return ``text`` as a ``Decimal``, after normalisation by
    ``AmericanTransaction.clean_amount``."""
    return Decimal(AmericanTransaction.clean_amount(text))


def clean_label(text):
    """
    Web view and statements use different label formatting.
    User shouldn't be able to see the difference, so we
    need to make labels from both sources look the same.

    The label is stripped, upper-cased and every run of spaces is
    collapsed into a single space.
    """
    # The flag used to be passed as re.sub()'s fourth positional argument,
    # which is "count": only the first 32 runs were collapsed and the flag
    # itself was never applied.
    return _SPACES_RE.sub(u' ', text.strip().upper())


def full_date(date, date_from, date_to):
    """
    Makes sure that date is close to the given range.
    Transactions dates in a statement contain only day and month.
    Statement dates range have a year though.
    Merge them all together to get a full transaction date.
    """
    dates = [datetime.datetime(d.year, date.month, date.day)
             for d in (date_from, date_to)]

    # Ideally, pick the date within given range.
    for d in dates:
        if date_from <= d <= date_to:
            return d

    # Otherwise, return the candidate closest to the start of the range.
    return min(dates, key=lambda d: abs(d-date_from))


def decompress_pdf(inpdf):
    """
    Return ``inpdf`` rewritten by ``mutool clean -d``.

    The data goes through two temporary files, which are removed even
    when writing, running mutool or reading back fails.  The exit status
    of mutool is not checked.
    """
    inh, inname = mkstemp(suffix='.pdf')
    outh, outname = mkstemp(suffix='.pdf')
    try:
        try:
            os.write(inh, inpdf)
        finally:
            os.close(inh)
            os.close(outh)

        # mutool is a part of MuPDF (http://www.mupdf.com).
        subprocess.call(['mutool', 'clean', '-d', inname, outname])

        # PDF is binary data: read it back untranslated.
        with open(outname, 'rb') as f:
            return f.read()
    finally:
        os.remove(inname)
        os.remove(outname)


def _within(value, bounds):
    """Tell whether ``value`` lies in the inclusive ``(min, max)`` range."""
    xmin, xmax = bounds
    return xmin <= value <= xmax


class StatementParser(object):
    """
    Each "read_*" method which takes position as its argument,
    returns next token position if read was successful,
    and the same position if it was not.
    """
    def __init__(self, pdf):
        self._pdf = decompress_pdf(pdf)
        self._tok = StatementTokenizer(self._pdf)

    def read_card_transactions(self):
        """Yield the card transactions found in the statement, if any."""
        # Early check if this is a card account statement at all.
        if '[(Transactions)] TJ' not in self._pdf:
            return

        # Read statement dates range.
        date_from, date_to = self.read_first_date_range()

        # Read transactions.
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, trans = self.read_card_transaction(pos, date_from, date_to)
            if trans:
                yield trans
            else:
                pos += 1

    def read_cash_transactions(self):
        """Yield the cash transactions found in the statement, if any."""
        # Early check if this is a cash account statement at all.
        if '[(Transaction history)] TJ' not in self._pdf:
            return

        # Read statement dates range.
        date_from, date_to = self.read_first_date_range()

        # Read transactions.
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, trans = self.read_cash_transaction(pos, date_from, date_to)
            if trans:
                yield trans
            else:
                pos += 1

    def read_first_date_range(self):
        """Return the first date range of the statement, or ``None`` when
        the end of the tokens is reached without finding one."""
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, date_range = self.read_date_range(pos)
            if date_range is not None:
                return date_range
            else:
                pos += 1

    def read_card_transaction(self, pos, date_from, date_to):
        """Read one card transaction starting at ``pos``; return
        ``(next_pos, transaction)`` or ``(pos, None)``."""
        INDENT_CHARGES = 520

        startPos = pos

        pos, tdate = self.read_date(pos)
        pos, pdate_layout = self.read_layout_tm(pos)
        pos, pdate = self.read_date(pos)
        pos, ref_layout = self.read_layout_tm(pos)
        pos, ref = self.read_ref(pos)
        pos, desc = self.read_multiline_desc(pos)
        pos, amount = self.read_indent_amount(
            pos,
            range_minus=(INDENT_CHARGES, 9999),
            range_plus=(0, INDENT_CHARGES))

        if tdate is None or pdate_layout is None or pdate is None \
           or ref_layout is None or ref is None or desc is None \
           or amount is None:
            return startPos, None
        else:
            tdate = full_date(tdate, date_from, date_to)
            pdate = full_date(pdate, date_from, date_to)

            trans = Transaction(ref)
            trans.date = tdate
            trans.rdate = pdate
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = desc
            trans.label = desc
            trans.amount = amount
            return pos, trans

    def read_cash_transaction(self, pos, date_from, date_to):
        """Read one cash transaction starting at ``pos``; return
        ``(next_pos, transaction)`` or ``(pos, None)``."""
        INDENT_BALANCE = 520
        INDENT_WITHDRAWAL = 470

        startPos = pos

        pos, date = self.read_date(pos)
        pos, _ = self.read_star(pos)
        pos, desc = self.read_multiline_desc(pos)
        pos, amount = self.read_indent_amount(
            pos,
            range_plus=(0, INDENT_WITHDRAWAL),
            range_minus=(INDENT_WITHDRAWAL, INDENT_BALANCE),
            range_skip=(INDENT_BALANCE, 9999))

        if desc is None or date is None or amount is None:
            return startPos, None
        else:
            date = full_date(date, date_from, date_to)

            trans = Transaction(u'')
            trans.date = date
            trans.rdate = date
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = desc
            trans.label = desc
            trans.amount = amount
            return pos, trans

    def read_multiline_desc(self, pos):
        """Read consecutive (layout, text) pairs and join the texts into
        one cleaned label."""
        startPos = pos

        descs = []
        while True:
            prevPos = pos
            pos, layout = self.read_layout_tm(pos)
            pos, desc = self.read_text(pos)
            if layout is None or desc is None:
                pos = prevPos
                break
            else:
                descs.append(desc)

        if descs:
            return pos, clean_label(' '.join(descs))
        else:
            return startPos, None

    def read_indent_amount(self, pos, range_skip=(0, 0), range_plus=(0, 0),
                           range_minus=(0, 0)):
        """
        Read consecutive (layout, amount) pairs and return their signed
        total.  The fifth value of each layout is compared with the given
        inclusive ranges, tested in the order skip, plus, minus; amounts
        matching none of them are ignored.
        """
        startPos = pos

        # Read layout-amount pairs.
        amounts = []
        while True:
            prevPos = pos
            pos, layout = self.read_layout_tm(pos)
            pos, amount = self.read_amount(pos)
            if layout is None or amount is None:
                pos = prevPos
                break
            else:
                amounts.append((layout, amount))

        if not amounts:
            return startPos, None
        else:
            # Infer amount type by its indentation in the layout.
            amount_total = clean_amount('0')
            for (_, _, _, _, indent, _), amount in amounts:
                if _within(indent, range_skip):
                    continue
                elif _within(indent, range_plus):
                    amount_total += amount
                elif _within(indent, range_minus):
                    amount_total -= amount
            return pos, amount_total

    def read_star(self, pos):
        """Read the "S" marker in either of its two layouts, preferring
        the first one."""
        pos1, star1 = self.read_star_1(pos)
        pos2, star2 = self.read_star_2(pos)
        if star1 is not None:
            return pos1, star1
        else:
            return pos2, star2

    def read_star_1(self, pos):
        startPos = pos

        vals = []
        for read in (self.read_layout_tz, self.read_layout_tc,
                     self.read_layout_tw, self.read_layout_tf,
                     self.read_layout_tm):
            pos, v = read(pos)
            vals.append(v)
        pos, star = self.read_text(pos)
        for read in (self.read_layout_tz, self.read_layout_tc,
                     self.read_layout_tw, self.read_layout_tf):
            pos, v = read(pos)
            vals.append(v)

        if star == 'S' and None not in vals:
            return pos, star
        else:
            return startPos, None

    def read_star_2(self, pos):
        startPos = pos

        vals = []
        pos, v = self.read_layout_tf(pos)
        vals.append(v)
        pos, v = self.read_layout_tm(pos)
        vals.append(v)
        pos, star = self.read_text(pos)
        pos, v = self.read_layout_tf(pos)
        vals.append(v)

        if star == 'S' and None not in vals:
            return pos, star
        else:
            return startPos, None

    def read_date(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, datetime.datetime.strptime(t.value(), '%m/%d')) \
            if t.is_date() else (pos, None)

    def read_text(self, pos):
        t = self._tok.tok(pos)
        #TODO: handle PDF encodings properly.
        return (pos+1, unicode(t.value(), errors='ignore')) \
            if t.is_text() else (pos, None)

    def read_amount(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, clean_amount(t.value())) \
            if t.is_amount() else (pos, None)

    def read_date_range(self, pos):
        t = self._tok.tok(pos)
        if t.is_date_range_1():
            return (pos+1, [datetime.datetime.strptime(v, '%B %d, %Y')
                            for v in t.value()])
        elif t.is_date_range_2():
            return (pos+1, [datetime.datetime.strptime(v, '%m/%d/%Y')
                            for v in t.value()])
        else:
            return (pos, None)

    def read_ref(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, t.value()) if t.is_ref() else (pos, None)

    def read_layout_tz(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, t.value()) if t.is_layout_tz() else (pos, None)

    def read_layout_tc(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, t.value()) if t.is_layout_tc() else (pos, None)

    def read_layout_tw(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, t.value()) if t.is_layout_tw() else (pos, None)

    def read_layout_tf(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, t.value()) if t.is_layout_tf() else (pos, None)

    def read_layout_tm(self, pos):
        t = self._tok.tok(pos)
        return (pos+1, [float(v) for v in t.value()]) \
            if t.is_layout_tm() else (pos, None)


class StatementTokenizer(object):
    """Splits the PDF text into lines and wraps each one in a token."""

    def __init__(self, pdf):
        self._tok = [StatementToken(line) for line in pdf.split('\n')]

    def tok(self, index):
        """Return the token at ``index``, or an EOF token when the index
        is out of range."""
        if 0 <= index < len(self._tok):
            return self._tok[index]
        else:
            return StatementToken(eof=True)


class StatementToken(object):
    """
    Simple regex-based lexer.
    There's a lexing table consisting of type-regex tuples.
    Text line is sequentially matched against regexes and first
    successful match defines the type of the token.
    """
    LEX = [
        ('amount', r'^\[\(([0-9,]+\.\d+)\)\] TJ$'),
        ('date', r'^\[\((\d+/\d+)\)\] TJ$'),
        # [A-Za-z] rather than [A-z]: the latter also matches the six
        # punctuation characters located between "Z" and "a" in ASCII.
        ('date_range_1', r'^\[\(([A-Za-z]+ \d+, \d{4})'
                         r' - ([A-Za-z]+ \d+, \d{4})\)\] TJ$'),
        ('date_range_2', r'^\[\((\d{2}/\d{2}/\d{4})'
                         r' to (\d{2}/\d{2}/\d{4})\)\] TJ$'),
        ('layout_tz', r'^(\d+\.\d{2}) Tz$'),
        ('layout_tc', r'^(\d+\.\d{2}) Tc$'),
        ('layout_tw', r'^(\d+\.\d{2}) Tw$'),
        ('layout_tf', r'^/F(\d) (\d+\.\d{2}) Tf$'),
        ('layout_tm', r'^' + (r'(\d+\.\d+ )'*6) + r'Tm$'),
        ('ref', r'^\[\(([0-9A-Z]{17})\)\] TJ$'),

        ('text', r'^\[\(([^\)]+)\)\] TJ$')
    ]

    def __init__(self, line=None, eof=False):
        self._eof = eof
        self._value = None
        self._type = None
        if line is not None:
            for type_, regex in self.LEX:
                m = re.match(regex, line, flags=re.UNICODE)
                if m:
                    self._type = type_
                    # One group: its text; several groups: the tuple of
                    # them; no group at all: the whole match.
                    if len(m.groups()) == 1:
                        self._value = m.groups()[0]
                    elif m.groups():
                        self._value = m.groups()
                    else:
                        self._value = m.group(0)
                    break

    def is_eof(self):
        return self._eof

    def value(self):
        return self._value


def _make_type_check(type_):
    """Build the ``is_<type_>`` predicate method of ``StatementToken``."""
    def check(self):
        return self._type == type_
    check.__name__ = 'is_%s' % type_
    return check


# One "is_<type>" method per entry of the lexing table, built from
# closures instead of eval().
for type_, _ in StatementToken.LEX:
    setattr(StatementToken, 'is_%s' % type_, _make_type_check(type_))


# ---------------------------------------------------------------------------
# Next file in this patch:
# diff --git a/modules/wellsfargo/test.py b/modules/wellsfargo/test.py
# new file mode 100644
# index 00000000..0c86f284
# --- /dev/null
# +++ b/modules/wellsfargo/test.py
# @@ -0,0 +1,34 @@
# ---------------------------------------------------------------------------
# -*- coding: utf-8 -*-

# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.

from weboob.tools.test import BackendTest
from itertools import chain


class WellsFargoTest(BackendTest):
    """Live test of the ``wellsfargo`` backend."""
    BACKEND = 'wellsfargo'

    def test_history(self):
        """
        Test that there's at least one transaction in the whole history.
        """
        backend = self.backend
        # The history iterables of all accounts are created up front,
        # then chained and probed for a first transaction.
        histories = [backend.iter_history(account)
                     for account in backend.iter_accounts()]
        first = next(chain.from_iterable(histories), None)
        self.assertNotEqual(first, None)