Updates cmb, new features and bug fix

- fix a bug in the accounts listing
- adds proxy support
- adds ssl checks :
   - checks the certificate chain with the Verisign authority
   - checks that the sha256sum of the server's certificate is what's expected (like in Browser).
   - adds an option to avoid ssl checks (useful for debug)

Signed-off-by: Johann Broudin <Johann.Broudin@6-8.fr>
Signed-off-by: Romain Bignon <romain@symlink.me>
This commit is contained in:
Johann Broudin 2012-10-26 00:57:11 +02:00 committed by Romain Bignon
commit 654c2bafee
3 changed files with 206 additions and 17 deletions

View file

@ -0,0 +1,14 @@
-----BEGIN CERTIFICATE-----
MIICPDCCAaUCEHC65B0Q2Sk0tjjKewPMur8wDQYJKoZIhvcNAQECBQAwXzELMAkG
A1UEBhMCVVMxFzAVBgNVBAoTDlZlcmlTaWduLCBJbmMuMTcwNQYDVQQLEy5DbGFz
cyAzIFB1YmxpYyBQcmltYXJ5IENlcnRpZmljYXRpb24gQXV0aG9yaXR5MB4XDTk2
MDEyOTAwMDAwMFoXDTI4MDgwMTIzNTk1OVowXzELMAkGA1UEBhMCVVMxFzAVBgNV
BAoTDlZlcmlTaWduLCBJbmMuMTcwNQYDVQQLEy5DbGFzcyAzIFB1YmxpYyBQcmlt
YXJ5IENlcnRpZmljYXRpb24gQXV0aG9yaXR5MIGfMA0GCSqGSIb3DQEBAQUAA4GN
ADCBiQKBgQDJXFme8huKARS0EN8EQNvjV69qRUCPhAwL0TPZ2RHP7gJYHyX3KqhE
BarsAx94f56TuZoAqiN91qyFomNFx3InzPRMxnVx0jnvT0Lwdd8KkMaOIG+YD/is
I19wKTakyYbnsZogy1Olhec9vn2a/iRFM9x2Fe0PonFkTGUugWhFpwIDAQABMA0G
CSqGSIb3DQEBAgUAA4GBALtMEivPLCYATxQT3ab7/AoRhIzzKBxnki98tsX63/Do
lbwdj2wsqFHMc9ikwFPwTtYmwHYBV4GSXiHx0bH/59AhWM1pF+NEHJwZRDmJXNyc
AA9WjQKZ7aKQRUzkuxCkPfAyAw7xzvjoyVGM5mKf5p/AfbdynMk2OmufTqj/ZA1k
-----END CERTIFICATE-----

View file

