Citibank module (https://online.citibank.com). Implements #1642

This commit is contained in:
Oleg Plakhotniuk 2014-10-13 04:41:44 -05:00
commit 5997448169
6 changed files with 586 additions and 0 deletions

View file

@ -0,0 +1,23 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .module import CitibankModule
__all__ = ['CitibankModule']

261
modules/citibank/browser.py Normal file
View file

@ -0,0 +1,261 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Account, AccountNotFound, Transaction
from weboob.exceptions import BrowserIncorrectPassword, BrowserUnavailable
from weboob.tools.capabilities.bank.transactions import \
AmericanTransaction as AmTr
from .parser import StatementParser, clean_label
from time import sleep
from tempfile import mkdtemp
from shutil import rmtree
from itertools import chain
import datetime
import re
import os
import subprocess
__all__ = ['Citibank']
def retrying(func):
    """
    Decorator which calls ``func`` again each time it raises OnceAgain.

    At most MAX_RETRIES attempts are made.  Between two attempts the
    decorator sleeps for 1, 2, 4, 8 and then at most MAX_DELAY seconds.
    The result of the first successful attempt is returned.  Exceptions
    other than OnceAgain propagate immediately.

    Raises BrowserUnavailable when every attempt raised OnceAgain.
    """
    # Local import: keeps the decorator self-contained.
    from functools import wraps

    MAX_RETRIES = 10
    MAX_DELAY = 10

    # wraps() preserves the name and docstring of the decorated function,
    # which would otherwise all be reported as "inner".
    @wraps(func)
    def inner(*args, **kwargs):
        for i in range(MAX_RETRIES):
            try:
                return func(*args, **kwargs)
            except OnceAgain:
                # Exponential back-off with an upper bound.  Sleeping after
                # the last attempt would only delay the error below.
                if i + 1 < MAX_RETRIES:
                    sleep(min(1 << i, MAX_DELAY))
        raise BrowserUnavailable('Unexpected site behavior. '
                                 'Perhaps this module needs some fixing...')
    return inner
class OnceAgain(Exception):
    """
    Signals that the awaited condition is not met yet.

    Functions decorated with ``retrying`` raise it to request
    another attempt after a delay.
    """
class Citibank(object):
    """
    Citibank website uses lots of Javascript, some of which seems to be
    dynamically generated and intentionally obfuscated.
    Our answer to THAT is... heavy artillery firing Selenium rounds!

    External dependencies:
    Firefox (https://www.mozilla.org/firefox).
    MuPDF (http://www.mupdf.com).
    Python bindings for Selenium (https://pypi.python.org/pypi/selenium).
    Xvfb (http://www.x.org/releases/X11R7.6/doc/man/man1/Xvfb.1.xhtml).

    Tested on Arch Linux snapshot of 2014-08-25 (official and user packages).
    Don't forget to do "export DISPLAY=:0".

    Only a single credit card account is currently supported.
    Contributions are welcome!

    Each public operation opens its own Firefox session with start() and
    always releases it with finish(), even when the operation fails or the
    caller stops iterating early.
    """

    def __init__(self, username, password, logger, **kwargs):
        """
        Store the credentials and the logger.

        Extra keyword arguments are accepted and ignored.  No browser is
        started here; see start().
        """
        self._logger = logger
        self._username = username
        self._password = password

    def get_account(self, id_):
        """
        Return the account whose id equals ``id_``.

        Only the first account yielded by iter_accounts() is considered.
        Raises AccountNotFound when its id differs from ``id_``.
        """
        account = next(self.iter_accounts())
        if account.id != id_:
            raise AccountNotFound()
        return account

    def iter_accounts(self):
        """
        Yield the single supported account, read from the home page.

        The balance shown by the site is stored negated.  The browser
        session is closed before the account is yielded.
        """
        self.start()
        try:
            bal = self.wait(
                'div.cT-valueItem span.cT-balanceIndicator1')[0].text
            account = Account()
            account.id = self._account_id()
            account.label = self._account_link().text
            account.currency = Account.get_currency(bal)
            account.balance = -AmTr.decimal_amount(bal)
            account.type = Account.TYPE_CARD
        finally:
            # Never leave Firefox and the temporary directories behind.
            self.finish()
        yield account

    def iter_history(self, account):
        """
        Yield the transactions of ``account``: first those from the web
        view (iter_history_recent), then those from the PDF statements
        (iter_history_statements).
        """
        for trans in chain(self.iter_history_recent(account),
                           self.iter_history_statements(account)):
            yield trans

    def start(self):
        """
        Create a download directory, launch Firefox and log in.

        Firefox is configured to save PDF files into the download
        directory without asking.  Raises BrowserIncorrectPassword when
        the home page cannot be reached after submitting the login form.
        If anything fails, whatever was already created is released
        before the exception propagates.
        """
        # To avoid ImportError during e.g. building modules list.
        from selenium import webdriver
        HOME_URL = 'https://online.citibank.com/US/JPS/portal/Home.do'
        WIDTH = 1920
        HEIGHT = 10000  # So that everything fits...
        self._downloads = mkdtemp()
        self._logger.debug('Saving downloaded files to %s' % self._downloads)
        started = False
        try:
            prof = webdriver.FirefoxProfile()
            prof.set_preference('browser.download.folderList', 2)
            prof.set_preference('browser.download.dir', self._downloads)
            prof.set_preference('browser.helperApps.neverAsk.saveToDisk',
                                'application/pdf')
            prof.set_preference('pdfjs.disabled', True)
            self._browser = webdriver.Firefox(prof)
            self._browser.set_window_size(WIDTH, HEIGHT)
            self._browser.get('https://online.citibank.com')
            self.wait('input[name="usernameMasked"]')[0].send_keys(
                self._username)
            self.wait('input[name="password"]')[0].send_keys(self._password)
            self.wait('form[name="SignonForm"]')[0].submit()
            self._browser.get(HOME_URL)
            # Being sent somewhere else than the home page is treated as
            # a failed login.
            if self._browser.current_url != HOME_URL:
                raise BrowserIncorrectPassword()
            started = True
        finally:
            if not started:
                self.finish()

    def finish(self):
        """
        Close the browser and delete the Firefox profile directory and the
        download directory.

        Safe to call when start() failed half-way or when the session was
        already finished: only what actually exists is released.
        """
        state = vars(self)
        browser = state.pop('_browser', None)
        downloads = state.pop('_downloads', None)
        try:
            if browser is not None:
                # The profile directory must be read before closing.
                prof_dir = browser.firefox_profile.profile_dir
                browser.close()
                rmtree(prof_dir)
        finally:
            if downloads is not None:
                rmtree(downloads)

    def iter_history_recent(self, account):
        """
        Yield the transactions listed in the account web view.

        Raises AccountNotFound when ``account`` is not the account shown
        by the site.  Amounts shown in parentheses are stored as positive
        values, all other amounts are negated.  The browser session is
        released when the generator finishes, fails or is closed.
        """
        self.start()
        try:
            if account.id != self._account_id():
                raise AccountNotFound()
            self._account_link().click()
            self.wait_ajax()
            for span in self.wait('span.cM-maximizeButton'):
                span.click()
            for tr in self.wait('tr.payments,tr.purchase'):
                trdata = lambda n: tr.find_element_by_css_selector(
                    'td.cT-bodyTableColumn%i span.cT-line1' % n).text
                # Extra details live in a companion row whose id is
                # derived from the id of the main row.
                treid = tr.get_attribute('id').replace('rowID', 'rowIDExt')
                tredata = {}
                for tre in self.wait('tr#%s' % treid):
                    labels = [x.text
                              for x in tre.find_elements_by_css_selector(
                                  'div.cT-labelItem')]
                    values = [x.text
                              for x in tre.find_elements_by_css_selector(
                                  'div.cT-valueItem')]
                    tredata = dict(zip(labels, values))
                ref = tredata.get(u'Reference Number:', u'')
                tdate = trdata(1)
                # Fall back to the transaction date when no posted date
                # is available.
                pdate = tredata.get(u'Posted Date :', tdate)
                desc = clean_label(trdata(2))
                amount = trdata(4)
                tdate = datetime.datetime.strptime(tdate, '%m-%d-%Y')
                pdate = datetime.datetime.strptime(pdate, '%m-%d-%Y')
                if amount.startswith(u'(') and amount.endswith(u')'):
                    amount = AmTr.decimal_amount(amount[1:-1])
                else:
                    amount = -AmTr.decimal_amount(amount)
                trans = Transaction(ref)
                trans.date = tdate
                trans.rdate = pdate
                trans.type = Transaction.TYPE_UNKNOWN
                trans.raw = desc
                trans.label = desc
                trans.amount = amount
                yield trans
        finally:
            self.finish()

    def iter_history_statements(self, account):
        """
        Yield the transactions found in the downloadable PDF statements.

        Every statement except the ones marked "Unbilled" is downloaded,
        parsed with StatementParser and deleted.  Transactions of one
        statement are yielded from the most recent date to the oldest.
        Raises AccountNotFound when ``account`` is not the account shown
        by the site.  The browser session is released when the generator
        finishes, fails or is closed.
        """
        # To avoid ImportError during e.g. building modules list.
        from selenium.webdriver.common.keys import Keys
        from selenium.common.exceptions import \
            MoveTargetOutOfBoundsException, ElementNotVisibleException
        self.start()
        try:
            if account.id != self._account_id():
                raise AccountNotFound()
            self.wait('a#cmlink_ViewPastStmtLink')[0].click()
            opts = self.wait('option#currentStatementDateOptions')
            for i, opt in enumerate(opts):
                # We're interested only in finalized statements.
                if u'Unbilled' in opt.get_attribute('value'):
                    continue
                self.wait('div#currentStatementsDate-button')[0].click()
                ul = self.wait('ul#currentStatementsDate-menu')[0]
                while True:
                    try:
                        self.wait('li#currentStatementDateOptions span')[
                            i].click()
                        break
                    except (MoveTargetOutOfBoundsException,
                            ElementNotVisibleException):
                        # The entry cannot be clicked yet: move down in
                        # the menu and try again.
                        ul.send_keys(Keys.ARROW_DOWN)
                self.wait('a#downloadCurrentStatements')[0].click()
                pdfname = self.wait_file('.pdf')
                pdfpath = os.path.join(self._downloads, pdfname)
                with open(pdfpath, 'rb') as f:
                    parser = StatementParser(f.read())
                # Removed right away so that the next wait_file() call
                # cannot pick up this statement again.
                os.remove(pdfpath)
                # Transactions in a statement can go in different order.
                # Stable sort, most recent date first.
                ts = sorted(parser.read_transactions(),
                            key=lambda t: t.date, reverse=True)
                for t in ts:
                    yield t
        finally:
            self.finish()

    @retrying
    def wait(self, selector):
        """
        Return the non-empty list of elements matching the CSS
        ``selector`` on the current page, retrying while none matches.
        """
        self._logger.debug('Waiting for selector """%s""" on page %s' % (
            selector, self._browser.current_url))
        els = self._browser.find_elements_by_css_selector(selector)
        if not els:
            raise OnceAgain()
        return els

    @retrying
    def wait_ajax(self):
        """
        Retry until the page contains at least one element with the text
        "Please wait" and none of those elements is displayed.
        """
        self._logger.debug('Waiting for async requests to finish on page %s'
                           % self._browser.current_url)
        els = self._browser.find_elements_by_xpath(
            u'//*[contains(text(),"Please wait")]')
        if not els or any(x.is_displayed() for x in els):
            raise OnceAgain()

    @retrying
    def wait_file(self, suffix):
        """
        Return the name of a completely downloaded file ending with
        ``suffix`` in the download directory, retrying until there is one.

        A file qualifies when it is not empty and the external ``fuser``
        command does not report any process accessing it.
        """
        self._logger.debug('Waiting for file "*%s" to finish downloading.' %
                           suffix)
        for name in os.listdir(self._downloads):
            if not name.endswith(suffix):
                continue
            path = os.path.join(self._downloads, name)
            # Wait until file is not empty.
            if not os.stat(path).st_size:
                continue
            # Wait until no processes are accessing the file.
            if subprocess.call(['fuser', '-s', path]) == 0:
                continue
            return name
        raise OnceAgain()

    def _account_link(self):
        """Return the link element carrying the account name."""
        return self.wait('a#cmlink_AccountNameLink')[0]

    def _account_id(self):
        """
        Return the account id: the digits following the last dash
        at the end of the account link text.
        """
        return re.match('.*-([0-9]+)$', self._account_link().text).group(1)

Binary file not shown.

After

Width:  |  Height:  |  Size: 4 KiB

View file

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import CapBank
from weboob.tools.backend import Module, BackendConfig
from weboob.tools.value import ValueBackendPassword
from .browser import Citibank
__all__ = ['CitibankModule']
class CitibankModule(Module, CapBank):
    """
    Bank module for Citibank.

    All banking operations are forwarded to the Citibank browser.
    """
    NAME = 'citibank'
    MAINTAINER = u'Oleg Plakhotniuk'
    EMAIL = 'olegus8@gmail.com'
    VERSION = '1.0'
    LICENSE = 'AGPLv3+'
    DESCRIPTION = u'Citibank'
    CONFIG = BackendConfig(
        ValueBackendPassword('username', label='Username', masked=False),
        ValueBackendPassword('password', label='Password'))
    BROWSER = Citibank

    def create_default_browser(self):
        """Create the browser from the configured username and password."""
        username = self.config['username'].get()
        password = self.config['password'].get()
        return self.create_browser(username, password)

    def iter_accounts(self):
        """Return the accounts provided by the browser."""
        browser = self.browser
        return browser.iter_accounts()

    def get_account(self, id_):
        """Return the account with id ``id_``, as provided by the browser."""
        browser = self.browser
        return browser.get_account(id_)

    def iter_history(self, account):
        """Return the transactions of ``account`` provided by the browser."""
        browser = self.browser
        return browser.iter_history(account)

214
modules/citibank/parser.py Normal file
View file

@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import Transaction
from weboob.tools.capabilities.bank.transactions import \
AmericanTransaction as AmTr
from weboob.tools.date import closest_date
from weboob.tools.pdf import decompress_pdf
from weboob.tools.tokenizer import ReTokenizer
import datetime
import re
def clean_label(text):
    """
    Web view and statements use different label formatting.
    User shouldn't be able to see the difference, so we
    need to make labels from both sources look the same.

    Removes the foreign currency amount in pounds sterling and the
    "Subject to Foreign Fee" and "Description" markers, strips the
    result, upper-cases it and collapses every run of spaces into a
    single space.
    """
    # NOTE: flags must be passed by keyword.  The fourth positional
    # argument of re.sub() is "count", so passing re.UNICODE there
    # silently limited the number of replacements to 32 and never
    # enabled the flag.
    for pattern in [r' \d+\.\d+ +POUND STERLING',
                    u'Subject to Foreign Fee',
                    u'Description']:
        text = re.sub(pattern, u'', text, flags=re.UNICODE)
    return re.sub(r' +', u' ', text.strip().upper(), flags=re.UNICODE)
def formatted(read_func):
    """
    Reads boilerplate PDF formatting around the data of interest.

    Decorator for a ``read_func(self, pos)`` method returning
    ``(new_pos, data)``.  The wrapper reads, in this order: whitespace,
    BT, Tf and Tm layout tokens, the data itself, and an ET layout token.
    It returns ``(new_pos, data)`` when every read produced a value, and
    ``(pos, None)`` with the original position otherwise.
    """
    # Local import: keeps the decorator self-contained.
    from functools import wraps

    # wraps() keeps the name and docstring of the decorated reader.
    @wraps(read_func)
    def wrapped(self, pos):
        startPos = pos
        pos, ws = self.read_whitespace(pos)
        pos, bt = self.read_layout_bt(pos)
        pos, tf = self.read_layout_tf(pos)
        pos, tm = self.read_layout_tm(pos)
        pos, data = read_func(self, pos)
        pos, et = self.read_layout_et(pos)
        # Compare with None explicitly: an empty string is a valid value.
        if any(part is None for part in (ws, bt, tf, tm, data, et)):
            return startPos, None
        return pos, data
    return wrapped
class StatementParser(object):
    """
    Each "read_*" method takes position as its argument,
    and returns next token position if read was successful,
    or the same position if it was not.

    "read_*" methods which are not defined explicitly are created on the
    fly by __getattr__ and delegate to the tokenizer.
    """
    # Token name and the regular expression of the line it matches.
    LEX = [
        ('date_range', r'^\((\d{2}/\d{2}/\d{2})-(\d{2}/\d{2}/\d{2})\) Tj$'),
        ('amount', r'^\((-?\$\d+\.\d{2})\) Tj$'),
        ('date', r'^\((\d{2}/\d{2})\) Tj$'),
        ('text', r'^\((.*)\) Tj$'),
        ('layout_tf', r'^.* Tf$'),
        ('layout_tm', r'^' + (6*r'([^ ]+) ') + r'Tm$'),
        ('layout_bt', r'^BT$'),
        ('layout_et', r'^ET$'),
        ('whitespace', r'^$')
    ]

    def __init__(self, pdf):
        """Decompress the raw ``pdf`` data and tokenize it line by line."""
        self._pdf = decompress_pdf(pdf)
        self._tok = ReTokenizer(self._pdf, '\n', self.LEX)

    def read_transactions(self):
        """
        Yield every transaction found in the statement.

        Raises ValueError when the statement contains no date range,
        since transaction dates cannot be completed without it.
        """
        # Read statement dates range.
        date_range = self.read_first_date_range()
        if date_range is None:
            # Fail with a clear message instead of an obscure unpacking
            # error.
            raise ValueError('Statement dates range not found.')
        date_from, date_to = date_range

        # Read transactions.
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, trans = self.read_transaction(pos, date_from, date_to)
            if trans:
                yield trans
            else:
                pos += 1

    def read_first_date_range(self):
        """
        Return the first date range of the statement as a list of two
        datetimes, or None when the statement has no date range.
        """
        pos = 0
        while not self._tok.tok(pos).is_eof():
            pos, date_range = self.read_date_range(pos)
            if date_range is not None:
                return date_range
            else:
                pos += 1
        return None

    def read_date_range(self, pos):
        """
        Read a date range token at ``pos``.

        Returns ``(pos+1, [date_from, date_to])`` on success,
        ``(pos, None)`` otherwise.
        """
        t = self._tok.tok(pos)
        if t.is_date_range():
            return (pos+1, [datetime.datetime.strptime(v, '%m/%d/%y')
                            for v in t.value()])
        else:
            return (pos, None)

    def read_transaction(self, pos, date_from, date_to):
        """
        Read one transaction: one or two dates, a description and an
        amount.

        Returns ``(new_pos, transaction)`` on success and
        ``(pos, None)`` otherwise.  The year of each date is chosen by
        closest_date() from the ``date_from``..``date_to`` range.
        The amount is stored negated.
        """
        startPos = pos

        pos, tdate = self.read_date(pos)
        pos, pdate = self.read_date(pos)

        # Early check to call read_multiline_desc() only when needed.
        if tdate is None:
            return startPos, None

        pos, desc = self.read_multiline_desc(pos)
        pos, amount = self.read_amount(pos)

        if desc is None or amount is None:
            return startPos, None
        else:
            # Sometimes one date is missing.
            pdate = pdate or tdate

            tdate = closest_date(tdate, date_from, date_to)
            pdate = closest_date(pdate, date_from, date_to)

            trans = Transaction()
            trans.date = tdate
            trans.rdate = pdate
            trans.type = Transaction.TYPE_UNKNOWN
            trans.raw = desc
            trans.label = desc
            trans.amount = -amount
            return pos, trans

    def read_multiline_desc(self, pos):
        """
        Read transaction description which can span over multiple lines.
        Amount must always follow the multiline description.
        But multiline description might be split by page break.
        After reading first line of the description, we skip everything
        which is not an amount and which has different horizontal offset
        than the first read line.

        Returns ``(new_pos, cleaned_description)`` on success and
        ``(pos, None)`` when no description line was read.
        """
        startPos = pos

        descs = []
        xofs = None
        while not self._tok.tok(pos).is_eof():
            pos, desc_tm = self.read_text(pos)
            if desc_tm is None:
                if not descs:
                    break
                # Only peek at the amount: it is left in place for the
                # caller to read.
                prev_pos = pos
                pos, amount = self.read_amount(pos)
                if amount is not None:
                    pos = prev_pos
                    break
                pos += 1
            else:
                desc, tm = desc_tm
                # The fifth Tm value of the first line is the reference
                # horizontal offset.
                if xofs is None:
                    _, _, _, _, xofs, _ = tm
                _, _, _, _, xofs_new, _ = tm
                if xofs == xofs_new:
                    descs.append(desc)
                else:
                    pos += 1

        if descs:
            return pos, clean_label(' '.join(descs))
        else:
            return startPos, None

    def __getattr__(self, name):
        """
        Provide ``read_<token>`` methods which delegate to the tokenizer's
        simple_read() with the given token name.

        Raises AttributeError for any other unknown attribute.
        """
        if name.startswith('read_'):
            return lambda pos: self._tok.simple_read(name[5:], pos)
        raise AttributeError("'%s' object has no attribute '%s'"
                             % (type(self).__name__, name))

    @formatted
    def read_date(self, pos):
        """Read a month/day date token; the year is not part of it."""
        return self._tok.simple_read('date', pos,
            lambda v: datetime.datetime.strptime(v, '%m/%d'))

    @formatted
    def read_amount(self, pos):
        """Read an amount token and convert it with decimal_amount()."""
        return self._tok.simple_read('amount', pos, AmTr.decimal_amount)

    def read_text(self, pos):
        """
        Read a text token together with its surrounding layout tokens.

        Returns ``(new_pos, (text, tm))`` on success, where ``tm`` is the
        value of the Tm layout token, and ``(pos, None)`` otherwise.
        """
        startPos = pos
        pos, ws = self.read_whitespace(pos)
        pos, bt = self.read_layout_bt(pos)
        pos, tf = self.read_layout_tf(pos)
        pos, tm = self.read_layout_tm(pos)
        pos, text = self._tok.simple_read('text', pos,
            lambda v: unicode(v, errors='ignore'))
        pos, et = self.read_layout_et(pos)
        if ws is None or bt is None or tf is None \
        or tm is None or text is None or et is None:
            return startPos, None
        else:
            return pos, (text, tm)

34
modules/citibank/test.py Normal file
View file

@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest
from itertools import chain
class CitibankTest(BackendTest):
    """Live test of the citibank module."""
    MODULE = 'citibank'

    def test_history(self):
        """
        Test that there's at least one transaction in the whole history.
        """
        backend = self.backend
        # One history iterator per account, built for all accounts up front.
        histories = [backend.iter_history(account)
                     for account in backend.iter_accounts()]
        first = next(chain.from_iterable(histories), None)
        self.assertNotEqual(first, None)