[citibank] Use Selenium only to log in. Closes #1740

This commit is contained in:
Oleg Plakhotniuk 2015-02-26 03:52:47 -06:00 committed by Romain Bignon
commit 3aa13dcef1

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2014 Oleg Plakhotniuk
# Copyright(C) 2014, 2015 Oleg Plakhotniuk
#
# This file is part of weboob.
#
@ -18,6 +18,8 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.browser import LoginBrowser, URL, need_login
from weboob.browser.pages import HTMLPage, JsonPage, RawPage
from weboob.capabilities.bank import Account, AccountNotFound, Transaction
from weboob.exceptions import BrowserIncorrectPassword, BrowserUnavailable
from weboob.tools.capabilities.bank.transactions import \
@ -25,42 +27,109 @@ from weboob.tools.capabilities.bank.transactions import \
from .parser import StatementParser, clean_label
from time import sleep
from tempfile import mkdtemp
from shutil import rmtree
from itertools import chain
import datetime
import gc
import re
import os
import subprocess
from datetime import datetime
from time import sleep
from urllib import unquote
from shutil import rmtree
__all__ = ['Citibank']
def retrying(func):
def inner(*args, **kwargs):
MAX_RETRIES = 10
MAX_DELAY = 10
for i in xrange(MAX_RETRIES):
try:
return func(*args, **kwargs)
except OnceAgain:
sleep(min(1 << i, MAX_DELAY))
raise BrowserUnavailable('Unexpected site behavior. '
'Perhaps this module needs some fixing...')
return inner
class SomePage(HTMLPage):
@property
def logged(self):
return bool(self.doc.xpath(u'//a[text()="Sign Off"]'))
class OnceAgain(Exception):
pass
class AccountsPage(JsonPage):
logged = True
def inner_ids_dict(self):
return dict((prod['parsedAccountName'][-4:], prod['accountInstanceId'])
for cat in self.doc['accountsSummaryViewObj']['categoryList']
for prod in cat['products'] if cat['categoryType'] == 'CRD')
class Citibank(object):
class AccDetailsPage(JsonPage):
logged = True
def account(self):
detact = self.doc['accountDetailsAndActivity']
details = detact['accountDetails']
account = Account()
account.type = Account.TYPE_CARD
account.label = re.sub(r'<[^>]+>', '', detact['accountName'])
account.id = account.label[-4:]
account._innerId = details['instanceID']
for bal in details['accountBalances']:
label, value = bal['label'], (bal['value'] or ['0'])[0]
if label == u'Current Balance:':
account.currency = Account.get_currency(value)
account.balance = -AmTr.decimal_amount(value)
elif label == u'Total Revolving Credit Line:':
account.cardlimit = AmTr.decimal_amount(value)
elif label.startswith(u'Minimum Payment Due'):
d = re.match(r'.*(..-..-....):$', label).group(1)
account.paydate = datetime.strptime(d, '%m-%d-%Y')
account.paymin = AmTr.decimal_amount(value)
return account
def transactions(self):
return sorted(self.unsorted_trans(),
lambda a, b: cmp(a.date, b.date), reverse=True)
def unsorted_trans(self):
for jnl in self.doc['accountDetailsAndActivity']['accountActivity'] \
['postedTransactionJournals']:
tdate = jnl['columns'][0]['activityColumn'][0]
label = jnl['columns'][1]['activityColumn'][0]
amount = jnl['columns'][3]['activityColumn'][0]
xdescs = dict((x['label'], x['value'][0])
for x in jnl['extendedDescriptions'])
pdate = xdescs[u'Posted Date :']
ref = xdescs.get(u'Reference Number:') or u''
if amount.startswith(u'(') and amount.endswith(u')'):
amount = AmTr.decimal_amount(amount[1:-1])
else:
amount = -AmTr.decimal_amount(amount)
label = clean_label(label)
trans = Transaction(ref)
trans.date = datetime.strptime(tdate, '%m-%d-%Y')
trans.rdate = datetime.strptime(pdate, '%m-%d-%Y')
trans.type = Transaction.TYPE_UNKNOWN
trans.raw = label
trans.label = label
trans.amount = amount
yield trans
class StatementsPage(SomePage):
def dates(self):
return self.doc.xpath(
'//select[@id="currentStatementsDate"]/option/text()')[1:]
class StatementPage(RawPage):
logged = True
def __init__(self, *args, **kwArgs):
RawPage.__init__(self, *args, **kwArgs)
self._parser = StatementParser(self.doc)
def transactions(self):
return sorted(self._parser.read_transactions(),
cmp=lambda t1, t2: cmp(t2.date, t1.date))
class Citibank(LoginBrowser):
"""
Citibank website uses lots of Javascript, some of which seems to be
dynamically generated and intentionally obfuscated.
Our answer to THAT is... heavy artillery firing Selenium rounds!
Citibank website uses some kind of Javascript magic during login
negotiation, hence a real browser is being used to log in.
External dependencies:
Firefox (https://www.mozilla.org/firefox).
@ -68,218 +137,97 @@ class Citibank(object):
Python bindings for Selenium (https://pypi.python.org/pypi/selenium).
Xvfb (http://www.x.org/releases/X11R7.6/doc/man/man1/Xvfb.1.xhtml).
Tested on Arch Linux snapshot of 2014-08-25 (official and user packages).
Don't forget to do "export DISPLAY=:0".
Tested on Arch Linux snapshot of 2014-11-11 (official and user packages).
Only a single credit card account is currently supported.
Contributions are welcome!
"""
def __init__(self, username, password, logger, **kwargs):
self._logger = logger
self._username = username
self._password = password
MAX_RETRIES = 10
MAX_DELAY = 10
BASEURL = 'https://online.citibank.com'
home = URL(r'/US/JPS/portal/Home.do', SomePage)
accounts = URL(r'/US/REST/accountsPanel'
r'/getCustomerAccounts.jws\?ttc=(?P<ttc>.*)$',
AccountsPage)
accdetails = URL(r'/US/REST/accountDetailsActivity'
r'/getAccountDetailsActivity.jws\?accountID=(?P<accountID>.*)$',
AccDetailsPage)
statements = URL(r'/US/NCSC/doccenter/flow.action\?TTC=1079&'
'accountID=(?P<accountID>.*)$', StatementsPage)
statement = URL(r'/US/REST/doccenterresource/downloadStatementsPdf.jws\?'
r'selectedIndex=0&date=(?P<date>....-..-..)&'
r'downloadFormat=pdf', StatementPage)
unknown = URL('/.*$', SomePage)
def get_account(self, id_):
account = next(self.iter_accounts())
if account.id != id_:
raise AccountNotFound()
return account
innerId = self.to_accounts().inner_ids_dict().get(id_)
if innerId:
return self.to_account(innerId).account()
raise AccountNotFound()
def iter_accounts(self):
self.start()
self._account_link().click()
self.wait_ajax()
label = self.wait('label#accountSingle .cS-accountMenuAccount')[0].text
bal = self._browser.find_elements_by_xpath(u'//span[contains(text(),'
'"Current Balance:")]/../../div[@class="cT-valueItem"]')[0].text
cardlimit = self._browser.find_elements_by_xpath(
u'//span[contains(text(),"Total Revolving Credit Line:")]'
'/../../div[@class="cT-valueItem"]')[0].text
paydue=self._browser.find_elements_by_xpath(u'//span[contains(text(),'
'"Minimum Payment Due") and @role="gridcell"]')[0].text
paydate = re.match(u'Minimum Payment Due.On (..-..-....):', paydue,
re.DOTALL + re.UNICODE).group(1)
paymin = self._browser.find_elements_by_xpath(
u'//span[contains(text(),"Minimum Payment Due")]'
'/../../div[contains(@class,"cT-valueItem")]')[0].text
account = Account()
account.id = label[-4:]
account.label = label
account.currency = Account.get_currency(bal)
account.balance = -AmTr.decimal_amount(bal)
account.cardlimit = AmTr.decimal_amount(cardlimit)
account.paydate = datetime.datetime.strptime(paydate, '%m-%d-%Y')
account.paymin = AmTr.decimal_amount(paymin)
account.type = Account.TYPE_CARD
self.finish()
yield account
for innerId in self.to_accounts().inner_ids_dict().values():
yield self.to_account(innerId).account()
def iter_history(self, account):
for trans in chain(self.iter_history_recent(account),
self.iter_history_statements(account)):
innerId = account._innerId
for trans in self.to_account(innerId).transactions():
yield trans
for date in self.to_statements(innerId).dates():
for trans in self.to_statement(date).transactions():
yield trans
def start(self):
def to_account(self, innerId):
return self.to_page(self.accdetails, accountID=innerId)
def to_accounts(self):
return self.to_page(self.accounts, ttc='742')
def to_statements(self, innerId):
return self.to_page(self.statements, accountID=innerId)
def to_statement(self, date):
return self.to_page(self.statement, date=date)
@need_login
def to_page(self, url, **data):
return self.page if url.is_here(**data) else url.go(data=data, **data)
def do_login(self):
# To avoid ImportError during e.g. building modules list.
from selenium import webdriver
HOME_URL = 'https://online.citibank.com/US/JPS/portal/Home.do'
WIDTH = 1920
HEIGHT = 10000 # So that everything fits...
self._downloads = mkdtemp()
self._logger.debug('Saving downloaded files to %s' % self._downloads)
prof = webdriver.FirefoxProfile()
prof.set_preference('browser.download.folderList', 2)
prof.set_preference('browser.download.dir', self._downloads)
prof.set_preference('browser.helperApps.neverAsk.saveToDisk',
'application/pdf')
prof.set_preference('pdfjs.disabled', True)
self._browser = webdriver.Firefox(prof)
self._browser.set_window_size(WIDTH, HEIGHT)
self._browser.get('https://online.citibank.com')
self.wait('input[name="usernameMasked"]')[0].send_keys(self._username)
self.wait('input[name="password"]')[0].send_keys(self._password)
self.wait('form[name="SignonForm"]')[0].submit()
self._browser.get(HOME_URL)
if self._browser.current_url != HOME_URL:
browser = webdriver.Firefox()
browser.get(self.BASEURL)
browser.execute_script('''
$("input[name='username']").val("%s");
$("input[name='password']").val("%s");
''' % (self.username, self.password))
self.wait(browser, 'form[name="SignonForm"]')[0].submit()
self.session.cookies.clear()
for c in browser.get_cookies():
self.session.cookies.set(name=c['name'], value=unquote(c['value']))
browser.close()
prof_dir = browser.firefox_profile.profile_dir
browser = None
gc.collect() # Make sure Firefox process is dead.
for i in xrange(self.MAX_RETRIES):
try:
rmtree(prof_dir)
break
except OSError:
sleep(min(1 << i, self.MAX_DELAY))
if not self.home.go().logged:
raise BrowserIncorrectPassword()
def finish(self):
prof_dir = self._browser.firefox_profile.profile_dir
self._browser.close()
del self._browser
rmtree(self._downloads)
del self._downloads
rmtree(prof_dir)
def iter_history_recent(self, account):
self.start()
if account.id != self._account_id():
raise AccountNotFound()
self._account_link().click()
self.wait_ajax()
for span in self.find('span.cM-maximizeButton'):
span.click()
for tr in self.find('tr.payments,tr.purchase'):
trdata = lambda n: tr.find_element_by_css_selector(
'td.cT-bodyTableColumn%i span.cT-line1' % n).text
treid = tr.get_attribute('id').replace('rowID', 'rowIDExt')
tredata = {}
for tre in self.find('tr#%s' % treid):
labels = [x.text for x in tre.find_elements_by_css_selector(
'div.cT-labelItem')]
values = [x.text for x in tre.find_elements_by_css_selector(
'div.cT-valueItem')]
tredata = dict(zip(labels, values))
ref = tredata.get(u'Reference Number:', u'')
tdate = trdata(1)
pdate = tredata.get(u'Posted Date :', tdate)
desc = clean_label(trdata(2))
amount = trdata(4)
tdate = datetime.datetime.strptime(tdate, '%m-%d-%Y')
pdate = datetime.datetime.strptime(pdate, '%m-%d-%Y')
if amount.startswith(u'(') and amount.endswith(u')'):
amount = AmTr.decimal_amount(amount[1:-1])
else:
amount = -AmTr.decimal_amount(amount)
trans = Transaction(ref)
trans.date = tdate
trans.rdate = pdate
trans.type = Transaction.TYPE_UNKNOWN
trans.raw = desc
trans.label = desc
trans.amount = amount
yield trans
self.finish()
def iter_history_statements(self, account):
# To avoid ImportError during e.g. building modules list.
from selenium.webdriver.common.keys import Keys
from selenium.common.exceptions import MoveTargetOutOfBoundsException,\
ElementNotVisibleException
self.start()
if account.id != self._account_id():
raise AccountNotFound()
self.wait('a#cmlink_ViewPastStmtLink')[0].click()
opts = self.wait('option#currentStatementDateOptions')
for i, opt in enumerate(opts):
# We're interested only in finalized statements.
if u'Unbilled' in opt.get_attribute('value'):
continue
self.wait('div#currentStatementsDate-button')[0].click()
ul = self.wait('ul#currentStatementsDate-menu')[0]
while True:
try:
self.wait('li#currentStatementDateOptions span')[i].click()
break
except (MoveTargetOutOfBoundsException,
ElementNotVisibleException):
ul.send_keys(Keys.ARROW_DOWN)
self.wait('a#downloadCurrentStatements')[0].click()
pdfname = self.wait_file('.pdf')
pdfpath = os.path.join(self._downloads, pdfname)
with open(pdfpath, 'rb') as f:
parser = StatementParser(f.read())
os.remove(pdfpath)
# Transactions in a statement can go in different order.
ts = sorted(parser.read_transactions(),
cmp=lambda t1, t2: cmp(t2.date, t1.date))
for t in ts:
yield t
self.finish()
def find(self, selector):
self._logger.debug('Finding selector """%s""" on page %s' % (
selector, self._browser.current_url))
return self._browser.find_elements_by_css_selector(selector)
@retrying
def wait(self, selector):
els = self.find(selector)
if not els:
raise OnceAgain()
return els
@retrying
def wait_ajax(self):
from selenium.common.exceptions import StaleElementReferenceException
self._logger.debug('Waiting for async requests to finish on page %s'
% self._browser.current_url)
els = self._browser.find_elements_by_xpath(
u'//*[contains(text(),"Please wait")]')
try:
if not els or any(x.is_displayed() for x in els):
raise OnceAgain()
except StaleElementReferenceException:
raise OnceAgain()
@retrying
def wait_file(self, suffix):
self._logger.debug('Waiting for file "*%s" to finish downloading.' %
suffix)
for name in os.listdir(self._downloads):
if not name.endswith(suffix):
continue
path = os.path.join(self._downloads, name)
# Wait until file is not empty.
if not os.stat(path).st_size:
continue
# Wait until no processes are accessing the file.
if subprocess.call(['fuser', '-s', path]) == 0:
continue
return name
raise OnceAgain()
def _account_link(self):
return self.wait('a#cmlink_AccountNameLink')[0]
def _account_id(self):
return re.match('.*-([0-9]+)$', self._account_link().text).group(1)
def wait(self, browser, selector):
self.logger.debug('Finding selector """%s""" on page %s' % (
selector, browser.current_url))
for i in xrange(self.MAX_RETRIES):
els = browser.find_elements_by_css_selector(selector)
if els:
return els
sleep(min(1 << i, self.MAX_DELAY))
raise BrowserUnavailable('Unexpected site behavior. '
'Perhaps this module needs some fixing...')