port every console applications to ReplApplication

This commit is contained in:
Romain Bignon 2010-09-24 12:58:27 -04:00
commit 37a53b783a
17 changed files with 460 additions and 369 deletions

View file

@ -1,6 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2009-2010 Romain Bignon, Christophe Benz
#
@ -39,6 +37,8 @@ class Boobank(ReplApplication):
def do_list(self, line):
"""
list
List every available accounts.
"""
tot_balance = 0.0
@ -61,6 +61,8 @@ class Boobank(ReplApplication):
def do_history(self, id):
"""
history ID
Display old operations.
"""
id, backend_name = self.parse_id(id)
@ -76,6 +78,8 @@ class Boobank(ReplApplication):
def do_coming(self, id):
"""
coming ID
Display all future operations.
"""
id, backend_name = self.parse_id(id)
@ -88,25 +92,22 @@ class Boobank(ReplApplication):
for backend, operation in self.do(do):
self.format(operation)
def do_transfer(self, arg):
def do_transfer(self, line):
"""
transfer FROM TO AMOUNT
Make a transfer beetwen two account
"""
id_from , id_to, amount = arg.split()
id_from, id_to, amount = self.parseline(line, 3, 3)
id_from, backend_name = self.parse_id(id_from)
id_to, backend_name = self.parse_id(id_to)
names = (backend_name,) if backend_name is not None else None
self.load_backends(ICapBank, names=names)
def do(backend):
return backend.transfer(id_from, id_to, amount)
return backend.transfer(id_from, id_to, float(amount))
for backend, operation in self.do(do):
pass

View file

@ -18,7 +18,7 @@
import logging
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
from weboob.capabilities.chat import ICapChat
from weboob.capabilities.contact import ICapContact, Contact
@ -26,36 +26,42 @@ from weboob.capabilities.contact import ICapContact, Contact
__all__ = ['Chatoob']
class Chatoob(ConsoleApplication):
class Chatoob(ReplApplication):
APPNAME = 'chatoob'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Christophe Benz'
def main(self, argv):
def load_default_backends(self):
self.load_backends(ICapChat)
#for backend, result in self.do('start_chat_polling', self.on_new_chat_message):
#logging.info(u'Polling chat messages for backend %s' % backend)
return self.process_command(*argv[1:])
def on_new_chat_message(self, message):
print 'on_new_chat_message: %s' % message
@ConsoleApplication.command('exit program')
def command_exit(self):
self.weboob.want_stop()
def do_list(self, line):
"""
list
@ConsoleApplication.command('list online contacts')
def command_list(self):
List all contacts.
"""
for backend, contact in self.do('iter_contacts', status=Contact.STATUS_ONLINE, caps=ICapContact):
self.format(contact)
@ConsoleApplication.command('get messages')
def command_messages(self):
def do_messages(self, line):
"""
messages
Get messages.
"""
for backend, message in self.do('iter_chat_messages'):
self.format(message)
@ConsoleApplication.command('send message to contact')
def command_send(self, _id, message):
def do_send(self, line):
"""
send CONTACT MESSAGE
Send a message to the specified contact.
"""
_id, message = self.parseline(line, 2, 2)
for backend, result in self.do('send_chat_message', _id, message):
if not result:
logging.error(u'Failed to send message to contact id="%s" on backend "%s"' % (_id, backend.name))

View file

@ -19,23 +19,25 @@
import sys
from weboob.capabilities.geolocip import ICapGeolocIp
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['Geolooc']
class Geolooc(ConsoleApplication):
class Geolooc(ReplApplication):
APPNAME = 'geolooc'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
def load_default_backends(self):
self.load_backends(ICapGeolocIp)
def main(self, argv):
if len(argv) < 2:
print >>sys.stderr, 'Syntax: %s ipaddr' % argv[0]
return 1
self.load_backends(ICapGeolocIp)
for backend, location in self.do('get_location', argv[1]):
self.format(location)

View file

