split ReplApplication to create ConsoleApplication

This commit is contained in:
Romain Bignon 2011-03-05 21:55:20 +01:00
commit 2ba3f46f3b
3 changed files with 622 additions and 569 deletions

View file

@ -25,6 +25,7 @@ import tempfile
from weboob.capabilities.base import NotAvailable, NotLoaded
from weboob.core import Weboob, CallErrors
from weboob.core.backendscfg import BackendsConfig
from weboob.tools.config.iconfig import ConfigError
from weboob.tools.backend import ObjectNotAvailable
from weboob.tools.log import createColoredFormatter, getLogger
@ -352,7 +353,12 @@ class BaseApplication(object):
if args is None:
args = [(sys.stdin.encoding and arg.decode(sys.stdin.encoding) or arg) for arg in sys.argv]
app = klass()
try:
app = klass()
except BackendsConfig.WrongPermissions, e:
print e
sys.exit(1)
try:
try:

View file

@ -0,0 +1,430 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from copy import deepcopy
import getpass
import logging
import sys
from weboob.capabilities.account import ICapAccount, Account, AccountRegisterError
from weboob.core.backendscfg import BackendAlreadyExists
from weboob.core.modules import ModuleLoadError
from weboob.tools.browser import BrowserUnavailable, BrowserIncorrectPassword
from weboob.tools.value import Value, ValueBool, ValueFloat, ValueInt
from .base import BackendNotFound, BaseApplication
class BackendNotGiven(Exception):
pass
class ConsoleApplication(BaseApplication):
"""
Base application class for CLI applications.
"""
CAPS = None
# shell escape strings
BOLD = ''
NC = '' # no color
stdin = sys.stdin
stdout = sys.stdout
def __init__(self, option_parser=None):
BaseApplication.__init__(self, option_parser)
self.enabled_backends = set()
def unload_backends(self, *args, **kwargs):
unloaded = self.weboob.unload_backends(*args, **kwargs)
for backend in unloaded.itervalues():
try:
self.enabled_backends.remove(backend)
except KeyError:
pass
return unloaded
def is_backend_loadable(self, backend):
return self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__)
def load_backends(self, *args, **kwargs):
if 'errors' in kwargs:
errors = kwargs['errors']
else:
kwargs['errors'] = errors = []
ret = super(ConsoleApplication, self).load_backends(*args, **kwargs)
for err in errors:
print >>sys.stderr, 'Error(%s): %s' % (err.backend_name, err)
if self.ask('Do you want to reconfigure this backend?', default=True):
self.edit_backend(err.backend_name)
self.load_backends(names=[err.backend_name])
for name, backend in ret.iteritems():
self.enabled_backends.add(backend)
while len(self.enabled_backends) == 0:
print 'Warning: there is currently no configured backend for %s' % self.APPNAME
if not self.ask('Do you want to configure backends?', default=True):
break
self.weboob.modules_loader.load_all()
r = ''
while r != 'q':
backends = []
print '\nAvailable backends:'
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
if not self.is_backend_loadable(backend):
continue
backends.append(name)
loaded = ' '
for bi in self.weboob.iter_backends():
if bi.NAME == name:
if loaded == ' ':
loaded = 'X'
elif loaded == 'X':
loaded = 2
else:
loaded += 1
print '%s%d)%s [%s] %s%-15s%s %s' % (self.BOLD, len(backends), self.NC, loaded,
self.BOLD, name, self.NC, backend.description)
print '%sq)%s --stop--\n' % (self.BOLD, self.NC)
r = self.ask('Select a backend to add (q to stop)', regexp='^(\d+|q)$')
if r.isdigit():
i = int(r) - 1
if i < 0 or i >= len(backends):
print 'Error: %s is not a valid choice' % r
continue
name = backends[i]
try:
inst = self.add_backend(name)
if inst:
self.load_backends(names=[inst])
except (KeyboardInterrupt, EOFError):
print '\nAborted.'
print 'Right right!'
return ret
def _handle_options(self):
self.load_default_backends()
def load_default_backends(self):
"""
By default loads all backends.
Applications can overload this method to restrict backends loaded.
"""
self.load_backends(self.CAPS)
@classmethod
def run(klass, args=None):
try:
super(ConsoleApplication, klass).run(args)
except BackendNotFound, e:
logging.error(e)
def do(self, function, *args, **kwargs):
if not 'backends' in kwargs:
kwargs['backends'] = self.enabled_backends
return self.weboob.do(function, *args, **kwargs)
def parse_id(self, _id, unique_backend=False):
try:
_id, backend_name = _id.rsplit('@', 1)
except ValueError:
backend_name = None
if unique_backend and not backend_name:
backends = []
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__):
continue
backends.append((name, backend))
if self.interactive:
while not backend_name:
print 'This command works with a unique backend. Availables:'
for index, (name, backend) in enumerate(backends):
print '%s%d)%s %s%-15s%s %s' % (self.BOLD, index + 1, self.NC, self.BOLD, name, self.NC,
backend.description)
response = self.ask('Select a backend to proceed', regexp='^\d+$')
if response.isdigit():
i = int(response) - 1
if i < 0 or i >= len(backends):
print 'Error: %s is not a valid choice' % response
continue
backend_name = backends[i][0]
else:
raise BackendNotGiven('Please specify a backend to use for this argument (%s@backend_name). '
'Availables: %s.' % (_id, ', '.join(name for name, backend in backends)))
return _id, backend_name
def caps_included(self, modcaps, caps):
modcaps = [x.__name__ for x in modcaps]
if not isinstance(caps, (list,set,tuple)):
caps = (caps,)
for cap in caps:
if not cap in modcaps:
return False
return True
# user interaction related methods
def register_backend(self, name, ask_add=True):
try:
backend = self.weboob.modules_loader.get_or_load_module(name)
except ModuleLoadError, e:
backend = None
if not backend:
print 'Backend "%s" does not exist.' % name
return None
if not backend.has_caps(ICapAccount) or backend.klass.ACCOUNT_REGISTER_PROPERTIES is None:
print 'You can\'t register a new account with %s' % name
return None
account = Account()
account.properties = {}
if backend.website:
website = 'on website %s' % backend.website
else:
website = 'with backend %s' % backend.name
while 1:
asked_config = False
for key, prop in backend.klass.ACCOUNT_REGISTER_PROPERTIES.iteritems():
if not asked_config:
asked_config = True
print 'Configuration of new account %s' % website
print '-----------------------------%s' % ('-' * len(website))
p = deepcopy(prop)
p.set_value(self.ask(prop, default=account.properties[key].value if (key in account.properties) else prop.default))
account.properties[key] = p
if asked_config:
print '-----------------------------%s' % ('-' * len(website))
try:
backend.klass.register_account(account)
except AccountRegisterError, e:
print u'%s' % e
if self.ask('Do you want to try again?', default=True):
continue
else:
return None
else:
break
backend_config = {}
for key, value in account.properties.iteritems():
if key in backend.config:
backend_config[key] = value.value
if ask_add and self.ask('Do you want to add the new register account?', default=True):
return self.add_backend(name, backend_config, ask_register=False)
return backend_config
def edit_backend(self, name, params=None):
return self.add_backend(name, params, True)
def add_backend(self, name, params=None, edit=False, ask_register=True):
if params is None:
params = {}
if not edit:
try:
backend = self.weboob.modules_loader.get_or_load_module(name)
except ModuleLoadError:
backend = None
else:
bname, items = self.weboob.backends_config.get_backend(name)
try:
backend = self.weboob.modules_loader.get_or_load_module(bname)
except ModuleLoadError:
backend = None
items.update(params)
params = items
if not backend:
print 'Backend "%s" does not exist. Hint: use the "backends" command.' % name
return None
# ask for params non-specified on command-line arguments
asked_config = False
for key, value in backend.config.iteritems():
if not asked_config:
asked_config = True
print 'Configuration of backend'
print '------------------------'
if key not in params or edit:
params[key] = self.ask(value, default=params[key] if (key in params) else value.default)
else:
print u' [%s] %s: %s' % (key, value.description, '(masked)' if value.masked else params[key])
if asked_config:
print '------------------------'
try:
self.weboob.backends_config.add_backend(name, name, params, edit=edit)
print 'Backend "%s" successfully %s.' % (name, 'updated' if edit else 'added')
return name
except BackendAlreadyExists:
print 'Backend "%s" is already configured in file "%s"' % (name, self.weboob.backends_config.confpath)
while self.ask('Add new instance of "%s" backend?' % name, default=False):
new_name = self.ask('Please give new instance name (could be "%s_1")' % name, regexp=r'^[\w\-_]+$')
try:
self.weboob.backends_config.add_backend(new_name, name, params)
print 'Backend "%s" successfully added.' % new_name
return new_name
except BackendAlreadyExists:
print 'Instance "%s" already exists for backend "%s".' % (new_name, name)
def ask(self, question, default=None, masked=False, regexp=None, choices=None):
"""
Ask a question to user.
@param question text displayed (str)
@param default optional default value (str)
@param masked if True, do not show typed text (bool)
@param regexp text must match this regexp (str)
@param choices choices to do (list)
@return entered text by user (str)
"""
if isinstance(question, Value):
v = deepcopy(question)
if default:
v.default = default
if masked:
v.masked = masked
if regexp:
v.regexp = regexp
if choices:
v.choices = choices
else:
if isinstance(default, bool):
klass = ValueBool
elif isinstance(default, float):
klass = ValueFloat
elif isinstance(default, (int,long)):
klass = ValueInt
else:
klass = Value
v = klass(label=question, default=default, masked=masked, regexp=regexp, choices=choices)
question = v.label
if v.id:
question = u'[%s] %s' % (v.id, question)
aliases = {}
if isinstance(v, ValueBool):
question = u'%s (%s/%s)' % (question, 'Y' if v.default else 'y', 'n' if v.default else 'N')
elif v.choices:
tiny = True
for key in v.choices.iterkeys():
if len(key) > 5 or ' ' in key:
tiny = False
break
if tiny:
question = u'%s (%s)' % (question, '/'.join((s.upper() if s == v.default else s)
for s in (v.choices.iterkeys())))
else:
for n, (key, value) in enumerate(v.choices.iteritems()):
print '%s%2d)%s %s' % (self.BOLD, n + 1, self.NC, value)
aliases[str(n + 1)] = key
question = u'%s (choose in list)' % question
elif default not in (None, '') and not v.masked:
question = u'%s [%s]' % (question, v.default)
if v.masked:
question = u'%s (hidden input)' % question
question += ': '
while True:
if v.masked:
line = getpass.getpass(question)
else:
self.stdout.write(question)
self.stdout.flush()
line = self.stdin.readline()
if len(line) == 0:
raise EOFError()
else:
line = line.rstrip('\r\n')
if not line and v.default is not None:
line = v.default
if isinstance(line, str):
line = line.decode('utf-8')
if line in aliases:
line = aliases[line]
try:
v.set_value(line)
except ValueError, e:
print 'Error: %s' % e
else:
break
return v.value
def bcall_error_handler(self, backend, error, backtrace):
"""
Handler for an exception inside the CallErrors exception.
This method can be overrided to support more exceptions types.
"""
if isinstance(error, BrowserIncorrectPassword):
msg = unicode(error)
if not msg:
msg = 'invalid login/password.'
print >>sys.stderr, 'Error(%s): %s' % (backend.name, msg)
if self.ask('Do you want to reconfigure this backend?', default=True):
self.unload_backends(names=[backend.name])
self.edit_backend(backend.name)
self.load_backends(names=[backend.name])
elif isinstance(error, BrowserUnavailable):
msg = unicode(error)
if not msg:
msg = 'website is unavailable.'
print >>sys.stderr, u'Error(%s): %s' % (backend.name, msg)
elif isinstance(error, NotImplementedError):
print >>sys.stderr, u'Error(%s): this feature is not supported yet by this backend.' % backend.name
print >>sys.stderr, u' %s To help the maintainer of this backend implement this feature,' % (' ' * len(backend.name))
print >>sys.stderr, u' %s please contact: %s <%s>' % (' ' * len(backend.name), backend.MAINTAINER, backend.EMAIL)
else:
print >>sys.stderr, u'Error(%s): %s' % (backend.name, error)
if logging.root.level == logging.DEBUG:
print >>sys.stderr, backtrace
else:
return True
def bcall_errors_handler(self, errors):
"""
Handler for the CallErrors exception.
"""
ask_debug_mode = False
for backend, error, backtrace in errors.errors:
if self.bcall_error_handler(backend, error, backtrace):
ask_debug_mode = True
if ask_debug_mode:
if self.interactive:
print >>sys.stderr, 'Use "logging debug" option to print backtraces.'
else:
print >>sys.stderr, 'Use --debug option to print backtraces.'