@ -17,7 +17,6 @@
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.bank import ICapBank, AccountNotFound
from weboob.capabilities.bank import Account, Transaction
from weboob.tools.backend import BaseBackend, BackendConfig
@ -26,28 +25,41 @@ from weboob.capabilities.base import NotAvailable
from weboob.tools.browser import BrowserIncorrectPassword, BrokenPageError
from re import match, compile, sub
from httplib import HTTPSConnection
from urllib import urlencode
from decimal import Decimal
from lxml import etree
from datetime import date
from StringIO import StringIO
from ssl import DER_cert_to_PEM_cert
from hashlib import sha256
import os
# import a library that adds certificate verification and proxy support to
# HTTPSConnection
from hellhttp import HellHTTPS
__all__ = ['CmbBackend']
class CmbBackend(BaseBackend, ICapBank):
NAME = 'cmb'
MAINTAINER = u'Johann Broudin'
EMAIL = 'Johann.Broudin@6-8.fr'
VERSION = '0.d'
LICENSE = 'AGPLv3+'
AUTH_CERT = os.path.dirname(__file__)
AUTH_CERT += '/Verisign_Class_3_Public_Primary_Certification_Authority.pem'
CERTHASH = '684d79eb02f59497b5a9c5dcc4c26db1ee637db12f29d703fdf6a80aafef892d'
DESCRIPTION = u'Crédit Mutuel de Bretagne French bank website'
CONFIG = BackendConfig(
ValueBackendPassword('login', label='Account ID', masked=False),
ValueBackendPassword('password', label='Password', masked=True))
ValueBackendPassword('password', label='Password', masked=True),
ValueBackendPassword('no_check', label='SSL Check ? [y,n]', masked=False))
LABEL_PATTERNS = [
( # card
compile('^CARTE (?P<text>.*)'),
@ -86,6 +98,8 @@ class CmbBackend(BaseBackend, ICapBank):
)
]
cookie = None
headers = {
'User-Agent':
@ -93,6 +107,11 @@ class CmbBackend(BaseBackend, ICapBank):
'AppleWebKit/531.21.10 (KHTML, like Gecko) Mobile/7B405'
}
def sslCallBack(self, cert):
pemcert = DER_cert_to_PEM_cert(cert)
certhash = sha256(pemcert).hexdigest()
return certhash == self.CERTHASH
def login(self):
params = urlencode({
'codeEspace': 'NO',
@ -101,13 +120,18 @@ class CmbBackend(BaseBackend, ICapBank):
'noPersonne': self.config['login'].get(),
'motDePasse': self.config['password'].get()
})
conn = HTTPSConnection("www.cmb.fr")
if 'no_check' in self.config and self.config['no_check'].get() == "y":
conn = HellHTTPS("www.cmb.fr")
else:
conn = HellHTTPS("www.cmb.fr", ca_file=self.AUTH_CERT, callBack=self.sslCallBack)
conn.connect()
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
conn.request("POST",
"/domiweb/servlet/Identification",
params,
headers)
response = conn.getresponse()
conn.close()
if response.status == 302:
self.cookie = response.getheader('Set-Cookie').split(';')[0]
self.cookie += ';'
@ -121,7 +145,11 @@ class CmbBackend(BaseBackend, ICapBank):
self.login()
def do_http():
conn = HTTPSConnection("www.cmb.fr")
if 'no_check' in self.config and self.config['no_check'].get() == "y":
conn = HellHTTPS("www.cmb.fr")
else:
conn = HellHTTPS("www.cmb.fr", ca_file=self.AUTH_CERT, callBack=self.sslCallBack)
conn.connect()
headers = self.headers
headers['Cookie'] = self.cookie
conn.request("GET",
@ -157,22 +185,22 @@ class CmbBackend(BaseBackend, ICapBank):
account = Account()
td = tr.xpath('td')
a = td[0].xpath('a')
a = td[1].xpath('a')
account.label = unicode(a[0].text).strip()
href = a[0].get('href')
m = match(r"javascript:releve\((.*),'(.*)','(.*)'\)",
href)
account.id = unicode(m.group(1) + m.group(2) + m.group(3))
account.cmbvaleur = m.group(1)
account.cmbvaleur2 = m.group(2)
account.cmbtype = m.group(3)
account._cmbvaleur = m.group(1)
account._cmbvaleur2 = m.group(2)
account._cmbtype = m.group(3)
balance = td[1].text
balance = td[2].text
balance = balance.replace(',', '.').replace(u"\xa0", '')
account.balance = Decimal(balance)
span = td[3].xpath('a/span')
span = td[4].xpath('a/span')
if len(span):
coming = span[0].text.replace(' ', '').replace(',', '.')
coming = coming.replace(u"\xa0", '')
@ -194,18 +222,22 @@ class CmbBackend(BaseBackend, ICapBank):
self.login()
page = "/domiweb/prive/particulier/releve/"
if account.cmbtype == 'D':
if account._cmbtype == 'D':
page += "10-releve.act"
else:
page += "2-releve.act"
page +="?noPageReleve=1&indiceCompte="
page += account.cmbvaleur
page += account._cmbvaleur
page += "&typeCompte="
page += account.cmbvaleur2
page += account._cmbvaleur2
page += "&deviseOrigineEcran=EUR"
def do_http():
conn = HTTPSConnection("www.cmb.fr")
if 'no_check' in self.config and self.config['no_check'].get() == "y":
conn = HellHTTPS("www.cmb.fr")
else:
conn = HellHTTPS("www.cmb.fr", ca_file=self.AUTH_CERT, callBack=self.sslCallBack)
conn.connect()
headers = self.headers
headers['Cookie'] = self.cookie
conn.request("GET", page, {}, headers)
@ -256,7 +288,8 @@ class CmbBackend(BaseBackend, ICapBank):
mm = pattern.match(operation.raw)
if mm:
operation.type = _type
operation.label = sub('[ ]+', ' ', _label % mm.groupdict()).strip()
operation.label = sub('[ ]+', ' ',
_label % mm.groupdict()).strip()
break
amount = td[3].text

142
modules/cmb/hellhttp.py Normal file
View file

@ -0,0 +1,142 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2012 Johann Broudin
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import httplib
import socket
import ssl
import hashlib
from urlparse import urlsplit
__all__ = ['HTTPSVerifiedConnection', 'HellHTTPS']
PROXY_PORT = 8080
class HTTPSVerifiedConnection(httplib.HTTPSConnection):
"""
This class allows communication via SSL, and will checks certificates
"""
def __init__(self, host, port=None, key_file=None, cert_file=None,
ca_file=None, strict=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
callBack=None):
httplib.HTTPSConnection.__init__(self, host, port, key_file,
cert_file, strict, timeout)
self.ca_file = ca_file
self.callBack = callBack
self.certificate = None
def connect(self):
"""
Connect to a host on a given port and check the certificate
This is almost the same than the conect of HTTPSConnection, but adds
some function for SSL certificate verification
"""
sock = socket.create_connection((self.host, self.port), self.timeout)
if self._tunnel_host:
self.sock = sock
self._tunnel()
if self.ca_file:
self.sock = ssl.wrap_socket(sock,
self.key_file,
self.cert_file,
ca_certs = self.ca_file,
cert_reqs=ssl.CERT_REQUIRED)
else:
self.sock = ssl.wrap_socket(sock,
self.key_file,
self.cert_file,
cert_reqs=ssl.CERT_NONE)
self.certificate = self.sock.getpeercert(True)
if self.callBack:
if not self.callBack(self.certificate):
raise ssl.SSLError(1, "Call back verification failed")
class HellHTTPS:
"This class is the library used by the weboob's CMB module"
def __init__(self, host, port=None, proxy=None, proxy_port=None,
key_file=None, cert_file=None, ca_file=None, strict=None,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT, callBack=None):
self.proxy = proxy
self.proxy_port = proxy_port
if not self.proxy:
import os
if 'http_proxy' in os.environ:
o = urlsplit(os.environ['http_proxy'])
self.proxy = o.hostname
if o.port:
self.proxy_port = o.port
else:
self.proxy_port = PROXY_PORT
self.host = host
self.port = port
if self.proxy:
if self.proxy_port:
pport = self.proxy_port
else:
pport = PROXY_PORT
self.conn = HTTPSVerifiedConnection(proxy, pport, key_file,
cert_file, ca_file, strict, timeout, callBack)
else:
self.conn = HTTPSVerifiedConnection(host, port, key_file, cert_file,
ca_file, strict, timeout, callBack)
def request(self, *args, **kwargs):
self.conn.request(*args, **kwargs)
def connect(self):
# set the proxy
# python 2.6 needs _set_tunnel, 2.7 needs set_tunnel
if self.proxy:
self.conn._set_tunnel(self.host, self.port)
self.conn.connect()
def getresponse(self, *args):
return self.conn.getresponse(*args)
def close(self):
self.conn.close
# A script to find the hash that has to be used in the call back function
if __name__ == '__main__':
import sys
if len(sys.argv) > 5 or len(sys.argv) < 1:
print 'usage: python %s host [port [proxy [proxy_port]]]' % sys.argv[0]
sys.exit(1)
conn = HellHTTPS(*sys.argv[1:])
conn.connect()
conn.request('GET', '/')
response = conn.getresponse()
print response.status, response.reason
pemcert = ssl.DER_cert_to_PEM_cert(conn.conn.certificate)
certhash = hashlib.sha256(pemcert).hexdigest()
print "Hash: %s" % certhash
conn.close()