@ -22,23 +22,25 @@ import logging
import sys
import weboob
from weboob.tools.application.prompt import PromptApplication
from weboob.tools.application.repl import ReplApplication
from weboob.capabilities.dating import ICapDating, OptimizationNotFound
__all__ = ['HaveSex']
class HaveSex(PromptApplication):
class HaveSex(ReplApplication):
APPNAME = 'havesex'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
STORAGE_FILENAME = 'dating.storage'
CONFIG = {'optimizations': ''}
def load_default_backends(self):
self.load_backends(ICapDating, storage=self.create_storage(self.STORAGE_FILENAME))
def main(self, argv):
self.load_config()
self.load_backends(ICapDating, storage=self.create_storage(self.STORAGE_FILENAME))
self.do('init_optimizations').wait()
@ -48,15 +50,14 @@ class HaveSex(PromptApplication):
if optimizations_list:
self.optims('Starting', 'start_optimization', optimizations_list)
return self.loop()
return ReplApplication.main(self, argv)
@PromptApplication.command("exit program")
def command_exit(self):
print 'Returning in real-life...'
self.weboob.want_stop()
def do_profile(self, id):
"""
profile ID
@PromptApplication.command("show a profile")
def command_profile(self, id):
Display a profile
"""
_id, backend_name = self.parse_id(id)
def print_node(node, level=1):
@ -116,10 +117,18 @@ class HaveSex(PromptApplication):
if isinstance(error, OptimizationNotFound):
logging.error(u'Optimization "%s" not found' % optim)
@PromptApplication.command("start optimizations")
def command_start(self, *optims):
def do_start(self, *optims):
"""
start OPTIMIZATION [OPTIMIZATION [...]]
Start optimization services.
"""
self.optims('Starting', 'start_optimization', optims)
@PromptApplication.command("stop optimizations")
def command_stop(self, *optims):
"""
stop OPTIMIZATION [OPTIMIZATION [...]]
Stop optimization services.
"""
self.optims('Stopping', 'stop_optimization', optims)

View file

@ -31,7 +31,7 @@ import asyncore
from weboob.core.ouiboube import Weboob
from weboob.core.scheduler import Scheduler
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Thread, Message
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
from weboob.tools.misc import html2text, get_backtrace, utc2local
@ -75,7 +75,7 @@ class MonboobScheduler(Scheduler):
return True
class Monboob(ConsoleApplication):
class Monboob(ReplApplication):
APPNAME = 'monboob'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
@ -91,11 +91,13 @@ class Monboob(ConsoleApplication):
def create_weboob(self):
return Weboob(scheduler=MonboobScheduler(self))
def main(self, argv):
self.load_config()
def load_default_backends(self):
self.load_backends(ICapMessages, storage=self.create_storage())
return self.process_command(*argv[1:])
def main(self, argv):
self.load_config()
return self.onecmd(' '.join(argv[1:]))
def get_email_address_ident(self, msg, header):
s = msg.get(header)
@ -108,8 +110,12 @@ class Monboob(ConsoleApplication):
except IndexError:
return s
@ConsoleApplication.command("pipe with a mail to post message")
def command_post(self):
def do_post(self):
"""
post
Pipe with a mail to post message.
"""
msg = message_from_file(sys.stdin)
return self.process_incoming_mail(msg)
@ -185,8 +191,12 @@ class Monboob(ConsoleApplication):
parent=Message(thread, msg_id),
content=content))
@ConsoleApplication.command("run daemon")
def command_run(self):
def do_run(self):
"""
run
Run the fetching daemon.
"""
self.weboob.repeat(int(self.config.get('interval')), self.process)
self.weboob.loop()

View file

@ -19,28 +19,37 @@
import logging
from weboob.capabilities.travel import ICapTravel
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['Traveloob']
class Traveloob(ConsoleApplication):
class Traveloob(ReplApplication):
APPNAME = 'traveloob'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
def main(self, argv):
return self.process_command(*argv[1:])
@ConsoleApplication.command('Search stations')
def command_stations(self, pattern):
def load_default_backends(self):
self.load_backends(ICapTravel)
def do_stations(self, pattern):
"""
search PATTERN
Search stations.
"""
for backend, station in self.do('iter_station_search', pattern):
self.format(station)
@ConsoleApplication.command('List all departures for a given station')
def command_departures(self, station, arrival=None):
def do_departures(self, line):
"""
departures STATION [ARRIVAL]
List all departures for a given station.
"""
station, arrival = self.parseline(line, 2, 1)
station_id, backend_name = self.parse_id(station)
if arrival:
arrival_id, backend_name2 = self.parse_id(arrival)
@ -57,6 +66,5 @@ class Traveloob(ConsoleApplication):
else:
backends = None
self.load_backends(ICapTravel, names=backends)
for backend, departure in self.do('iter_station_departures', station_id, arrival_id):
for backend, departure in self.do('iter_station_departures', station_id, arrival_id, backends=backends):
self.format(departure)

