From 2ba3f46f3ba8a6b85c280fa26790174e0d526126 Mon Sep 17 00:00:00 2001 From: Romain Bignon Date: Sat, 5 Mar 2011 21:55:20 +0100 Subject: [PATCH] split ReplApplication to create ConsoleApplication --- weboob/tools/application/base.py | 8 +- weboob/tools/application/console.py | 430 +++++++++++++++ weboob/tools/application/repl.py | 805 ++++++++-------------------- 3 files changed, 648 insertions(+), 595 deletions(-) create mode 100644 weboob/tools/application/console.py diff --git a/weboob/tools/application/base.py b/weboob/tools/application/base.py index 5cb8d4e5..d4e193c5 100644 --- a/weboob/tools/application/base.py +++ b/weboob/tools/application/base.py @@ -25,6 +25,7 @@ import tempfile from weboob.capabilities.base import NotAvailable, NotLoaded from weboob.core import Weboob, CallErrors +from weboob.core.backendscfg import BackendsConfig from weboob.tools.config.iconfig import ConfigError from weboob.tools.backend import ObjectNotAvailable from weboob.tools.log import createColoredFormatter, getLogger @@ -352,7 +353,12 @@ class BaseApplication(object): if args is None: args = [(sys.stdin.encoding and arg.decode(sys.stdin.encoding) or arg) for arg in sys.argv] - app = klass() + + try: + app = klass() + except BackendsConfig.WrongPermissions, e: + print e + sys.exit(1) try: try: diff --git a/weboob/tools/application/console.py b/weboob/tools/application/console.py new file mode 100644 index 00000000..68b4ad9b --- /dev/null +++ b/weboob/tools/application/console.py @@ -0,0 +1,430 @@ +# -*- coding: utf-8 -*- + +# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, version 3 of the License. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + + +from copy import deepcopy +import getpass +import logging +import sys + +from weboob.capabilities.account import ICapAccount, Account, AccountRegisterError +from weboob.core.backendscfg import BackendAlreadyExists +from weboob.core.modules import ModuleLoadError +from weboob.tools.browser import BrowserUnavailable, BrowserIncorrectPassword +from weboob.tools.value import Value, ValueBool, ValueFloat, ValueInt + +from .base import BackendNotFound, BaseApplication + +class BackendNotGiven(Exception): + pass + +class ConsoleApplication(BaseApplication): + """ + Base application class for CLI applications. + """ + + CAPS = None + + # shell escape strings + BOLD = '' + NC = '' # no color + + stdin = sys.stdin + stdout = sys.stdout + + def __init__(self, option_parser=None): + BaseApplication.__init__(self, option_parser) + self.enabled_backends = set() + + def unload_backends(self, *args, **kwargs): + unloaded = self.weboob.unload_backends(*args, **kwargs) + for backend in unloaded.itervalues(): + try: + self.enabled_backends.remove(backend) + except KeyError: + pass + return unloaded + + def is_backend_loadable(self, backend): + return self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__) + + def load_backends(self, *args, **kwargs): + if 'errors' in kwargs: + errors = kwargs['errors'] + else: + kwargs['errors'] = errors = [] + ret = super(ConsoleApplication, self).load_backends(*args, **kwargs) + + for err in errors: + print >>sys.stderr, 'Error(%s): %s' % (err.backend_name, err) + if self.ask('Do you want to reconfigure this backend?', default=True): + self.edit_backend(err.backend_name) + self.load_backends(names=[err.backend_name]) + + for name, backend in ret.iteritems(): + self.enabled_backends.add(backend) + while len(self.enabled_backends) == 0: + print 'Warning: there is currently no configured backend for %s' % self.APPNAME + if not self.ask('Do you want to configure backends?', default=True): + break + + self.weboob.modules_loader.load_all() + r = '' + while r != 'q': + backends = [] + print '\nAvailable backends:' + for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()): + if not self.is_backend_loadable(backend): + continue + backends.append(name) + loaded = ' ' + for bi in self.weboob.iter_backends(): + if bi.NAME == name: + if loaded == ' ': + loaded = 'X' + elif loaded == 'X': + loaded = 2 + else: + loaded += 1 + print '%s%d)%s [%s] %s%-15s%s %s' % (self.BOLD, len(backends), self.NC, loaded, + self.BOLD, name, self.NC, backend.description) + print '%sq)%s --stop--\n' % (self.BOLD, self.NC) + r = self.ask('Select a backend to add (q to stop)', regexp='^(\d+|q)$') + + if r.isdigit(): + i = int(r) - 1 + if i < 0 or i >= len(backends): + print 'Error: %s is not a valid choice' % r + continue + name = backends[i] + try: + inst = self.add_backend(name) + if inst: + self.load_backends(names=[inst]) + except (KeyboardInterrupt, EOFError): + print '\nAborted.' + + print 'Right right!' + + return ret + + def _handle_options(self): + self.load_default_backends() + + def load_default_backends(self): + """ + By default loads all backends. + + Applications can overload this method to restrict backends loaded. + """ + self.load_backends(self.CAPS) + + @classmethod + def run(klass, args=None): + try: + super(ConsoleApplication, klass).run(args) + except BackendNotFound, e: + logging.error(e) + + def do(self, function, *args, **kwargs): + if not 'backends' in kwargs: + kwargs['backends'] = self.enabled_backends + return self.weboob.do(function, *args, **kwargs) + + def parse_id(self, _id, unique_backend=False): + try: + _id, backend_name = _id.rsplit('@', 1) + except ValueError: + backend_name = None + if unique_backend and not backend_name: + backends = [] + for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()): + if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__): + continue + backends.append((name, backend)) + if self.interactive: + while not backend_name: + print 'This command works with a unique backend. Availables:' + for index, (name, backend) in enumerate(backends): + print '%s%d)%s %s%-15s%s %s' % (self.BOLD, index + 1, self.NC, self.BOLD, name, self.NC, + backend.description) + response = self.ask('Select a backend to proceed', regexp='^\d+$') + if response.isdigit(): + i = int(response) - 1 + if i < 0 or i >= len(backends): + print 'Error: %s is not a valid choice' % response + continue + backend_name = backends[i][0] + else: + raise BackendNotGiven('Please specify a backend to use for this argument (%s@backend_name). ' + 'Availables: %s.' % (_id, ', '.join(name for name, backend in backends))) + return _id, backend_name + + def caps_included(self, modcaps, caps): + modcaps = [x.__name__ for x in modcaps] + if not isinstance(caps, (list,set,tuple)): + caps = (caps,) + for cap in caps: + if not cap in modcaps: + return False + return True + + # user interaction related methods + + def register_backend(self, name, ask_add=True): + try: + backend = self.weboob.modules_loader.get_or_load_module(name) + except ModuleLoadError, e: + backend = None + + if not backend: + print 'Backend "%s" does not exist.' % name + return None + + if not backend.has_caps(ICapAccount) or backend.klass.ACCOUNT_REGISTER_PROPERTIES is None: + print 'You can\'t register a new account with %s' % name + return None + + account = Account() + account.properties = {} + if backend.website: + website = 'on website %s' % backend.website + else: + website = 'with backend %s' % backend.name + while 1: + asked_config = False + for key, prop in backend.klass.ACCOUNT_REGISTER_PROPERTIES.iteritems(): + if not asked_config: + asked_config = True + print 'Configuration of new account %s' % website + print '-----------------------------%s' % ('-' * len(website)) + p = deepcopy(prop) + p.set_value(self.ask(prop, default=account.properties[key].value if (key in account.properties) else prop.default)) + account.properties[key] = p + if asked_config: + print '-----------------------------%s' % ('-' * len(website)) + try: + backend.klass.register_account(account) + except AccountRegisterError, e: + print u'%s' % e + if self.ask('Do you want to try again?', default=True): + continue + else: + return None + else: + break + backend_config = {} + for key, value in account.properties.iteritems(): + if key in backend.config: + backend_config[key] = value.value + + if ask_add and self.ask('Do you want to add the new register account?', default=True): + return self.add_backend(name, backend_config, ask_register=False) + + return backend_config + + def edit_backend(self, name, params=None): + return self.add_backend(name, params, True) + + def add_backend(self, name, params=None, edit=False, ask_register=True): + if params is None: + params = {} + + if not edit: + try: + backend = self.weboob.modules_loader.get_or_load_module(name) + except ModuleLoadError: + backend = None + else: + bname, items = self.weboob.backends_config.get_backend(name) + try: + backend = self.weboob.modules_loader.get_or_load_module(bname) + except ModuleLoadError: + backend = None + items.update(params) + params = items + if not backend: + print 'Backend "%s" does not exist. Hint: use the "backends" command.' % name + return None + + # ask for params non-specified on command-line arguments + asked_config = False + for key, value in backend.config.iteritems(): + if not asked_config: + asked_config = True + print 'Configuration of backend' + print '------------------------' + if key not in params or edit: + params[key] = self.ask(value, default=params[key] if (key in params) else value.default) + else: + print u' [%s] %s: %s' % (key, value.description, '(masked)' if value.masked else params[key]) + if asked_config: + print '------------------------' + + try: + self.weboob.backends_config.add_backend(name, name, params, edit=edit) + print 'Backend "%s" successfully %s.' % (name, 'updated' if edit else 'added') + return name + except BackendAlreadyExists: + print 'Backend "%s" is already configured in file "%s"' % (name, self.weboob.backends_config.confpath) + while self.ask('Add new instance of "%s" backend?' % name, default=False): + new_name = self.ask('Please give new instance name (could be "%s_1")' % name, regexp=r'^[\w\-_]+$') + try: + self.weboob.backends_config.add_backend(new_name, name, params) + print 'Backend "%s" successfully added.' % new_name + return new_name + except BackendAlreadyExists: + print 'Instance "%s" already exists for backend "%s".' % (new_name, name) + + def ask(self, question, default=None, masked=False, regexp=None, choices=None): + """ + Ask a question to user. + + @param question text displayed (str) + @param default optional default value (str) + @param masked if True, do not show typed text (bool) + @param regexp text must match this regexp (str) + @param choices choices to do (list) + @return entered text by user (str) + """ + + if isinstance(question, Value): + v = deepcopy(question) + if default: + v.default = default + if masked: + v.masked = masked + if regexp: + v.regexp = regexp + if choices: + v.choices = choices + else: + if isinstance(default, bool): + klass = ValueBool + elif isinstance(default, float): + klass = ValueFloat + elif isinstance(default, (int,long)): + klass = ValueInt + else: + klass = Value + + v = klass(label=question, default=default, masked=masked, regexp=regexp, choices=choices) + + question = v.label + if v.id: + question = u'[%s] %s' % (v.id, question) + + aliases = {} + if isinstance(v, ValueBool): + question = u'%s (%s/%s)' % (question, 'Y' if v.default else 'y', 'n' if v.default else 'N') + elif v.choices: + tiny = True + for key in v.choices.iterkeys(): + if len(key) > 5 or ' ' in key: + tiny = False + break + + if tiny: + question = u'%s (%s)' % (question, '/'.join((s.upper() if s == v.default else s) + for s in (v.choices.iterkeys()))) + else: + for n, (key, value) in enumerate(v.choices.iteritems()): + print '%s%2d)%s %s' % (self.BOLD, n + 1, self.NC, value) + aliases[str(n + 1)] = key + question = u'%s (choose in list)' % question + elif default not in (None, '') and not v.masked: + question = u'%s [%s]' % (question, v.default) + + if v.masked: + question = u'%s (hidden input)' % question + + question += ': ' + + while True: + if v.masked: + line = getpass.getpass(question) + else: + self.stdout.write(question) + self.stdout.flush() + line = self.stdin.readline() + if len(line) == 0: + raise EOFError() + else: + line = line.rstrip('\r\n') + + if not line and v.default is not None: + line = v.default + if isinstance(line, str): + line = line.decode('utf-8') + + if line in aliases: + line = aliases[line] + + try: + v.set_value(line) + except ValueError, e: + print 'Error: %s' % e + else: + break + + return v.value + + def bcall_error_handler(self, backend, error, backtrace): + """ + Handler for an exception inside the CallErrors exception. + + This method can be overrided to support more exceptions types. + """ + if isinstance(error, BrowserIncorrectPassword): + msg = unicode(error) + if not msg: + msg = 'invalid login/password.' + print >>sys.stderr, 'Error(%s): %s' % (backend.name, msg) + if self.ask('Do you want to reconfigure this backend?', default=True): + self.unload_backends(names=[backend.name]) + self.edit_backend(backend.name) + self.load_backends(names=[backend.name]) + elif isinstance(error, BrowserUnavailable): + msg = unicode(error) + if not msg: + msg = 'website is unavailable.' + print >>sys.stderr, u'Error(%s): %s' % (backend.name, msg) + elif isinstance(error, NotImplementedError): + print >>sys.stderr, u'Error(%s): this feature is not supported yet by this backend.' % backend.name + print >>sys.stderr, u' %s To help the maintainer of this backend implement this feature,' % (' ' * len(backend.name)) + print >>sys.stderr, u' %s please contact: %s <%s>' % (' ' * len(backend.name), backend.MAINTAINER, backend.EMAIL) + else: + print >>sys.stderr, u'Error(%s): %s' % (backend.name, error) + if logging.root.level == logging.DEBUG: + print >>sys.stderr, backtrace + else: + return True + + def bcall_errors_handler(self, errors): + """ + Handler for the CallErrors exception. + """ + ask_debug_mode = False + for backend, error, backtrace in errors.errors: + if self.bcall_error_handler(backend, error, backtrace): + ask_debug_mode = True + + if ask_debug_mode: + if self.interactive: + print >>sys.stderr, 'Use "logging debug" option to print backtraces.' + else: + print >>sys.stderr, 'Use --debug option to print backtraces.' diff --git a/weboob/tools/application/repl.py b/weboob/tools/application/repl.py index 08ab6fa0..447840eb 100644 --- a/weboob/tools/application/repl.py +++ b/weboob/tools/application/repl.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright(C) 2010 Christophe Benz, Romain Bignon +# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -18,25 +18,19 @@ import atexit from cmd import Cmd -import getpass import logging from optparse import OptionGroup, OptionParser, IndentedHelpFormatter import os import sys -from copy import deepcopy -from weboob.capabilities.account import ICapAccount, Account, AccountRegisterError from weboob.capabilities.base import FieldNotFound from weboob.core import CallErrors from weboob.core.modules import ModuleLoadError -from weboob.core.backendscfg import BackendsConfig, BackendAlreadyExists from weboob.tools.application.formatters.iformatter import MandatoryFieldsNotFound -from weboob.tools.browser import BrowserUnavailable, BrowserIncorrectPassword -from weboob.tools.value import Value, ValueBool, ValueFloat, ValueInt from weboob.tools.misc import to_unicode from weboob.tools.ordereddict import OrderedDict -from .base import BackendNotFound, BaseApplication +from .console import BackendNotGiven, ConsoleApplication from .formatters.load import FormattersLoader, FormatterLoadError from .results import ResultsCondition, ResultsConditionError @@ -44,17 +38,9 @@ from .results import ResultsCondition, ResultsConditionError __all__ = ['ReplApplication', 'NotEnoughArguments'] -class BackendNotGiven(Exception): - pass - - class NotEnoughArguments(Exception): pass -class OutputIsNone(Exception): - pass - - class ReplOptionParser(OptionParser): def format_option_help(self, formatter=None): if not formatter: @@ -75,20 +61,15 @@ class ReplOptionFormatter(IndentedHelpFormatter): s += ' %s\n' % c return s -class ReplApplication(Cmd, BaseApplication): +class ReplApplication(Cmd, ConsoleApplication): """ - Base application class for CLI applications. + Base application class for Repl applications. """ SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] [command [arguments..]]\n' SYNOPSIS += ' %prog [--help] [--version]' - CAPS = None DISABLE_REPL = False - # shell escape strings - BOLD = '' - NC = '' # no color - EXTRA_FORMATTERS = {} DEFAULT_FORMATTER = 'multiline' COMMANDS_FORMATTERS = {} @@ -121,11 +102,7 @@ class ReplApplication(Cmd, BaseApplication): self.formatter = None self.commands_formatters = self.COMMANDS_FORMATTERS.copy() - try: - BaseApplication.__init__(self, ReplOptionParser(self.SYNOPSIS, version=self._get_optparse_version())) - except BackendsConfig.WrongPermissions, e: - print e - sys.exit(1) + ConsoleApplication.__init__(self, ReplOptionParser(self.SYNOPSIS, version=self._get_optparse_version())) commands_help = self.get_commands_doc() self._parser.commands = commands_help @@ -148,306 +125,11 @@ class ReplApplication(Cmd, BaseApplication): self._parser.add_option_group(formatting_options) self._interactive = False - self.enabled_backends = set() @property def interactive(self): return self._interactive - def caps_included(self, modcaps, caps): - modcaps = [x.__name__ for x in modcaps] - if not isinstance(caps, (list,set,tuple)): - caps = (caps,) - for cap in caps: - if not cap in modcaps: - return False - return True - - def register_backend(self, name, ask_add=True): - try: - backend = self.weboob.modules_loader.get_or_load_module(name) - except ModuleLoadError, e: - backend = None - - if not backend: - print 'Backend "%s" does not exist.' % name - return None - - if not backend.has_caps(ICapAccount) or backend.klass.ACCOUNT_REGISTER_PROPERTIES is None: - print 'You can\'t register a new account with %s' % name - return None - - account = Account() - account.properties = {} - if backend.website: - website = 'on website %s' % backend.website - else: - website = 'with backend %s' % backend.name - while 1: - asked_config = False - for key, prop in backend.klass.ACCOUNT_REGISTER_PROPERTIES.iteritems(): - if not asked_config: - asked_config = True - print 'Configuration of new account %s' % website - print '-----------------------------%s' % ('-' * len(website)) - p = deepcopy(prop) - p.set_value(self.ask(prop, default=account.properties[key].value if (key in account.properties) else prop.default)) - account.properties[key] = p - if asked_config: - print '-----------------------------%s' % ('-' * len(website)) - try: - backend.klass.register_account(account) - except AccountRegisterError, e: - print u'%s' % e - if self.ask('Do you want to try again?', default=True): - continue - else: - return None - else: - break - backend_config = {} - for key, value in account.properties.iteritems(): - if key in backend.config: - backend_config[key] = value.value - - if ask_add and self.ask('Do you want to add the new register account?', default=True): - return self.add_backend(name, backend_config, ask_register=False) - - return backend_config - - def edit_backend(self, name, params=None): - return self.add_backend(name, params, True) - - def add_backend(self, name, params=None, edit=False, ask_register=True): - if params is None: - params = {} - - if not edit: - try: - backend = self.weboob.modules_loader.get_or_load_module(name) - except ModuleLoadError: - backend = None - else: - bname, items = self.weboob.backends_config.get_backend(name) - try: - backend = self.weboob.modules_loader.get_or_load_module(bname) - except ModuleLoadError: - backend = None - items.update(params) - params = items - if not backend: - print 'Backend "%s" does not exist. Hint: use the "backends" command.' % name - return None - - # ask for params non-specified on command-line arguments - asked_config = False - for key, value in backend.config.iteritems(): - if not asked_config: - asked_config = True - print 'Configuration of backend' - print '------------------------' - if key not in params or edit: - params[key] = self.ask(value, default=params[key] if (key in params) else value.default) - else: - print u' [%s] %s: %s' % (key, value.description, '(masked)' if value.masked else params[key]) - if asked_config: - print '------------------------' - - try: - self.weboob.backends_config.add_backend(name, name, params, edit=edit) - print 'Backend "%s" successfully %s.' % (name, 'updated' if edit else 'added') - return name - except BackendAlreadyExists: - print 'Backend "%s" is already configured in file "%s"' % (name, self.weboob.backends_config.confpath) - while self.ask('Add new instance of "%s" backend?' % name, default=False): - new_name = self.ask('Please give new instance name (could be "%s_1")' % name, regexp=r'^[\w\-_]+$') - try: - self.weboob.backends_config.add_backend(new_name, name, params) - print 'Backend "%s" successfully added.' % new_name - return new_name - except BackendAlreadyExists: - print 'Instance "%s" already exists for backend "%s".' % (new_name, name) - - def unload_backends(self, *args, **kwargs): - unloaded = self.weboob.unload_backends(*args, **kwargs) - for backend in unloaded.itervalues(): - try: - self.enabled_backends.remove(backend) - except KeyError: - pass - return unloaded - - def load_backends(self, *args, **kwargs): - if 'errors' in kwargs: - errors = kwargs['errors'] - else: - kwargs['errors'] = errors = [] - ret = super(ReplApplication, self).load_backends(*args, **kwargs) - - for err in errors: - print >>sys.stderr, 'Error(%s): %s' % (err.backend_name, err) - if self.ask('Do you want to reconfigure this backend?', default=True): - self.edit_backend(err.backend_name) - self.load_backends(names=[err.backend_name]) - - for name, backend in ret.iteritems(): - self.enabled_backends.add(backend) - while len(self.enabled_backends) == 0: - print 'Warning: there is currently no configured backend for %s' % self.APPNAME - if not self.ask('Do you want to configure backends?', default=True): - break - - self.weboob.modules_loader.load_all() - r = '' - while r != 'q': - backends = [] - print '\nAvailable backends:' - for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()): - if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__): - continue - backends.append(name) - loaded = ' ' - for bi in self.weboob.iter_backends(): - if bi.NAME == name: - if loaded == ' ': - loaded = 'X' - elif loaded == 'X': - loaded = 2 - else: - loaded += 1 - print '%s%d)%s [%s] %s%-15s%s %s' % (self.BOLD, len(backends), self.NC, loaded, - self.BOLD, name, self.NC, backend.description) - print '%sq)%s --stop--\n' % (self.BOLD, self.NC) - r = self.ask('Select a backend to add (q to stop)', regexp='^(\d+|q)$') - - if r.isdigit(): - i = int(r) - 1 - if i < 0 or i >= len(backends): - print 'Error: %s is not a valid choice' % r - continue - name = backends[i] - try: - inst = self.add_backend(name) - if inst: - self.load_backends(names=[inst]) - except (KeyboardInterrupt, EOFError): - print '\nAborted.' - - print 'Right right!' - - return ret - - def load_default_backends(self): - """ - By default loads all backends. - - Applications can overload this method to restrict backends loaded. - """ - self.load_backends(self.CAPS) - - @classmethod - def run(klass, args=None): - try: - super(ReplApplication, klass).run(args) - except BackendNotFound, e: - logging.error(e) - - def parse_command_args(self, line, nb, req_n=None): - if line.strip() == '': - # because ''.split() = [''] - args = [] - else: - args = line.strip().split(' ', nb - 1) - if req_n is not None and (len(args) < req_n): - raise NotEnoughArguments('Command needs %d arguments' % req_n) - - if len(args) < nb: - args += tuple(None for i in xrange(nb - len(args))) - return args - - def postcmd(self, stop, line): - """ - This REPL method is overrided to return None instead of integers - to prevent stopping cmdloop(). - """ - if not isinstance(stop, bool): - stop = None - return stop - - def bcall_error_handler(self, backend, error, backtrace): - """ - Handler for an exception inside the CallErrors exception. - - This method can be overrided to support more exceptions types. - """ - if isinstance(error, BrowserIncorrectPassword): - msg = unicode(error) - if not msg: - msg = 'invalid login/password.' - print >>sys.stderr, 'Error(%s): %s' % (backend.name, msg) - if self.ask('Do you want to reconfigure this backend?', default=True): - self.unload_backends(names=[backend.name]) - self.edit_backend(backend.name) - self.load_backends(names=[backend.name]) - elif isinstance(error, BrowserUnavailable): - msg = unicode(error) - if not msg: - msg = 'website is unavailable.' - print >>sys.stderr, u'Error(%s): %s' % (backend.name, msg) - elif isinstance(error, ResultsConditionError): - print >>sys.stderr, u'Error(%s): condition error: %s' % (backend.name, error) - elif isinstance(error, NotImplementedError): - print >>sys.stderr, u'Error(%s): this feature is not supported yet by this backend.' % backend.name - print >>sys.stderr, u' %s To help the maintainer of this backend implement this feature,' % (' ' * len(backend.name)) - print >>sys.stderr, u' %s please contact: %s <%s>' % (' ' * len(backend.name), backend.MAINTAINER, backend.EMAIL) - else: - print >>sys.stderr, u'Error(%s): %s' % (backend.name, error) - if logging.root.level == logging.DEBUG: - print >>sys.stderr, backtrace - else: - return True - - def bcall_errors_handler(self, errors): - """ - Handler for the CallErrors exception. - """ - ask_debug_mode = False - for backend, error, backtrace in errors.errors: - if self.bcall_error_handler(backend, error, backtrace): - ask_debug_mode = True - - if ask_debug_mode: - if self.interactive: - print >>sys.stderr, 'Use "logging debug" option to print backtraces.' - else: - print >>sys.stderr, 'Use --debug option to print backtraces.' - - def onecmd(self, line): - """ - This REPL method is overrided to catch some particular exceptions. - """ - line = to_unicode(line) - cmd, arg, ignored = self.parseline(line) - - # Set the right formatter for the command. - try: - formatter_name = self.commands_formatters[cmd] - except KeyError: - formatter_name = self.DEFAULT_FORMATTER - self.set_formatter(formatter_name) - - try: - return super(ReplApplication, self).onecmd(line) - except CallErrors, e: - self.bcall_errors_handler(e) - except BackendNotGiven, e: - print >>sys.stderr, 'Error: %s' % str(e) - except NotEnoughArguments, e: - print >>sys.stderr, 'Error: not enough arguments. %s' % str(e) - except (KeyboardInterrupt, EOFError): - # ^C during a command process doesn't exit application. - print '\nAborted.' - def main(self, argv): cmd_args = argv[1:] if cmd_args: @@ -496,98 +178,55 @@ class ReplApplication(Cmd, BaseApplication): fields = None return self.weboob.do(self._do_complete, self.options.count, fields, function, *args, **kwargs) - # options related methods - - def _handle_options(self): - if self.options.formatter: - self.commands_formatters = {} - self.DEFAULT_FORMATTER = self.options.formatter - self.set_formatter(self.DEFAULT_FORMATTER) - - if self.options.select: - self.selected_fields = self.options.select.split(',') + # -- command tools ------------ + def parse_command_args(self, line, nb, req_n=None): + if line.strip() == '': + # because ''.split() = [''] + args = [] else: - self.selected_fields = '$direct' + args = line.strip().split(' ', nb - 1) + if req_n is not None and (len(args) < req_n): + raise NotEnoughArguments('Command needs %d arguments' % req_n) - if self.options.condition: - self.condition = ResultsCondition(self.options.condition) - else: - self.condition = None + if len(args) < nb: + args += tuple(None for i in xrange(nb - len(args))) + return args - if self.options.count == 0: - self._parser.error('Count must be at least 1, or negative for infinite') - elif self.options.count < 0: - # infinite search - self.options.count = None - - self.load_default_backends() - - # default REPL commands - - def do_quit(self, arg): + # -- cmd.Cmd methods --------- + def postcmd(self, stop, line): """ - Quit the application. + This REPL method is overrided to return None instead of integers + to prevent stopping cmdloop(). """ - return True + if not isinstance(stop, bool): + stop = None + return stop - def do_EOF(self, arg): + def onecmd(self, line): """ - Quit the command line interpreter when ^D is pressed. + This REPL method is overrided to catch some particular exceptions. """ - # print empty line for the next shell prompt to appear on the first column of the terminal - print - return self.do_quit(arg) + line = to_unicode(line) + cmd, arg, ignored = self.parseline(line) - def get_command_help(self, command, short=False): + # Set the right formatter for the command. try: - doc = getattr(self, 'do_' + command).__doc__ - except AttributeError: - return None - if doc: - doc = '\n'.join(line.strip() for line in doc.strip().split('\n')) - if not doc.startswith(command): - doc = '%s\n\n%s' % (command, doc) - if short: - doc = doc.split('\n')[0] - if not doc.startswith(command): - doc = command + formatter_name = self.commands_formatters[cmd] + except KeyError: + formatter_name = self.DEFAULT_FORMATTER + self.set_formatter(formatter_name) - return doc - - def get_commands_doc(self): - names = set(name for name in self.get_names() if name.startswith('do_')) - appname = self.APPNAME.capitalize() - d = OrderedDict(((appname, []), ('Weboob', []))) - - for name in sorted(names): - cmd = name[3:] - if cmd in self.hidden_commands.union(self.weboob_commands).union(['help']): - continue - elif getattr(self, name).__doc__: - d[appname].append(self.get_command_help(cmd)) - else: - d[appname].append(cmd) - if not self.DISABLE_REPL: - for cmd in self.weboob_commands: - d['Weboob'].append(self.get_command_help(cmd)) - - return d - - def do_help(self, arg=None): - if arg: - cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_')) - if arg in cmd_names: - command_help = self.get_command_help(arg) - if command_help is None: - logging.warning(u'Command "%s" is undocumented' % arg) - else: - self.stdout.write('%s\n' % command_help) - else: - print 'Unknown command: "%s"' % arg - else: - cmds = self._parser.formatter.format_commands(self._parser.commands) - self.stdout.write('%s\n' % cmds) - self.stdout.write('Type "help " for more info about a command.\n') + try: + return super(ReplApplication, self).onecmd(line) + except CallErrors, e: + self.bcall_errors_handler(e) + except BackendNotGiven, e: + print >>sys.stderr, 'Error: %s' % str(e) + except NotEnoughArguments, e: + print >>sys.stderr, 'Error: not enough arguments. %s' % str(e) + except (KeyboardInterrupt, EOFError): + # ^C during a command process doesn't exit application. + print '\nAborted.' def emptyline(self): """ @@ -641,6 +280,138 @@ class ReplApplication(Cmd, BaseApplication): return '%s ' % match return match + # -- errors management ------------- + def bcall_error_handler(self, backend, error, backtrace): + """ + Handler for an exception inside the CallErrors exception. + + This method can be overrided to support more exceptions types. + """ + if isinstance(error, ResultsConditionError): + print >>sys.stderr, u'Error(%s): condition error: %s' % (backend.name, error) + else: + return super(ReplApplication, self).bcall_error_handler(self, backend, error, backtrace) + + # -- options related methods ------------- + def _handle_options(self): + if self.options.formatter: + self.commands_formatters = {} + self.DEFAULT_FORMATTER = self.options.formatter + self.set_formatter(self.DEFAULT_FORMATTER) + + if self.options.select: + self.selected_fields = self.options.select.split(',') + else: + self.selected_fields = '$direct' + + if self.options.condition: + self.condition = ResultsCondition(self.options.condition) + else: + self.condition = None + + if self.options.count == 0: + self._parser.error('Count must be at least 1, or negative for infinite') + elif self.options.count < 0: + # infinite search + self.options.count = None + + return super(ReplApplication, self)._handle_options() + + def get_command_help(self, command, short=False): + try: + doc = getattr(self, 'do_' + command).__doc__ + except AttributeError: + return None + if doc: + doc = '\n'.join(line.strip() for line in doc.strip().split('\n')) + if not doc.startswith(command): + doc = '%s\n\n%s' % (command, doc) + if short: + doc = doc.split('\n')[0] + if not doc.startswith(command): + doc = command + + return doc + + def get_commands_doc(self): + names = set(name for name in self.get_names() if name.startswith('do_')) + appname = self.APPNAME.capitalize() + d = OrderedDict(((appname, []), ('Weboob', []))) + + for name in sorted(names): + cmd = name[3:] + if cmd in self.hidden_commands.union(self.weboob_commands).union(['help']): + continue + elif getattr(self, name).__doc__: + d[appname].append(self.get_command_help(cmd)) + else: + d[appname].append(cmd) + if not self.DISABLE_REPL: + for cmd in self.weboob_commands: + d['Weboob'].append(self.get_command_help(cmd)) + + return d + + # -- default REPL commands --------- + + def do_quit(self, arg): + """ + Quit the application. + """ + return True + + def do_EOF(self, arg): + """ + Quit the command line interpreter when ^D is pressed. + """ + # print empty line for the next shell prompt to appear on the first column of the terminal + print + return self.do_quit(arg) + + def do_help(self, arg=None): + if arg: + cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_')) + if arg in cmd_names: + command_help = self.get_command_help(arg) + if command_help is None: + logging.warning(u'Command "%s" is undocumented' % arg) + else: + self.stdout.write('%s\n' % command_help) + else: + print 'Unknown command: "%s"' % arg + else: + cmds = self._parser.formatter.format_commands(self._parser.commands) + self.stdout.write('%s\n' % cmds) + self.stdout.write('Type "help " for more info about a command.\n') + + def complete_backends(self, text, line, begidx, endidx): + choices = [] + commands = ['enable', 'disable', 'only', 'list', 'add', 'register', 'edit', 'remove'] + available_backends_names = set(backend.name for backend in self.weboob.iter_backends()) + enabled_backends_names = set(backend.name for backend in self.enabled_backends) + + args = line.split(' ') + if len(args) == 2: + choices = commands + elif len(args) >= 3: + if args[1] == 'enable': + choices = sorted(available_backends_names - enabled_backends_names) + elif args[1] == 'only': + choices = sorted(available_backends_names) + elif args[1] == 'disable': + choices = sorted(enabled_backends_names) + elif args[1] in ('add', 'register') and len(args) == 3: + self.weboob.modules_loader.load_all() + for name, module in sorted(self.weboob.modules_loader.loaded.iteritems()): + if not self.CAPS or self.caps_included(module.iter_caps(), self.CAPS.__name__): + choices.append(name) + elif args[1] == 'edit': + choices = sorted(available_backends_names) + elif args[1] == 'remove': + choices = sorted(available_backends_names) + + return choices + def do_backends(self, line): """ backends [ACTION] [BACKEND_NAME]... @@ -737,33 +508,12 @@ class ReplApplication(Cmd, BaseApplication): if len(self.enabled_backends) == 0: print 'Warning: no more backends are loaded. %s is probably unusable.' % self.APPNAME.capitalize() - def complete_backends(self, text, line, begidx, endidx): - choices = [] - commands = ['enable', 'disable', 'only', 'list', 'add', 'register', 'edit', 'remove'] - available_backends_names = set(backend.name for backend in self.weboob.iter_backends()) - enabled_backends_names = set(backend.name for backend in self.enabled_backends) - + def complete_logging(self, text, line, begidx, endidx): + levels = ('debug', 'info', 'warning', 'error', 'quiet', 'default') args = line.split(' ') if len(args) == 2: - choices = commands - elif len(args) >= 3: - if args[1] == 'enable': - choices = sorted(available_backends_names - enabled_backends_names) - elif args[1] == 'only': - choices = sorted(available_backends_names) - elif args[1] == 'disable': - choices = sorted(enabled_backends_names) - elif args[1] in ('add', 'register') and len(args) == 3: - self.weboob.modules_loader.load_all() - for name, module in sorted(self.weboob.modules_loader.loaded.iteritems()): - if not self.CAPS or self.caps_included(module.iter_caps(), self.CAPS.__name__): - choices.append(name) - elif args[1] == 'edit': - choices = sorted(available_backends_names) - elif args[1] == 'remove': - choices = sorted(available_backends_names) - - return choices + return levels + return () def do_logging(self, line): """ @@ -804,13 +554,6 @@ class ReplApplication(Cmd, BaseApplication): for handler in logging.root.handlers: handler.setLevel(level) - def complete_logging(self, text, line, begidx, endidx): - levels = ('debug', 'info', 'warning', 'error', 'quiet', 'default') - args = line.split(' ') - if len(args) == 2: - return levels - return () - def do_condition(self, line): """ condition [EXPRESSION | off] @@ -958,104 +701,44 @@ class ReplApplication(Cmd, BaseApplication): else: print ' '.join(self.selected_fields) + def complete_inspect(self, text, line, begidx, endidx): + return sorted(set(backend.name for backend in self.enabled_backends)) - # user interaction related methods - - def ask(self, question, default=None, masked=False, regexp=None, choices=None): + def do_inspect(self, line): """ - Ask a question to user. + inspect BACKEND_NAME - @param question text displayed (str) - @param default optional default value (str) - @param masked if True, do not show typed text (bool) - @param regexp text must match this regexp (str) - @param choices choices to do (list) - @return entered text by user (str) + Display the HTML string of the current page of the specified backend's browser. + + If webkit_mechanize_browser Python module is installed, HTML is displayed in a WebKit GUI. """ - - if isinstance(question, Value): - v = deepcopy(question) - if default: - v.default = default - if masked: - v.masked = masked - if regexp: - v.regexp = regexp - if choices: - v.choices = choices + if len(self.enabled_backends) == 1: + backend = list(self.enabled_backends)[0] else: - if isinstance(default, bool): - klass = ValueBool - elif isinstance(default, float): - klass = ValueFloat - elif isinstance(default, (int,long)): - klass = ValueInt - else: - klass = Value - - v = klass(label=question, default=default, masked=masked, regexp=regexp, choices=choices) - - question = v.label - if v.id: - question = u'[%s] %s' % (v.id, question) - - aliases = {} - if isinstance(v, ValueBool): - question = u'%s (%s/%s)' % (question, 'Y' if v.default else 'y', 'n' if v.default else 'N') - elif v.choices: - tiny = True - for key in v.choices.iterkeys(): - if len(key) > 5 or ' ' in key: - tiny = False - break - - if tiny: - question = u'%s (%s)' % (question, '/'.join((s.upper() if s == v.default else s) - for s in (v.choices.iterkeys()))) - else: - for n, (key, value) in enumerate(v.choices.iteritems()): - print '%s%2d)%s %s' % (self.BOLD, n + 1, self.NC, value) - aliases[str(n + 1)] = key - question = u'%s (choose in list)' % question - elif default not in (None, '') and not v.masked: - question = u'%s [%s]' % (question, v.default) - - if v.masked: - question = u'%s (hidden input)' % question - - question += ': ' - - while True: - if v.masked: - line = getpass.getpass(question) - else: - self.stdout.write(question) - self.stdout.flush() - line = self.stdin.readline() - if len(line) == 0: - raise EOFError() - else: - line = line.rstrip('\r\n') - - if not line and v.default is not None: - line = v.default - if isinstance(line, str): - line = line.decode('utf-8') - - if line in aliases: - line = aliases[line] - - try: - v.set_value(line) - except ValueError, e: - print 'Error: %s' % e - else: - break - - return v.value - - # formatting related methods + backend_name = line.strip() + if not backend_name: + print 'Please specify a backend name.' + return + backends = set(backend for backend in self.enabled_backends if backend.name == backend_name) + if not backends: + print 'No backend found for "%s"' % backend_name + return + backend = backends.pop() + if not hasattr(backend, '_browser'): + print 'No browser created for backend "%s" yet. Please invoke a command before.' % backend.name + return + browser = backend._browser + data = browser.parser.tostring(browser.page.document) + try: + from webkit_mechanize_browser.browser import Browser + from weboob.tools.inspect import Page + except ImportError: + print data + else: + page = Page(core=browser, data=data, uri=browser._response.geturl()) + browser = Browser(view=page.view) + # -- formatting related methods ------------- def set_formatter(self, name): """ Set the current formatter from name. @@ -1097,69 +780,3 @@ class ReplApplication(Cmd, BaseApplication): def flush(self): self.formatter.flush() - - def parse_id(self, _id, unique_backend=False): - try: - _id, backend_name = _id.rsplit('@', 1) - except ValueError: - backend_name = None - if unique_backend and not backend_name: - backends = [] - for name, backend in sorted(self.weboob.modules_loader.loaded.iteritems()): - if self.CAPS and not self.caps_included(backend.iter_caps(), self.CAPS.__name__): - continue - backends.append((name, backend)) - if self.interactive: - while not backend_name: - print 'This command works with a unique backend. Availables:' - for index, (name, backend) in enumerate(backends): - print '%s%d)%s %s%-15s%s %s' % (self.BOLD, index + 1, self.NC, self.BOLD, name, self.NC, - backend.description) - response = self.ask('Select a backend to proceed', regexp='^\d+$') - if response.isdigit(): - i = int(response) - 1 - if i < 0 or i >= len(backends): - print 'Error: %s is not a valid choice' % response - continue - backend_name = backends[i][0] - else: - raise BackendNotGiven('Please specify a backend to use for this argument (%s@backend_name). ' - 'Availables: %s.' % (_id, ', '.join(name for name, backend in backends))) - return _id, backend_name - - def do_inspect(self, line): - """ - inspect BACKEND_NAME - - Display the HTML string of the current page of the specified backend's browser. - - If webkit_mechanize_browser Python module is installed, HTML is displayed in a WebKit GUI. - """ - if len(self.enabled_backends) == 1: - backend = list(self.enabled_backends)[0] - else: - backend_name = line.strip() - if not backend_name: - print 'Please specify a backend name.' - return - backends = set(backend for backend in self.enabled_backends if backend.name == backend_name) - if not backends: - print 'No backend found for "%s"' % backend_name - return - backend = backends.pop() - if not hasattr(backend, '_browser'): - print 'No browser created for backend "%s" yet. Please invoke a command before.' % backend.name - return - browser = backend._browser - data = browser.parser.tostring(browser.page.document) - try: - from webkit_mechanize_browser.browser import Browser - from weboob.tools.inspect import Page - except ImportError: - print data - else: - page = Page(core=browser, data=data, uri=browser._response.geturl()) - browser = Browser(view=page.view) - - def complete_inspect(self, text, line, begidx, endidx): - return sorted(set(backend.name for backend in self.enabled_backends))