View file

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 Christophe Benz, Romain Bignon
# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -18,25 +18,19 @@
import atexit
from cmd import Cmd
import getpass
import logging
from optparse import OptionGroup, OptionParser, IndentedHelpFormatter
import os
import sys
from copy import deepcopy
from weboob.capabilities.account import ICapAccount, Account, AccountRegisterError
from weboob.capabilities.base import FieldNotFound
from weboob.core import CallErrors
from weboob.core.modules import ModuleLoadError
from weboob.core.backendscfg import BackendsConfig, BackendAlreadyExists
from weboob.tools.application.formatters.iformatter import MandatoryFieldsNotFound
from weboob.tools.browser import BrowserUnavailable, BrowserIncorrectPassword
from weboob.tools.value import Value, ValueBool, ValueFloat, ValueInt
from weboob.tools.misc import to_unicode
from weboob.tools.ordereddict import OrderedDict
from .base import BackendNotFound, BaseApplication
from .console import BackendNotGiven, ConsoleApplication
from .formatters.load import FormattersLoader, FormatterLoadError
from .results import ResultsCondition, ResultsConditionError
@ -44,17 +38,9 @@ from .results import ResultsCondition, ResultsConditionError
__all__ = ['ReplApplication', 'NotEnoughArguments']
class BackendNotGiven(Exception):
pass
class NotEnoughArguments(Exception):
pass
class OutputIsNone(Exception):
pass
class ReplOptionParser(OptionParser):
def format_option_help(self, formatter=None):
if not formatter:
@ -75,20 +61,15 @@ class ReplOptionFormatter(IndentedHelpFormatter):
s += ' %s\n' % c
return s
class ReplApplication(Cmd, BaseApplication):
class ReplApplication(Cmd, ConsoleApplication):
"""
Base application class for CLI applications.
Base application class for Repl applications.
"""
SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] [command [arguments..]]\n'
SYNOPSIS += ' %prog [--help] [--version]'
CAPS = None
DISABLE_REPL = False
# shell escape strings
BOLD = ''
NC = '' # no color
EXTRA_FORMATTERS = {}
DEFAULT_FORMATTER = 'multiline'
COMMANDS_FORMATTERS = {}
@ -121,11 +102,7 @@ class ReplApplication(Cmd, BaseApplication):
self.formatter = None
self.commands_formatters = self.COMMANDS_FORMATTERS.copy()
try:
BaseApplication.__init__(self, ReplOptionParser(self.SYNOPSIS, version=self._get_optparse_version()))
except BackendsConfig.WrongPermissions, e:
print e
sys.exit(1)
ConsoleApplication.__init__(self, ReplOptionParser(self.SYNOPSIS, version=self._get_optparse_version()))
commands_help = self.get_commands_doc()
self._parser.commands = commands_help
@ -148,306 +125,11 @@ class ReplApplication(Cmd, BaseApplication):
self._parser.add_option_group(formatting_options)
self._interactive = False
self.enabled_backends = set()
@property
def interactive(self):
return self._interactive
def caps_included(self, modcaps, caps):
modcaps = [x.__name__ for x in modcaps]
if not isinstance(caps, (list,set,tuple)):
caps = (caps,)
for cap in caps:
if not cap in modcaps:
return False
return True
def register_backend(self, name, ask_add=True):
try:
backend = self.weboob.modules_loader.get_or_load_module(name)
except ModuleLoadError, e:
backend = None
if not backend:
print 'Backend "%s" does not exist.' % name
return None
if not backend.has_caps(ICapAccount) or backend.klass.ACCOUNT_REGISTER_PROPERTIES is None:
print 'You can\'t register a new account with %s' % name
return None
account = Account()
account.properties = {}
if backend.website:
website = 'on website %s' % backend.website
else:
website = 'with backend %s' % backend.name
while 1:
asked_config = False
for key, prop in backend.klass.ACCOUNT_REGISTER_PROPERTIES.iteritems():
if not asked_config:
asked_config = True
print 'Configuration of new account %s' % website
print '-----------------------------%s' % ('-' * len(website))
p = deepcopy(prop)
p.set_value(self.ask(prop, default=account.properties[key].value if (key in account.properties) else prop.default))
account.properties[key] = p
if asked_config:
print '-----------------------------%s' % ('-' * len(website))
try:
backend.klass.register_account(account)
except AccountRegisterError, e:
print u'%s' % e
if self.ask('Do you want to try again?', default=True):
continue
else:
return None
else:
break
backend_config = {}
for key, value in account.properties.iteritems():
if key in backend.config:
backend_config[key] = value.value
if ask_add and self.ask('Do you want to add the new register account?', default=True):
return self.add_backend(name, backend_config, ask_register=False)
return backend_config
def edit_backend(self, name, params=None):
return self.add_backend(name, params, True)
def add_backend(self, name, params=None, edit=False, ask_register=True):
if params is None:
params = {}
if not edit:
try:
backend = self.weboob.modules_loader.get_or_load_module(name)
except ModuleLoadError:
backend = None
else:
bname, items = self.weboob.backends_config.get_backend(name)
try:
backend = self.weboob.modules_loader.get_or_load_module(bname)
except ModuleLoadError:
backend = None
items.update(params)
params = items
if not backend:
print 'Backend "%s" does not exist. Hint: use the "backends" command.' % name
return None
# ask for params non-specified on command-line arguments
asked_config = False
for key, value in backend.config.iteritems():
if not asked_config:
asked_config = True
print 'Configuration of backend'
print '------------------------'
if key not in params or edit:
params[key] = self.ask(value, default=params[key] if (key in params) else value.default)
else:
print u' [%s] %s: %s' % (key, value.description, '(masked)' if value.masked else params[key])
if asked_config:
print '------------------------'
try:
self.weboob.backends_config.add_backend(name, name, params, edit=edit)
print 'Backend "%s" successfully %s.' % (name, 'updated' if edit else 'added')
return name
except BackendAlreadyExists:
print 'Backend "%s" is already configured in file "%s"' % (name, self.weboob.backends_config.confpath)
while self.ask('Add new instance of "%s" backend?' % name, default=False):
new_name = self.ask('Please give new instance name (could be "%s_1")' % name, regexp=r'^[\w\-_]+$')
try:
self.weboob.backends_config.add_backend(new_name, name, params)
print 'Backend "%s" successfully added.' % new_name
return new_name
except BackendAlreadyExists:
print 'Instance "%s" already exists for backend "%s".' % (new_name, name)
def unload_backends(self, *args, **kwargs):
unloaded = self.weboob.unload_backends(*args, **kwargs)
for backend in unloaded.itervalues():
try:
self.enabled_backends.remove(backend)
except KeyError:
pass
return unloaded
def load_backends(self, *args, **kwargs):
if 'errors' in kwargs:
errors = kwargs['errors']
else:
kwargs['errors'] = errors = []
ret = super(ReplApplication, self).load_backends(*args, **kwargs)
for err in errors:
print >>sys.stderr, 'Error(%s): %s' % (err.backend_name, err)
if self.ask('Do you want to reconfigure this backend?', default=True):
self.edit_backend(err.backend_name)
self.load_backends(names=[err.backend_name])
for name, backend in ret.iteritems():
self.enabled_backends.add(backend)
while len(self.enabled_backends) == 0:
print 'Warning: there is currently no configured backend for %s' % self.APPNAME
if not self.ask('Do you want to configure backends?', default=True):
break
self.weboob.modules_loader.load_all()
r = ''
while r != 'q':
backends = []
print '\nAvailable backends:'
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__):
continue
backends.append(name)
loaded = ' '
for bi in self.weboob.iter_backends():
if bi.NAME == name:
if loaded == ' ':
loaded = 'X'
elif loaded == 'X':
loaded = 2
else:
loaded += 1
print '%s%d)%s [%s] %s%-15s%s %s' % (self.BOLD, len(backends), self.NC, loaded,
self.BOLD, name, self.NC, backend.description)
print '%sq)%s --stop--\n' % (self.BOLD, self.NC)
r = self.ask('Select a backend to add (q to stop)', regexp='^(\d+|q)$')
if r.isdigit():
i = int(r) - 1
if i < 0 or i >= len(backends):
print 'Error: %s is not a valid choice' % r
continue
name = backends[i]
try:
inst = self.add_backend(name)
if inst:
self.load_backends(names=[inst])
except (KeyboardInterrupt, EOFError):
print '\nAborted.'
print 'Right right!'
return ret
def load_default_backends(self):
"""
By default loads all backends.
Applications can overload this method to restrict backends loaded.
"""
self.load_backends(self.CAPS)
@classmethod
def run(klass, args=None):
try:
super(ReplApplication, klass).run(args)
except BackendNotFound, e:
logging.error(e)
def parse_command_args(self, line, nb, req_n=None):
if line.strip() == '':
# because ''.split() = ['']
args = []
else:
args = line.strip().split(' ', nb - 1)
if req_n is not None and (len(args) < req_n):
raise NotEnoughArguments('Command needs %d arguments' % req_n)
if len(args) < nb:
args += tuple(None for i in xrange(nb - len(args)))
return args
def postcmd(self, stop, line):
"""
This REPL method is overrided to return None instead of integers
to prevent stopping cmdloop().
"""
if not isinstance(stop, bool):
stop = None
return stop
def bcall_error_handler(self, backend, error, backtrace):
"""
Handler for an exception inside the CallErrors exception.
This method can be overrided to support more exceptions types.
"""
if isinstance(error, BrowserIncorrectPassword):
msg = unicode(error)
if not msg:
msg = 'invalid login/password.'
print >>sys.stderr, 'Error(%s): %s' % (backend.name, msg)
if self.ask('Do you want to reconfigure this backend?', default=True):
self.unload_backends(names=[backend.name])
self.edit_backend(backend.name)
self.load_backends(names=[backend.name])
elif isinstance(error, BrowserUnavailable):
msg = unicode(error)
if not msg:
msg = 'website is unavailable.'
print >>sys.stderr, u'Error(%s): %s' % (backend.name, msg)
elif isinstance(error, ResultsConditionError):
print >>sys.stderr, u'Error(%s): condition error: %s' % (backend.name, error)
elif isinstance(error, NotImplementedError):
print >>sys.stderr, u'Error(%s): this feature is not supported yet by this backend.' % backend.name
print >>sys.stderr, u' %s To help the maintainer of this backend implement this feature,' % (' ' * len(backend.name))
print >>sys.stderr, u' %s please contact: %s <%s>' % (' ' * len(backend.name), backend.MAINTAINER, backend.EMAIL)
else:
print >>sys.stderr, u'Error(%s): %s' % (backend.name, error)
if logging.root.level == logging.DEBUG:
print >>sys.stderr, backtrace
else:
return True
def bcall_errors_handler(self, errors):
"""
Handler for the CallErrors exception.
"""
ask_debug_mode = False
for backend, error, backtrace in errors.errors:
if self.bcall_error_handler(backend, error, backtrace):
ask_debug_mode = True
if ask_debug_mode:
if self.interactive:
print >>sys.stderr, 'Use "logging debug" option to print backtraces.'
else:
print >>sys.stderr, 'Use --debug option to print backtraces.'
def onecmd(self, line):
"""
This REPL method is overrided to catch some particular exceptions.
"""
line = to_unicode(line)
cmd, arg, ignored = self.parseline(line)
# Set the right formatter for the command.
try:
formatter_name = self.commands_formatters[cmd]
except KeyError:
formatter_name = self.DEFAULT_FORMATTER
self.set_formatter(formatter_name)
try:
return super(ReplApplication, self).onecmd(line)
except CallErrors, e:
self.bcall_errors_handler(e)
except BackendNotGiven, e:
print >>sys.stderr, 'Error: %s' % str(e)
except NotEnoughArguments, e:
print >>sys.stderr, 'Error: not enough arguments. %s' % str(e)
except (KeyboardInterrupt, EOFError):
# ^C during a command process doesn't exit application.
print '\nAborted.'
def main(self, argv):
cmd_args = argv[1:]
if cmd_args:
@ -496,98 +178,55 @@ class ReplApplication(Cmd, BaseApplication):
fields = None
return self.weboob.do(self._do_complete, self.options.count, fields, function, *args, **kwargs)
# options related methods
def _handle_options(self):
if self.options.formatter:
self.commands_formatters = {}
self.DEFAULT_FORMATTER = self.options.formatter
self.set_formatter(self.DEFAULT_FORMATTER)
if self.options.select:
self.selected_fields = self.options.select.split(',')
# -- command tools ------------
def parse_command_args(self, line, nb, req_n=None):
if line.strip() == '':
# because ''.split() = ['']
args = []
else:
self.selected_fields = '$direct'
args = line.strip().split(' ', nb - 1)
if req_n is not None and (len(args) < req_n):
raise NotEnoughArguments('Command needs %d arguments' % req_n)
if self.options.condition:
self.condition = ResultsCondition(self.options.condition)
else:
self.condition = None
if len(args) < nb:
args += tuple(None for i in xrange(nb - len(args)))
return args
if self.options.count == 0:
self._parser.error('Count must be at least 1, or negative for infinite')
elif self.options.count < 0:
# infinite search
self.options.count = None
self.load_default_backends()
# default REPL commands
def do_quit(self, arg):
# -- cmd.Cmd methods ---------
def postcmd(self, stop, line):
"""
Quit the application.
This REPL method is overrided to return None instead of integers
to prevent stopping cmdloop().
"""
return True
if not isinstance(stop, bool):
stop = None
return stop
def do_EOF(self, arg):
def onecmd(self, line):
"""
Quit the command line interpreter when ^D is pressed.
This REPL method is overrided to catch some particular exceptions.
"""
# print empty line for the next shell prompt to appear on the first column of the terminal
print
return self.do_quit(arg)
line = to_unicode(line)
cmd, arg, ignored = self.parseline(line)
def get_command_help(self, command, short=False):
# Set the right formatter for the command.
try:
doc = getattr(self, 'do_' + command).__doc__
except AttributeError:
return None
if doc:
doc = '\n'.join(line.strip() for line in doc.strip().split('\n'))
if not doc.startswith(command):
doc = '%s\n\n%s' % (command, doc)
if short:
doc = doc.split('\n')[0]
if not doc.startswith(command):
doc = command
formatter_name = self.commands_formatters[cmd]
except KeyError:
formatter_name = self.DEFAULT_FORMATTER
self.set_formatter(formatter_name)
return doc
def get_commands_doc(self):
names = set(name for name in self.get_names() if name.startswith('do_'))
appname = self.APPNAME.capitalize()
d = OrderedDict(((appname, []), ('Weboob', [])))
for name in sorted(names):
cmd = name[3:]
if cmd in self.hidden_commands.union(self.weboob_commands).union(['help']):
continue
elif getattr(self, name).__doc__:
d[appname].append(self.get_command_help(cmd))
else:
d[appname].append(cmd)
if not self.DISABLE_REPL:
for cmd in self.weboob_commands:
d['Weboob'].append(self.get_command_help(cmd))
return d
def do_help(self, arg=None):
if arg:
cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_'))
if arg in cmd_names:
command_help = self.get_command_help(arg)
if command_help is None:
logging.warning(u'Command "%s" is undocumented' % arg)
else:
self.stdout.write('%s\n' % command_help)
else:
print 'Unknown command: "%s"' % arg
else:
cmds = self._parser.formatter.format_commands(self._parser.commands)
self.stdout.write('%s\n' % cmds)
self.stdout.write('Type "help <command>" for more info about a command.\n')
try:
return super(ReplApplication, self).onecmd(line)
except CallErrors, e:
self.bcall_errors_handler(e)
except BackendNotGiven, e:
print >>sys.stderr, 'Error: %s' % str(e)
except NotEnoughArguments, e:
print >>sys.stderr, 'Error: not enough arguments. %s' % str(e)
except (KeyboardInterrupt, EOFError):
# ^C during a command process doesn't exit application.
print '\nAborted.'
def emptyline(self):
"""
@ -641,6 +280,138 @@ class ReplApplication(Cmd, BaseApplication):
return '%s ' % match
return match
# -- errors management -------------
def bcall_error_handler(self, backend, error, backtrace):
"""
Handler for an exception inside the CallErrors exception.
This method can be overrided to support more exceptions types.
"""
if isinstance(error, ResultsConditionError):
print >>sys.stderr, u'Error(%s): condition error: %s' % (backend.name, error)
else:
return super(ReplApplication, self).bcall_error_handler(self, backend, error, backtrace)
# -- options related methods -------------
def _handle_options(self):
if self.options.formatter:
self.commands_formatters = {}
self.DEFAULT_FORMATTER = self.options.formatter
self.set_formatter(self.DEFAULT_FORMATTER)
if self.options.select:
self.selected_fields = self.options.select.split(',')
else:
self.selected_fields = '$direct'
if self.options.condition:
self.condition = ResultsCondition(self.options.condition)
else:
self.condition = None
if self.options.count == 0:
self._parser.error('Count must be at least 1, or negative for infinite')
elif self.options.count < 0:
# infinite search
self.options.count = None
return super(ReplApplication, self)._handle_options()
def get_command_help(self, command, short=False):
try:
doc = getattr(self, 'do_' + command).__doc__
except AttributeError:
return None
if doc:
doc = '\n'.join(line.strip() for line in doc.strip().split('\n'))
if not doc.startswith(command):
doc = '%s\n\n%s' % (command, doc)
if short:
doc = doc.split('\n')[0]
if not doc.startswith(command):
doc = command
return doc
def get_commands_doc(self):
names = set(name for name in self.get_names() if name.startswith('do_'))
appname = self.APPNAME.capitalize()
d = OrderedDict(((appname, []), ('Weboob', [])))
for name in sorted(names):
cmd = name[3:]
if cmd in self.hidden_commands.union(self.weboob_commands).union(['help']):
continue
elif getattr(self, name).__doc__:
d[appname].append(self.get_command_help(cmd))
else:
d[appname].append(cmd)
if not self.DISABLE_REPL:
for cmd in self.weboob_commands:
d['Weboob'].append(self.get_command_help(cmd))
return d
# -- default REPL commands ---------
def do_quit(self, arg):
"""
Quit the application.
"""
return True
def do_EOF(self, arg):
"""
Quit the command line interpreter when ^D is pressed.
"""
# print empty line for the next shell prompt to appear on the first column of the terminal
print
return self.do_quit(arg)
def do_help(self, arg=None):
if arg:
cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_'))
if arg in cmd_names:
command_help = self.get_command_help(arg)
if command_help is None:
logging.warning(u'Command "%s" is undocumented' % arg)
else:
self.stdout.write('%s\n' % command_help)
else:
print 'Unknown command: "%s"' % arg
else:
cmds = self._parser.formatter.format_commands(self._parser.commands)
self.stdout.write('%s\n' % cmds)
self.stdout.write('Type "help <command>" for more info about a command.\n')
def complete_backends(self, text, line, begidx, endidx):
choices = []
commands = ['enable', 'disable', 'only', 'list', 'add', 'register', 'edit', 'remove']
available_backends_names = set(backend.name for backend in self.weboob.iter_backends())
enabled_backends_names = set(backend.name for backend in self.enabled_backends)
args = line.split(' ')
if len(args) == 2:
choices = commands
elif len(args) >= 3:
if args[1] == 'enable':
choices = sorted(available_backends_names - enabled_backends_names)
elif args[1] == 'only':
choices = sorted(available_backends_names)
elif args[1] == 'disable':
choices = sorted(enabled_backends_names)
elif args[1] in ('add', 'register') and len(args) == 3:
self.weboob.modules_loader.load_all()
for name, module in sorted(self.weboob.modules_loader.loaded.iteritems()):
if not self.CAPS or self.caps_included(module.iter_caps(), self.CAPS.__name__):
choices.append(name)
elif args[1] == 'edit':
choices = sorted(available_backends_names)
elif args[1] == 'remove':
choices = sorted(available_backends_names)
return choices
def do_backends(self, line):
"""
backends [ACTION] [BACKEND_NAME]...
@ -737,33 +508,12 @@ class ReplApplication(Cmd, BaseApplication):
if len(self.enabled_backends) == 0:
print 'Warning: no more backends are loaded. %s is probably unusable.' % self.APPNAME.capitalize()
def complete_backends(self, text, line, begidx, endidx):
choices = []
commands = ['enable', 'disable', 'only', 'list', 'add', 'register', 'edit', 'remove']
available_backends_names = set(backend.name for backend in self.weboob.iter_backends())
enabled_backends_names = set(backend.name for backend in self.enabled_backends)
def complete_logging(self, text, line, begidx, endidx):
levels = ('debug', 'info', 'warning', 'error', 'quiet', 'default')
args = line.split(' ')
if len(args) == 2:
choices = commands
elif len(args) >= 3:
if args[1] == 'enable':
choices = sorted(available_backends_names - enabled_backends_names)
elif args[1] == 'only':
choices = sorted(available_backends_names)
elif args[1] == 'disable':
choices = sorted(enabled_backends_names)
elif args[1] in ('add', 'register') and len(args) == 3:
self.weboob.modules_loader.load_all()
for name, module in sorted(self.weboob.modules_loader.loaded.iteritems()):
if not self.CAPS or self.caps_included(module.iter_caps(), self.CAPS.__name__):
choices.append(name)
elif args[1] == 'edit':
choices = sorted(available_backends_names)
elif args[1] == 'remove':
choices = sorted(available_backends_names)
return choices
return levels
return ()
def do_logging(self, line):
"""
@ -804,13 +554,6 @@ class ReplApplication(Cmd, BaseApplication):
for handler in logging.root.handlers:
handler.setLevel(level)
def complete_logging(self, text, line, begidx, endidx):
levels = ('debug', 'info', 'warning', 'error', 'quiet', 'default')
args = line.split(' ')
if len(args) == 2:
return levels
return ()
def do_condition(self, line):
"""
condition [EXPRESSION | off]
@ -958,104 +701,44 @@ class ReplApplication(Cmd, BaseApplication):
else:
print ' '.join(self.selected_fields)
def complete_inspect(self, text, line, begidx, endidx):
return sorted(set(backend.name for backend in self.enabled_backends))
# user interaction related methods
def ask(self, question, default=None, masked=False, regexp=None, choices=None):
def do_inspect(self, line):
"""
Ask a question to user.
inspect BACKEND_NAME
@param question text displayed (str)
@param default optional default value (str)
@param masked if True, do not show typed text (bool)
@param regexp text must match this regexp (str)
@param choices choices to do (list)
@return entered text by user (str)
Display the HTML string of the current page of the specified backend's browser.
If webkit_mechanize_browser Python module is installed, HTML is displayed in a WebKit GUI.
"""
if isinstance(question, Value):
v = deepcopy(question)
if default:
v.default = default
if masked:
v.masked = masked
if regexp:
v.regexp = regexp
if choices:
v.choices = choices
if len(self.enabled_backends) == 1:
backend = list(self.enabled_backends)[0]
else:
if isinstance(default, bool):
klass = ValueBool
elif isinstance(default, float):
klass = ValueFloat
elif isinstance(default, (int,long)):
klass = ValueInt
else:
klass = Value
v = klass(label=question, default=default, masked=masked, regexp=regexp, choices=choices)
question = v.label
if v.id:
question = u'[%s] %s' % (v.id, question)
aliases = {}
if isinstance(v, ValueBool):
question = u'%s (%s/%s)' % (question, 'Y' if v.default else 'y', 'n' if v.default else 'N')
elif v.choices:
tiny = True
for key in v.choices.iterkeys():
if len(key) > 5 or ' ' in key:
tiny = False
break
if tiny:
question = u'%s (%s)' % (question, '/'.join((s.upper() if s == v.default else s)
for s in (v.choices.iterkeys())))
else:
for n, (key, value) in enumerate(v.choices.iteritems()):
print '%s%2d)%s %s' % (self.BOLD, n + 1, self.NC, value)
aliases[str(n + 1)] = key
question = u'%s (choose in list)' % question
elif default not in (None, '') and not v.masked:
question = u'%s [%s]' % (question, v.default)
if v.masked:
question = u'%s (hidden input)' % question
question += ': '
while True:
if v.masked:
line = getpass.getpass(question)
else:
self.stdout.write(question)
self.stdout.flush()
line = self.stdin.readline()
if len(line) == 0:
raise EOFError()
else:
line = line.rstrip('\r\n')
if not line and v.default is not None:
line = v.default
if isinstance(line, str):
line = line.decode('utf-8')
if line in aliases:
line = aliases[line]
try:
v.set_value(line)
except ValueError, e:
print 'Error: %s' % e
else:
break
return v.value
# formatting related methods
backend_name = line.strip()
if not backend_name:
print 'Please specify a backend name.'
return
backends = set(backend for backend in self.enabled_backends if backend.name == backend_name)
if not backends:
print 'No backend found for "%s"' % backend_name
return
backend = backends.pop()
if not hasattr(backend, '_browser'):
print 'No browser created for backend "%s" yet. Please invoke a command before.' % backend.name
return
browser = backend._browser
data = browser.parser.tostring(browser.page.document)
try:
from webkit_mechanize_browser.browser import Browser
from weboob.tools.inspect import Page
except ImportError:
print data
else:
page = Page(core=browser, data=data, uri=browser._response.geturl())
browser = Browser(view=page.view)
# -- formatting related methods -------------
def set_formatter(self, name):
"""
Set the current formatter from name.
@ -1097,69 +780,3 @@ class ReplApplication(Cmd, BaseApplication):
def flush(self):
self.formatter.flush()
def parse_id(self, _id, unique_backend=False):
try:
_id, backend_name = _id.rsplit('@', 1)
except ValueError:
backend_name = None
if unique_backend and not backend_name:
backends = []
for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()):
if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__):
continue
backends.append((name, backend))
if self.interactive:
while not backend_name:
print 'This command works with a unique backend. Availables:'
for index, (name, backend) in enumerate(backends):
print '%s%d)%s %s%-15s%s %s' % (self.BOLD, index + 1, self.NC, self.BOLD, name, self.NC,
backend.description)
response = self.ask('Select a backend to proceed', regexp='^\d+$')
if response.isdigit():
i = int(response) - 1
if i < 0 or i >= len(backends):
print 'Error: %s is not a valid choice' % response
continue
backend_name = backends[i][0]
else:
raise BackendNotGiven('Please specify a backend to use for this argument (%s@backend_name). '
'Availables: %s.' % (_id, ', '.join(name for name, backend in backends)))
return _id, backend_name
def do_inspect(self, line):
"""
inspect BACKEND_NAME
Display the HTML string of the current page of the specified backend's browser.
If webkit_mechanize_browser Python module is installed, HTML is displayed in a WebKit GUI.
"""
if len(self.enabled_backends) == 1:
backend = list(self.enabled_backends)[0]
else:
backend_name = line.strip()
if not backend_name:
print 'Please specify a backend name.'
return
backends = set(backend for backend in self.enabled_backends if backend.name == backend_name)
if not backends:
print 'No backend found for "%s"' % backend_name
return
backend = backends.pop()
if not hasattr(backend, '_browser'):
print 'No browser created for backend "%s" yet. Please invoke a command before.' % backend.name
return
browser = backend._browser
data = browser.parser.tostring(browser.page.document)
try:
from webkit_mechanize_browser.browser import Browser
from weboob.tools.inspect import Page
except ImportError:
print data
else:
page = Page(core=browser, data=data, uri=browser._response.geturl())
browser = Browser(view=page.view)
def complete_inspect(self, text, line, begidx, endidx):
return sorted(set(backend.name for backend in self.enabled_backends))