View file

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 Christophe Benz, Romain Bignon
#
# This program is free software; you can redistribute it and/or modify
@ -55,7 +54,7 @@ class Videoob(ReplApplication):
for backend, video in self.do('get_video', _id, backends=backend_names):
self.format(video)
self.flush()
def do_nsfw(self, line):
"""
nsfw [on | off]

View file

@ -22,21 +22,18 @@ import os
import subprocess
import re
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
from weboob.tools.ordereddict import OrderedDict
__all__ = ['WeboobCfg']
class WeboobCfg(ConsoleApplication):
class WeboobCfg(ReplApplication):
APPNAME = 'weboob-config'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Christophe Benz, Romain Bignon'
def main(self, argv):
return self.process_command(*argv[1:])
def caps_included(self, modcaps, caps):
modcaps = [x.__name__ for x in modcaps]
for cap in caps:
@ -44,8 +41,18 @@ class WeboobCfg(ConsoleApplication):
return False
return True
@ConsoleApplication.command('Add a configured backend')
def command_add(self, name, *options):
def do_add(self, line):
"""
add NAME [OPTIONS ...]
Add a configured backend.
"""
name, options = self.parseline(line, 2, 1)
if options:
options = options.split(' ')
else:
options = ()
self.weboob.modules_loader.load_all()
if name not in [_name for _name, backend in self.weboob.modules_loader.loaded.iteritems()]:
logging.error(u'Backend "%s" does not exist.' % name)
@ -101,8 +108,12 @@ class WeboobCfg(ConsoleApplication):
except ConfigParser.DuplicateSectionError:
print 'Instance "%s" already exists for backend "%s".' % (new_name, name)
@ConsoleApplication.command('Show configured backends')
def command_listconfigured(self):
def do_listconfigured(self):
"""
list
Show configured backends.
"""
self.set_default_formatter('table')
for instance_name, name, params in sorted(self.weboob.backends_config.iter_backends()):
backend = self.weboob.modules_loader.get_or_load_module(name)
@ -112,20 +123,33 @@ class WeboobCfg(ConsoleApplication):
])
self.format(row)
@ConsoleApplication.command('Remove a configured backend')
def command_remove(self, instance_name):
def do_remove(self, instance_name):
"""
remove NAME
Remove a configured backend.
"""
try:
self.weboob.backends_config.remove_backend(instance_name)
except ConfigParser.NoSectionError:
logging.error('Backend instance "%s" does not exist' % instance_name)
return 1
@ConsoleApplication.command('Edit configuration file')
def command_edit(self):
def do_edit(self):
"""
edit
Edit configuration file.
"""
subprocess.call([os.environ.get('EDITOR', 'vi'), self.weboob.backends_config.confpath])
@ConsoleApplication.command('Show available backends')
def command_backends(self, *caps):
def do_backends(self, line):
"""
backends [CAPS ...]
Show available backends
"""
caps = line.split(' ')
self.set_default_formatter('table')
self.weboob.modules_loader.load_all()
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
@ -137,8 +161,12 @@ class WeboobCfg(ConsoleApplication):
])
self.format(row)
@ConsoleApplication.command('Display information about a backend')
def command_info(self, name):
def do_info(self, name):
"""
info NAME
Display information about a backend.
"""
try:
backend = self.weboob.modules_loader.get_or_load_module(name)
except KeyError:
@ -166,8 +194,12 @@ class WeboobCfg(ConsoleApplication):
print '| | %s: %s' % (key, value)
print "'-----------------'"
@ConsoleApplication.command('Show applications')
def command_applications(self, *caps):
def do_applications(self, line):
"""
applications
Show applications.
"""
applications = set()
import weboob.applications
for path in weboob.applications.__path__:

View file

@ -18,17 +18,20 @@
import sys
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['WeboobCli']
class WeboobCli(ConsoleApplication):
class WeboobCli(ReplApplication):
APPNAME = 'weboob-cli'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
def load_default_backends(self):
pass
def main(self, argv):
if len(argv) < 3:
print >>sys.stderr, "Syntax: %s capability command [args ..]" % argv[0]

View file

@ -18,19 +18,23 @@
import logging
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
class WeboobDebug(ConsoleApplication):
class WeboobDebug(ReplApplication):
APPNAME = 'weboobdebug'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Christophe Benz'
def main(self, argv):
return self.process_command(*argv[1:])
def load_default_backends(self):
pass
@ConsoleApplication.command('Debug backend')
def command_shell(self, backend_name):
def do_shell(self, backend_name):
"""
shell BACKEND
Debug a backend.
"""
try:
backend = self.weboob.load_backends(names=[backend_name])[backend_name]
except KeyError:

View file

@ -20,23 +20,26 @@ from __future__ import with_statement
import sys
from weboob.capabilities.torrent import ICapTorrent
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['Weboorrents']
class Weboorrents(ConsoleApplication):
class Weboorrents(ReplApplication):
APPNAME = 'weboorrents'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
def main(self, argv):
def load_default_backends(self):
self.load_backends(ICapTorrent)
return self.process_command(*argv[1:])
@ConsoleApplication.command('Get information about a torrent')
def command_info(self, id):
def do_info(self, id):
"""
info ID
Get information about a torrent.
"""
_id, backend_name = self.parse_id(id)
found = 0
@ -48,8 +51,16 @@ class Weboorrents(ConsoleApplication):
if not found:
print >>sys.stderr, 'Torrent "%s" not found' % id
@ConsoleApplication.command('Get the torrent file')
def command_getfile(self, id, dest):
def do_getfile(self, line):
"""
getfile ID FILENAME
Get the .torrent file.
FILENAME is where to write the file. If FILENAME is '-',
the file is written to stdout.
"""
id, dest = self.parseline(line, 2, 2)
_id, backend_name = self.parse_id(id)
for backend, buf in self.do('get_torrent_file', _id, backends=backend_name):
@ -63,8 +74,14 @@ class Weboorrents(ConsoleApplication):
print >>sys.stderr, 'Torrent "%s" not found' % id
@ConsoleApplication.command('Search torrents')
def command_search(self, pattern=None):
def do_search(self, pattern):
"""
search [PATTERN]
Search torrents.
"""
if not pattern:
pattern = None
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest torrents')
for backend, torrent in self.do('iter_torrents', pattern=pattern):
self.format(torrent)

View file

@ -16,51 +16,61 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import logging
import sys
from weboob.core import CallErrors
from weboob.capabilities.weather import ICapWeather, CityNotFound
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['WetBoobs']
class WetBoobs(ConsoleApplication):
class WetBoobs(ReplApplication):
APPNAME = 'wetboobs'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
def main(self, argv):
def load_default_backends(self):
self.load_backends(ICapWeather)
return self.process_command(*argv[1:])
def do_search(self, pattern):
"""
search PATTERN
@ConsoleApplication.command('search cities')
def command_search(self, pattern):
Search cities.
"""
for backend, city in self.do('iter_city_search', pattern):
self.format(city)
@ConsoleApplication.command('get current weather')
def command_current(self, city):
def do_current(self, city):
"""
current CITY
Get current weather.
"""
try:
for backend, current in self.do('get_current', city):
self.format(current)
except CallErrors, e:
for error in e:
if isinstance(error, CityNotFound):
logging.error('City "%s" not found' % city)
print >>sys.stderr, 'City "%s" not found' % city
else:
raise error
@ConsoleApplication.command('get forecasts')
def command_forecasts(self, city):
def do_forecasts(self, city):
"""
forecasts CITY
Get forecasts.
"""
try:
for backend, forecast in self.do('iter_forecast', city):
self.format(forecast)
except CallErrors, e:
for error in e:
if isinstance(error, CityNotFound):
logging.error('City "%s" not found' % city)
print >>sys.stderr, 'City "%s" not found' % city
else:
raise error

View file

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
#
# backend.py
#
# Copyright 2010 nicolas <nicolas@jombi.fr>
#
# backend.py
#
# Copyright 2010 nicolas <nicolas@jombi.fr>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
@ -23,44 +23,38 @@ from weboob.tools.backend import BaseBackend
from .browser import BPbrowser
class BPBackend(BaseBackend, ICapBank):
NAME = 'bp'
MAINTAINER = 'Nicolas Duhamel'
EMAIL = 'nicolas@jombi.fr'
VERSION = '0.1'
LICENSE = 'GPLv3'
DESCRIPTION = 'La banque postale, banque française'
CONFIG = {'login': BaseBackend.ConfigField(description='Account ID'),
'password': BaseBackend.ConfigField(description='Password of account', is_masked=True)
}
BROWSER = BPbrowser
def create_default_browser(self):
return self.create_browser(self.config['login'], self.config['password'])
NAME = 'bp'
MAINTAINER = 'Nicolas Duhamel'
EMAIL = 'nicolas@jombi.fr'
VERSION = '0.1'
LICENSE = 'GPLv3'
DESCRIPTION = 'La banque postale, banque française'
CONFIG = {'login': BaseBackend.ConfigField(description='Account ID'),
'password': BaseBackend.ConfigField(description='Password of account', is_masked=True)
}
BROWSER = BPbrowser
def iter_accounts(self):
for account in self.browser.get_accounts_list():
yield account
def get_account(self, _id):
account = self.browser.get_account(_id)
if account:
return account
else:
raise AccountNotFound()
def iter_history(self, account):
for history in self.browser.get_history(account):
yield history
def transfer(self, id_from, id_to, amount):
from_account = self.get_account(id_from)
to_account = self.get_account(id_to)
#TODO: retourner le numero du virement
self.browser.make_transfer(from_account, to_account, amount)
def create_default_browser(self):
return self.create_browser(self.config['login'], self.config['password'])
def iter_accounts(self):
for account in self.browser.get_accounts_list():
yield account
def get_account(self, _id):
account = self.browser.get_account(_id)
if account:
return account
else:
raise AccountNotFound()
def iter_history(self, account):
for history in self.browser.get_history(account):
yield history
def transfer(self, id_from, id_to, amount):
from_account = self.get_account(id_from)
to_account = self.get_account(id_to)
#TODO: retourner le numero du virement
self.browser.make_transfer(from_account, to_account, amount)

View file

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
#
# browser.py
#
#
# Copyright 2010 nicolas <nicolas@NicolasDesktop>
#
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
@ -22,7 +22,6 @@ import mechanize
import hashlib
import re
from weboob.tools.browser import BrowserIncorrectPassword
from weboob.tools.parsers import get_parser
from weboob.capabilities.bank import Account
from weboob.capabilities.bank import Operation
@ -30,187 +29,172 @@ from weboob.capabilities.bank import Operation
def remove_html_tags(data):
p = re.compile(r'<.*?>')
return p.sub(' ', data)
def remove_extra_spaces(data):
p = re.compile(r'\s+')
return p.sub(' ', data)
LOCAL_HASH = ['a02574d7bf67677d2a86b7bfc5e864fe', 'eb85e1cc45dd6bdb3cab65c002d7ac8a',
'596e6fbd54d5b111fe5df8a4948e80a4', '9cdc989a4310554e7f5484d0d27a86ce', '0183943de6c0e331f3b9fc49c704ac6d',
'291b9987225193ab1347301b241e2187', '163279f1a46082408613d12394e4042a', 'b0a9c740c4cada01eb691b4acda4daea',
LOCAL_HASH = ['a02574d7bf67677d2a86b7bfc5e864fe', 'eb85e1cc45dd6bdb3cab65c002d7ac8a',
'596e6fbd54d5b111fe5df8a4948e80a4', '9cdc989a4310554e7f5484d0d27a86ce', '0183943de6c0e331f3b9fc49c704ac6d',
'291b9987225193ab1347301b241e2187', '163279f1a46082408613d12394e4042a', 'b0a9c740c4cada01eb691b4acda4daea',
'3c4307ee92a1f3b571a3c542eafcb330', 'c5b854ae314b61ba42948909e0b2eae7']
ENCODING = 'utf-8'
class BPbrowser(object):
def __init__(self, login, pwd):
self.is_logged = False
self.login_id = login
self.pwd = pwd
self.parser = get_parser()()
self.Browser = mechanize.Browser()
self.Browser.set_handle_robots(False)
self.Account_List = []
def login(self):
def md5(file):
f = open(file,'rb')
md5 = hashlib.md5()
md5.update(f.read())
return md5.hexdigest()
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/wsost/OstBrokerWeb/loginform?TAM_OP=login&ERROR_CODE=0x00000000&URL=%2Fvoscomptes%2FcanalXHTML%2Fidentif.ea%3Forigin%3Dparticuliers")
process = lambda i: md5(self.Browser.retrieve(("https://voscomptesenligne.labanquepostale.fr/wsost/OstBrokerWeb/loginform?imgid=%d&0.25122230781963073" % i ))[0])
Keypad = [ process(i) for i in range(10)]
correspondance = [ Keypad.index(i) for i in LOCAL_HASH]
Newpassword = "".join([str(correspondance[int(c)]) for c in self.pwd])
self.Browser.select_form(name="formAccesCompte")
self.Browser.find_control("password").readonly = False
self.Browser["password"] = Newpassword
self.Browser["username"] = self.login_id
self.Browser.submit()
self.is_logged = True
def get_accounts_list(self):
if self.Account_List:
return self.Account_List
if not self.is_logged:
self.login()
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/authentification/liste_contrat_atos.ea")
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/releve/liste_comptes.jsp")
document = self.parser.parse(self.Browser.response(), ENCODING)
#Parse CCP
compte_table = document.xpath("//table[@id='comptes']", smart_strings=False)[0]
compte_ligne = compte_table.xpath("./tbody/tr")
for compte in compte_ligne:
account = Account()
tp = compte.xpath("./td/a")[0]
account.label = tp.text
account.link_id = tp.get("href")
account.id = compte.xpath("./td")[1].text
account.balance = ''.join( compte.xpath("./td/span")[0].text.replace('.','').replace(',','.').split() )
self.Account_List.append(account)
#Parse epargne
epargne_table = document.xpath("//table[@id='comptesEpargne']", smart_strings=False)[0]
epargne_ligne = epargne_table.xpath("./tbody/tr")
for epargne in epargne_ligne:
account = Account()
tp = epargne.xpath("./td/a")[0]
account.label = tp.text
account.link_id = tp.get("href")
account.id = epargne.xpath("./td")[1].text
account.balance = ''.join( epargne.xpath("./td/span")[0].text.replace('.','').replace(',','.').split() )
self.Account_List.append(account)
return self.Account_List
def get_account(self, id):
if self.Account_List:
for account in self.Account_List:
if account.id == id:
return account
return None
self.get_accounts_list()
for account in self.Account_List:
if account.id == id:
return account
return None
def get_history(self, account):
self.Browser.open(account.link_id)
rep = self.Browser.follow_link(url_regex="releve", tag="a")
document = self.parser.parse(rep, ENCODING)
mvt_table = document.xpath("//table[@id='mouvements']", smart_strings=False)[0]
mvt_ligne = mvt_table.xpath("./tbody/tr")
operations = []
for mvt in mvt_ligne:
operation = Operation(len(operations))
operation.date = mvt.xpath("./td")[0].text
tp = mvt.xpath("./td")[1]
operation.label = remove_extra_spaces(remove_html_tags(self.parser.tostring(tp)))
r = re.compile(r'\d+')
tp = mvt.xpath("./td/span")
amount = None
for t in tp:
if r.search(t.text):
amount = t.text
amount = ''.join( amount.replace('.', '').replace(',', '.').split() )
if amount[0] == "-":
operation.amount = -float(amount[1:])
else:
operation.amount = float(amount)
operations.append(operation)
return operations
def make_transfer(self, from_account, to_account, amount):
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/f_virementSafran.jsp?n=11")
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/virementsafran/aiguillage/saisieComptes.ea")
self.Browser.select_form(name="AiguillageForm")
self.Browser["idxCompteEmetteur"] = [from_account.id]
self.Browser["idxCompteReceveur"] = [to_account.id]
self.Browser.submit()
self.Browser.select_form(name="VirementNationalForm")
self.Browser["montant"] = str(amount)
self.Browser.submit()
#Confirmation
# TODO: verifier que tout c'est bien passe
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/virementsafran/virementnational/4-virementNational.ea")
def __init__(self, login, pwd):
self.is_logged = False
self.login_id = login
self.pwd = pwd
self.parser = get_parser()()
self.Browser = mechanize.Browser()
self.Browser.set_handle_robots(False)
self.Account_List = []
def login(self):
def md5(file):
f = open(file,'rb')
md5 = hashlib.md5()
md5.update(f.read())
return md5.hexdigest()
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/wsost/OstBrokerWeb/loginform?TAM_OP=login&ERROR_CODE=0x00000000&URL=%2Fvoscomptes%2FcanalXHTML%2Fidentif.ea%3Forigin%3Dparticuliers")
process = lambda i: md5(self.Browser.retrieve(("https://voscomptesenligne.labanquepostale.fr/wsost/OstBrokerWeb/loginform?imgid=%d&0.25122230781963073" % i ))[0])
Keypad = [ process(i) for i in range(10)]
correspondance = [ Keypad.index(i) for i in LOCAL_HASH]
Newpassword = "".join([str(correspondance[int(c)]) for c in self.pwd])
self.Browser.select_form(name="formAccesCompte")
self.Browser.find_control("password").readonly = False
self.Browser["password"] = Newpassword
self.Browser["username"] = self.login_id
self.Browser.submit()
self.is_logged = True
def get_accounts_list(self):
if self.Account_List:
return self.Account_List
if not self.is_logged:
self.login()
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/authentification/liste_contrat_atos.ea")
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/releve/liste_comptes.jsp")
document = self.parser.parse(self.Browser.response(), ENCODING)
#Parse CCP
compte_table = document.xpath("//table[@id='comptes']", smart_strings=False)[0]
compte_ligne = compte_table.xpath("./tbody/tr")
for compte in compte_ligne:
account = Account()
tp = compte.xpath("./td/a")[0]
account.label = tp.text
account.link_id = tp.get("href")
account.id = compte.xpath("./td")[1].text
account.balance = ''.join( compte.xpath("./td/span")[0].text.replace('.','').replace(',','.').split() )
self.Account_List.append(account)
#Parse epargne
epargne_table = document.xpath("//table[@id='comptesEpargne']", smart_strings=False)[0]
epargne_ligne = epargne_table.xpath("./tbody/tr")
for epargne in epargne_ligne:
account = Account()
tp = epargne.xpath("./td/a")[0]
account.label = tp.text
account.link_id = tp.get("href")
account.id = epargne.xpath("./td")[1].text
account.balance = ''.join( epargne.xpath("./td/span")[0].text.replace('.','').replace(',','.').split() )
self.Account_List.append(account)
return self.Account_List
def get_account(self, id):
if self.Account_List:
for account in self.Account_List:
if account.id == id:
return account
return None
self.get_accounts_list()
for account in self.Account_List:
if account.id == id:
return account
return None
def get_history(self, account):
self.Browser.open(account.link_id)
rep = self.Browser.follow_link(url_regex="releve", tag="a")
document = self.parser.parse(rep, ENCODING)
mvt_table = document.xpath("//table[@id='mouvements']", smart_strings=False)[0]
mvt_ligne = mvt_table.xpath("./tbody/tr")
operations = []
for mvt in mvt_ligne:
operation = Operation(len(operations))
operation.date = mvt.xpath("./td")[0].text
tp = mvt.xpath("./td")[1]
operation.label = remove_extra_spaces(remove_html_tags(self.parser.tostring(tp)))
r = re.compile(r'\d+')
tp = mvt.xpath("./td/span")
amount = None
for t in tp:
if r.search(t.text):
amount = t.text
amount = ''.join( amount.replace('.', '').replace(',', '.').split() )
if amount[0] == "-":
operation.amount = -float(amount[1:])
else:
operation.amount = float(amount)
operations.append(operation)
return operations
def make_transfer(self, from_account, to_account, amount):
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/f_virementSafran.jsp?n=11")
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/virementsafran/aiguillage/saisieComptes.ea")
self.Browser.select_form(name="AiguillageForm")
self.Browser["idxCompteEmetteur"] = [from_account.id]
self.Browser["idxCompteReceveur"] = [to_account.id]
self.Browser.submit()
self.Browser.select_form(name="VirementNationalForm")
self.Browser["montant"] = str(amount)
self.Browser.submit()
#Confirmation
# TODO: verifier que tout c'est bien passe
self.Browser.open("https://voscomptesenligne.labanquepostale.fr/voscomptes/canalXHTML/virementsafran/virementnational/4-virementNational.ea")

View file

@ -76,7 +76,7 @@ class BaseApplication(object):
# Default storage tree
STORAGE = {}
# Synopsis
SYNOPSIS = 'Usage: %prog [-h] [-dqv] [-b backends] ...'
SYNOPSIS = 'Usage: %prog [-h] [-dqv] [-b backends] ...'
SYNOPSIS += ' %prog [--help] [--version]'
# Description
DESCRIPTION = None
@ -128,6 +128,10 @@ class BaseApplication(object):
self._parser = option_parser
if self.DESCRIPTION:
self._parser.description = self.DESCRIPTION
app_options = OptionGroup(option_parser, '%s Options' % self.APPNAME.capitalize())
self.add_application_options(app_options)
if len(app_options.option_list) > 0:
self._parser.add_option_group(app_options)
self._parser.add_option('-b', '--backends', help='what backend(s) to enable (comma separated)')
logging_options = OptionGroup(self._parser, 'Logging Options')
logging_options.add_option('-d', '--debug', action='store_true', help='display debug messages')

View file

@ -50,9 +50,6 @@ class ConsoleApplication(BaseApplication):
def __init__(self):
self.enabled_backends = set()
option_parser = OptionParser(self.SYNOPSIS, version=self._get_optparse_version())
app_options = OptionGroup(option_parser, '%s Options' % self.APPNAME.capitalize())
self.add_application_options(app_options)
option_parser.add_option_group(app_options)
try:
BaseApplication.__init__(self, option_parser=option_parser)
@ -85,10 +82,6 @@ class ConsoleApplication(BaseApplication):
formatting_options.add_option('--no-keys', dest='no_keys', action='store_true', help='do not display item keys')
self._parser.add_option_group(formatting_options)
def add_application_options(self, group):
# XXX why is it in ConsoleApplication and not BaseApplication? -romain
pass
def _handle_options(self):
if self.options.formatter:
formatter_name = self.options.formatter

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 Christophe Benz
# Copyright(C) 2010 Christophe Benz, Romain Bignon
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -19,16 +19,12 @@
from __future__ import with_statement
import atexit
import cmd
from cStringIO import StringIO
from functools import partial
from cmd import Cmd
import getpass
from inspect import getargspec
import logging
from optparse import OptionGroup, OptionParser
import os
import re
import subprocess
import sys
from weboob.capabilities.base import FieldNotFound
@ -41,10 +37,13 @@ from .formatters.load import formatters as available_formatters, load_formatter
from .results import ResultsCondition, ResultsConditionException
__all__ = ['ReplApplication']
__all__ = ['ReplApplication', 'NotEnoughArguments']
class ReplApplication(cmd.Cmd, BaseApplication):
class NotEnoughArguments(Exception):
pass
class ReplApplication(Cmd, BaseApplication):
"""
Base application class for CLI applications.
"""
@ -53,7 +52,7 @@ class ReplApplication(cmd.Cmd, BaseApplication):
SYNOPSIS += ' %prog [--help] [--version]'
def __init__(self):
cmd.Cmd.__init__(self)
Cmd.__init__(self)
self.prompt = '%s> ' % self.APPNAME
self.intro = '\n'.join(('Welcome to %s v%s' % (self.APPNAME, self.VERSION),
'',
@ -69,9 +68,6 @@ class ReplApplication(cmd.Cmd, BaseApplication):
self.hidden_commands = set(['EOF'])
option_parser = OptionParser(self.SYNOPSIS, version=self._get_optparse_version())
app_options = OptionGroup(option_parser, '%s Options' % self.APPNAME.capitalize())
self.add_application_options(app_options)
option_parser.add_option_group(app_options)
try:
BaseApplication.__init__(self, option_parser=option_parser)
@ -85,7 +81,7 @@ class ReplApplication(cmd.Cmd, BaseApplication):
self._parser.description = ''
app_cmds, weboob_cmds, undoc_cmds = self.get_commands_doc()
help_str = '%s Commands:\n%s\n\n' % (self.APPNAME, '\n'.join(' %s' % cmd for cmd in sorted(app_cmds + undoc_cmds)))
help_str = '%s Commands:\n%s\n\n' % (self.APPNAME.capitalize(), '\n'.join(' %s' % cmd for cmd in sorted(app_cmds + undoc_cmds)))
help_str +='Weboob Commands:\n%s\n' % '\n'.join(' %s' % cmd for cmd in weboob_cmds)
self._parser.description += help_str
@ -140,6 +136,29 @@ class ReplApplication(cmd.Cmd, BaseApplication):
except BackendNotFound, e:
logging.error(e)
def parseargs(self, line, nb, req_n=None):
args = line.strip().split(' ', nb - 1)
if req_n is not None and len(args) < req_n:
raise NotEnoughArguments('Command needs %d arguments' % req_n)
if len(args) < nb:
args += tuple([None for i in xrange(nb - len(args))])
return args
def onecmd(self, _cmd):
"""
This REPL method is overrided to catch some particular exceptions.
"""
try:
return super(ReplApplication, self).onecmd(_cmd)
except CallErrors, e:
if len(e.errors) == 1 and isinstance(e.errors[0][1], FieldNotFound):
print >>sys.stderr, e.errors[0][1]
else:
raise
except NotEnoughArguments, e:
print >>sys.stderr, 'Error: no enough arguments.'
def main(self, argv):
cmd_args = argv[1:]
if cmd_args:
@ -164,15 +183,11 @@ class ReplApplication(cmd.Cmd, BaseApplication):
if fields == '$direct':
fields = None
elif fields == '$full':
fields = [k for k, v in iter_fields(obj)]
try:
for values in self.weboob.do(self._complete, self.options.count, fields, function, *args, **kwargs):
yield values
except CallErrors, e:
if len(e.errors) == 1 and isinstance(e.errors[0][1], FieldNotFound):
logging.error(e.errors[0][1])
else:
raise
# XXX IT ABSOLUTLY DOESN'T WORK, OBJ ISN'T EXISTANT.
# PLEASE REVIEW THIS CODE.
#fields = [k for k, v in iter_fields(obj)]
fields = None
return self.weboob.do(self._complete, self.options.count, fields, function, *args, **kwargs)
# options related methods
@ -284,7 +299,7 @@ class ReplApplication(cmd.Cmd, BaseApplication):
print 'Unknown command: "%s"' % line
def completenames(self, text, *ignored):
return ['%s ' % name for name in cmd.Cmd.completenames(self, text, *ignored) if name not in self.hidden_commands]
return ['%s ' % name for name in Cmd.completenames(self, text, *ignored) if name not in self.hidden_commands]
def completion_helper(self, text, choices):
if text: