new ReplApplication base class

This commit is contained in:
Christophe Benz 2010-08-18 20:30:51 +02:00 committed by Romain Bignon
commit ab4a427586
8 changed files with 978 additions and 41 deletions

25
scripts/videoob-repl Executable file
View file

@ -0,0 +1,25 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim: ft=python et softtabstop=4 cinoptions=4 shiftwidth=4 ts=4 ai
# Copyright(C) 2010 Romain Bignon
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from weboob.applications.videoobrepl import VideoobRepl
if __name__ == '__main__':
VideoobRepl.run()

View file

@ -23,23 +23,24 @@ import logging
import weboob
from weboob.capabilities.bank import ICapBank
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['Boobank']
class Boobank(ConsoleApplication):
class Boobank(ReplApplication):
APPNAME = 'boobank'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon'
COPYRIGHT = 'Copyright(C) 2010 Romain Bignon, Christophe Benz'
def main(self, argv):
return self.process_command(*argv[1:])
@ConsoleApplication.command('List every available accounts')
def command_list(self):
def load_default_backends(self):
self.load_backends(ICapBank)
def do_list(self, line):
"""
List every available accounts.
"""
tot_balance = 0.0
tot_coming = 0.0
try:
@ -54,13 +55,14 @@ class Boobank(ConsoleApplication):
else:
logging.error(u'Error[%s]: %s\n%s' % (backend.name, error, backtrace))
else:
self.format((('id', ''),
('label', 'Total'),
self.format((('label', 'Total'),
('balance', tot_balance),
('coming', tot_coming)))
@ConsoleApplication.command('Display old operations')
def command_history(self, id):
def do_history(self, id):
"""
Display old operations.
"""
id, backend_name = self.parse_id(id)
names = (backend_name,) if backend_name is not None else None
self.load_backends(ICapBank, names=names)
@ -72,8 +74,10 @@ class Boobank(ConsoleApplication):
for backend, operation in self.do(do):
self.format(operation)
@ConsoleApplication.command('Display all future operations')
def command_coming(self, id):
def do_coming(self, id):
"""
Display all future operations.
"""
id, backend_name = self.parse_id(id)
names = (backend_name,) if backend_name is not None else None
self.load_backends(ICapBank, names=names)

View file

@ -17,39 +17,54 @@
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import logging
from weboob.capabilities.video import ICapVideo
from weboob.tools.application.console import ConsoleApplication
from weboob.tools.application.repl import ReplApplication
__all__ = ['Videoob']
class Videoob(ConsoleApplication):
class Videoob(ReplApplication):
APPNAME = 'videoob'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 Christophe Benz, Romain Bignon'
COPYRIGHT = 'Copyright(C) 2010 Christophe Benz, Romain Bignon, John Obbele'
def load_default_backends(self):
self.load_backends(caps=ICapVideo)
def add_application_options(self, group):
group.add_option('--nsfw', action='store_true', help='enable non-suitable for work videos')
def main(self, argv):
def handle_application_options(self):
if self.options.backends:
self.options.nsfw = True
return self.process_command(*argv[1:])
@ConsoleApplication.command('Get information about a video (accepts ID or URL)')
def command_info(self, _id):
def do_info(self, _id):
"""
info ID
Get information about a video.
"""
if not _id:
logging.error(u'This command takes an argument: %s' % self.get_command_help('info', short=True))
return
_id, backend_name = self.parse_id(_id)
names = (backend_name,) if backend_name is not None else None
self.load_backends(ICapVideo, names=names)
for backend, video in self.do('get_video', _id):
if video is None:
continue
self.format(video)
def do_search(self, pattern=None):
"""
search [PATTERN]
@ConsoleApplication.command('Search for videos')
def command_search(self, pattern=None):
self.load_backends(ICapVideo)
Search for videos matching a PATTERN.
If PATTERN is not given, this command will search for the latest videos.
"""
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest videos')
for backend, video in self.do('iter_search_results', pattern=pattern, nsfw=self.options.nsfw,
max_results=self.options.count):

View file

@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 John Obbele, based on previous work from solsTiCe
# d'Hiver # <solstice.dhiver@gmail.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from .videoobrepl import VideoobRepl
__all__ = ['VideoobRepl']

View file

@ -0,0 +1,386 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 John Obbele
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import os
import sys
import errno
from cmd import Cmd
from subprocess import Popen, PIPE
from weboob.capabilities.video import ICapVideo
from weboob.tools.application.console import ConsoleApplication
__all__ = ['VideoobRepl']
# EVIL GLOBAL VARIABLES {{{
# shell escape strings
BOLD = ''
NC = '' # no color
# A list of tuples: (player , play_from_stdin_cmd)
# FIXME: lookup preference in freedesktop MIME database
PLAYERS = [
('parole', 'parole fd://0'),
('totem', 'totem fd://0'),
('mplayer', 'mplayer -really-quiet -'),
('vlc', 'vlc -'),
('xine', 'xine stdin:/'),
]
# }}}
class DefaultOptions():
"""Dummy options object.
Should be replaced by a proper one from optparse.
"""
def __init__(self):
self.lang = "fr"
self.quality = "hd"
self.verbose = True
class MyPlayer():
"""Black magic invoking a video player to this world.
Presently, due to strong disturbances in the holidays of the ether
world, the video player used is chosen from a static list of
programs. See PLAYERS for more information.
You MAY want to move it into a separate weboob.tools.applications
module.
"""
def __init__(self, options=DefaultOptions()):
"@param options [object] requires the bool. attribute 'verbose'"
self.options = options
self.player = None
for (binary,cmd_stdin) in PLAYERS:
if self._find_in_path(os.environ['PATH'], binary):
self.player = binary
self.player_stdin = cmd_stdin
break
if not self.player:
raise OSError(errno.ENOENT, "video player not found")
if self.options.verbose:
print "Video player is (%s,%s)" % (self.player,
self.player_stdin)
def play(self, video):
"""Play a video object, using programs from the PLAYERS list.
This function dispatch calls to either _play_default or
_play_rtmp for special rtmp streams using SWF verification.
"""
if video.url.find('rtmp') == 0:
self._play_rtmp(video)
else:
self._play_default(video)
def _play_default(self, video):
"Play video.url with the video player."
cmd = self.player + " " + video.url
args = cmd.split()
print "invoking [%s]" % cmd
os.spawnlp(os.P_NOWAIT, args[0], *args)
def _play_rtmp(self, video):
""""Download data with rtmpdump and pipe them to a video player.
You need a working version of rtmpdump installed and the SWF
object url in order to comply with SWF verification requests
from the server. The last one is retrieved from the non-standard
non-API compliant 'swf_player' attribute of the 'video' object.
"""
if not self._find_in_path(os.environ['PATH'], 'rtmpdump'):
raise OSError(errno.ENOENT, "\'rtmpdump\' binary not found")
video_url = video.url
try:
player_url = video.swf_player
rtmp = 'rtmpdump -r %s --swfVfy %s' % (video_url, player_url)
except AttributeError:
error("Your video object does not have a 'swf_player' "
"attribute. SWF verification will be disabled and "
"may prevent correct video playback.")
rtmp = 'rtmpdump -r %s' % video_url
if not self.options.verbose:
rtmp += ' --quiet'
print ':: Streaming from %s' % video_url
print ':: to %s' % self.player_stdin
p1 = Popen(rtmp.split(), stdout=PIPE)
p2 = Popen(self.player_stdin.split(),
stdin=p1.stdout, stderr=PIPE)
def _find_in_path(self,path, filename):
for i in path.split(':'):
if os.path.exists('/'.join([i, filename])):
return True
return False
def error(str):
"Shortcut to print >>sys.stderr"
print >> sys.stderr, "Error:", str
def print_keys_values(tuplets, indent=0, highlight=False):
"""Pretty print a list of (key, values) tuplets."""
first_column_width = max(len(k) for (k,v) in tuplets)
for (key,value) in tuplets:
# calm down typography nitpickers
key = key + ":"
# assert first column width
key = key + " " * (first_column_width - len(key) + 1)
# add uniform indentation if needed
key = " " * indent + key
# call 911
if highlight:
key = BOLD + key + NC
print key, value
class MyCmd(Cmd):
"""Read-Eval-Print-Loop object build from the 'cmd' framework.
It's just a command dispatcher, so get done with it.
"""
def __init__(self, consoleApplication, player):
"""
@param consoleApplication an instance of ConsoleApplication
@param player an instance of MyPlayer()
"""
Cmd.__init__(self)
self.prompt = BOLD + 'weboob> ' + NC
self.intro = 'Type "help" to see available commands.'
self.player = player
# engine / console application initialisation
# (loading ALL backends by default)
self.engine = consoleApplication
self.enabled_backends = []
self.available_backends = []
self.engine.load_backends(ICapVideo)
for b in self.engine.weboob.iter_backends(caps=ICapVideo):
self.enabled_backends.append(b)
self.available_backends.append(b)
self.videos = [] # videos list cache
def do_quit(self, arg):
"""quit the command line interpreter"""
print "Byebye !"
return True
def do_exit(self, arg):
"""quit the command line interpreter"""
return self.do_quit(arg)
def do_EOF(self, arg):
"""quit the command line interpreter when ^D is pressed"""
print ""
return self.do_quit(arg)
# By default, an emptyline repeats the previous command.
# Overriding this function disables the behaviour.
def emptyline(self):
pass
# Called when command prefix is not recognized
def default(self, line):
error('don\'t know how to %s' % line)
# uncomment the leading '_' to use it as a debug function
def _completedefault(self, text, line, begidx, endidx):
error('don\'t know how to complete '
'(text, line, begidx, endidx) =\n'
'(%s,%s,%d,%d)' %
(text, line, begidx, endidx))
def _completion_helper(self, text, choices):
"""Complete TEST with string from CHOICES."""
if text:
return [x for x in choices if x.startswith(text)]
else:
return choices
# The global help option can be ignored as long as you are to lazy
# to implement it yourself.
#def do_help(self, arg): pass
### dedicated commands
### fun starts here
###
# TODO: do_status
# TODO: toggle_nsfw
# TODO: retrieve video from page_url
def do_backends(self, line):
"""backends ACTION [backend0 backend1] …
ACTION is one of the following:
- add: enable backends
- rm | remove: disable backends
- only: enable only the following backends
- view: list enabled and available backends
if no arguments are given, default to 'view'
"""
if not line:
args = ["view"] # default behaviour
else:
args = line.split()
if args[0] in ["add", "only", "rm", "remove"]:
if args[0] == "add":
action = self.enabled_backends.append
elif args[0] == "only":
self.enabled_backends = [] # reset
action = self.enabled_backends.append
elif args[0] == "remove" or args[0] == "rm":
action = self.enabled_backends.remove
else:
return False
for b in self.available_backends:
if b.name in args[1:]:
action(b)
self.enabled_backends.sort()
# FIXME: do we really need it ?
# reload engine
self.engine.deinit()
names = tuple(x.name for x in self.enabled_backends)
self.engine.load_backends(ICapVideo, names=names)
else: # else "view"
availables = " ".join(
x.name for x in self.available_backends)
enabled = " ".join(
x.name for x in self.enabled_backends)
print_keys_values([("Available backends", str(availables)),
("Enabled backends ", str(enabled))],
highlight=True)
def do_search(self, pattern):
"""search [PATTERN]
Search for videos.
If no patterns are given, display the last entries.
"""
if pattern:
format = u'Search pattern: %s' % pattern
else:
format = u'Latest videos'
# create generator, retrieve videos and add them to self.videos
videos_g = self.engine.do('iter_search_results',
pattern=pattern, nsfw=True,
max_results=10)
self.videos = [] # reset
for i, (backend, video) in enumerate(videos_g):
self.videos.append((backend,video))
# code factorisatorminator: display the list of videos
self.do_ls("")
def complete_backends(self, text, line, begidx, endidx):
choices = None
if line.count(' ') == 1:
choices = ["add", "remove", "view", "only"]
else:
choices = [x.name for x in self.available_backends]
if choices:
return self._completion_helper(text, choices)
def do_ls(self, line):
"""ls
Re-display the last list of videos.
"""
for i, (backend, video) in enumerate(self.videos):
print "%s(%d) %s %s(%s)" % (BOLD, i, video.title, NC,
backend.name)
print_keys_values([
("url", video.url),
("duration", "%s seconds" % video.duration),
("rating", "%.2f/%.2f" % (video.rating,
video.rating_max))],
indent=4)
def do_play(self, line):
"""play NUMBER
Play a previously listed video.
"""
try:
id = int(line)
except ValueError:
error("invalid number")
return False
try:
(backend, video) = self.videos[id]
id = video.id
except IndexError:
error("unknown video number")
return False
# FIXME: do we really have to unload/reload backends ?
self.engine.deinit()
names = (backend.name,) if backend is not None else None
self.engine.load_backends(ICapVideo, names=names)
# XXX: copy&paste from weboob-cli,
# don't ask me anything about it :(
for backend, video in self.engine.do('get_video', id):
if video is None:
continue
self.player.play(video)
class VideoobRepl(ConsoleApplication):
APPNAME = 'videoob-repl'
VERSION = '0.1'
COPYRIGHT = 'Copyright(C) 2010 John Obbele'
def add_application_options(self, group):
group.add_option('-C', '--configured',
action='store_true',
help='load configured backends')
def main(self, argv):
player = MyPlayer(DefaultOptions())
console = self
MyCmd(console, player).cmdloop()

View file

@ -146,12 +146,12 @@ class Weboob(object):
"""
backends = self.backend_instances.values()
_backends = kwargs.pop('backends', None)
if _backends:
if _backends is not None:
if isinstance(_backends, BaseBackend):
backends = [_backends]
elif isinstance(_backends, basestring) and _backends:
backends = [self.backend_instances[_backends]]
elif isinstance(_backends, (list,tuple)):
elif isinstance(_backends, (list, tuple, set)):
backends = []
for backend in _backends:
if isinstance(backend, basestring):

View file

@ -97,9 +97,24 @@ class BaseApplication(object):
"""
return set()
def _handle_app_options(self):
def _handle_options(self):
"""
Overload this method in subclasses if you want to handle options defined in subclass constructor.
Overload this method in application type subclass
if you want to handle options defined in subclass constructor.
"""
pass
def add_application_options(self, group):
"""
Overload this method if your application needs extra options.
These options will be displayed in an option group.
"""
pass
def handle_application_options(self):
"""
Overload this method in your application if you want to handle options defined in add_application_options.
"""
pass
@ -173,12 +188,14 @@ class BaseApplication(object):
self.config.load(self.CONFIG)
def main(self, argv):
""" Main function """
"""
Main method
Called by run
"""
raise NotImplementedError()
def load_backends(self, caps=None, names=None, *args, **kwargs):
if names is None:
names = self.requested_backends
loaded = self.weboob.load_backends(caps, names, *args, **kwargs)
if not loaded:
logging.warning(u'No backend loaded')
@ -195,13 +212,16 @@ class BaseApplication(object):
def _complete_obj(self, backend, fields, obj):
if fields:
if '*' in fields:
if fields == 'direct':
fields = None
elif fields == 'full':
fields = [k for k, v in iter_fields(obj)]
try:
print fields
backend.fillobj(obj, fields)
except ObjectNotAvailable, e:
logging.warning(u'Could not retrieve required fields (%s): %s' % (','.join(fields), e))
for field in set(fields) - set('*'):
for field in fields:
if getattr(obj, field) is NotLoaded:
setattr(obj, field, NotAvailable)
return obj
@ -213,7 +233,7 @@ class BaseApplication(object):
sub = self._complete_obj(backend, fields, sub)
yield sub
def complete(self, backend, count, selected_fields, function, *args, **kwargs):
def _complete(self, backend, count, selected_fields, function, *args, **kwargs):
assert count is None or count > 0
if callable(function):
res = function(backend, *args, **kwargs)
@ -235,10 +255,10 @@ class BaseApplication(object):
"""
This static method can be called to run the application.
It creates the application object, handle options, setup logging, run
the main() method, and catch common exceptions.
It creates the application object, handles options, setups logging, calls
the main() method, and catches common exceptions.
You can't do anything after this call, as it *always* finish with
You can't do anything after this call, as it *always* finishes with
a call to sys.exit().
For example:
@ -251,6 +271,8 @@ class BaseApplication(object):
app = klass()
app.options, args = app._parser.parse_args(args)
app.set_requested_backends(app.options.backends.split(',') if app.options.backends else None)
if app.options.shell_completion:
items = set()
for option in app._parser.option_list:
@ -261,7 +283,7 @@ class BaseApplication(object):
sys.exit(0)
if app.options.debug:
level=logging.DEBUG
level = logging.DEBUG
elif app.options.verbose:
level = logging.INFO
elif app.options.quiet:
@ -270,9 +292,9 @@ class BaseApplication(object):
level = logging.WARNING
log_format = '%(asctime)s:%(levelname)s:%(pathname)s:%(lineno)d:%(funcName)s %(message)s'
logging.basicConfig(stream=sys.stdout, level=level, format=log_format)
app.requested_backends = app.options.backends.split(',') if app.options.backends else None
app._handle_app_options()
app._handle_options()
app.handle_application_options()
try:
try:
@ -285,3 +307,6 @@ class BaseApplication(object):
sys.exit(1)
finally:
app.deinit()
def set_requested_backends(self, requested_backends):
pass

View file

@ -0,0 +1,460 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010 Christophe Benz
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
from __future__ import with_statement
import atexit
import cmd
from cStringIO import StringIO
from functools import partial
import getpass
from inspect import getargspec
import logging
from optparse import OptionGroup, OptionParser
import os
import re
import subprocess
import sys
from weboob.core import CallErrors
from weboob.core.backendscfg import BackendsConfig
from .base import BackendNotFound, BaseApplication
from .formatters.load import formatters, load_formatter
from .formatters.iformatter import FieldNotFound
from .results import ResultsCondition, ResultsConditionException
__all__ = ['ReplApplication']
class ReplApplication(cmd.Cmd, BaseApplication):
"""
Base application class for CLI applications.
"""
SYNOPSIS = 'Usage: %prog [-dqv] [-b backends] [-cnfs] [command [arguments..]]\n'
SYNOPSIS += ' %prog [--help] [--version]'
def __init__(self):
cmd.Cmd.__init__(self)
self.prompt = '%s> ' % self.APPNAME
self.intro = '\n'.join(('Welcome to %s v%s' % (self.APPNAME, self.VERSION),
'',
'%s' % self.COPYRIGHT,
'This program is free software; you can redistribute it and/or modify',
'it under the terms of the GNU General Public License as published by',
'the Free Software Foundation, version 3 of the License.',
'',
'Type "help" to display available commands.',
'',
))
self.weboob_commands = set(['backends', 'count', 'quit', 'select'])
self.hidden_commands = set(['EOF'])
option_parser = OptionParser(self.SYNOPSIS, version=self._get_optparse_version())
app_options = OptionGroup(option_parser, '%s Options' % self.APPNAME.capitalize())
self.add_application_options(app_options)
option_parser.add_option_group(app_options)
try:
BaseApplication.__init__(self, option_parser=option_parser)
except BackendsConfig.WrongPermissions, e:
logging.error(u'Error: %s' % e)
sys.exit(1)
self._parser.format_description = lambda x: self._parser.description
if self._parser.description is None:
self._parser.description = ''
help_str = self.do_help(return_only=True)
self._parser.description += help_str
results_options = OptionGroup(self._parser, 'Results Options')
results_options.add_option('-c', '--condition', help='filter result items to display given a boolean condition')
results_options.add_option('-n', '--count', default='10', type='int',
help='get a maximum number of results (all backends merged)')
results_options.add_option('-s', '--select', help='select result item keys to display (comma separated)')
self._parser.add_option_group(results_options)
formatting_options = OptionGroup(self._parser, 'Formatting Options')
formatting_options.add_option('-f', '--formatter', choices=formatters,
help='select output formatter (%s)' % u','.join(formatters))
formatting_options.add_option('--no-header', dest='no_header', action='store_true', help='do not display header')
formatting_options.add_option('--no-keys', dest='no_keys', action='store_true', help='do not display item keys')
self._parser.add_option_group(formatting_options)
try:
import readline
except ImportError:
pass
finally:
history_filepath = os.path.join(self.weboob.WORKDIR, '%s_history' % self.APPNAME)
try:
readline.read_history_file(history_filepath)
except IOError:
pass
def savehist():
readline.write_history_file(history_filepath)
atexit.register(savehist)
def set_requested_backends(self, requested_backends):
self.load_default_backends()
if requested_backends:
self.enabled_backends = set(backend for backend in self.weboob.iter_backends()
if backend.name in requested_backends)
else:
self.enabled_backends = list(self.weboob.iter_backends())
def load_default_backends(self):
"""
By default loads all backends.
Applications can overload this method to restrict backends loaded.
"""
self.load_backends()
@classmethod
def run(klass, args=None):
try:
super(ReplApplication, klass).run(args)
except BackendNotFound, e:
logging.error(e)
def main(self, argv):
cmd_args = argv[1:]
if cmd_args:
if cmd_args[0] == 'help':
self._parser.print_help()
self._parser.exit()
cmd_line = ' '.join(cmd_args)
cmds = cmd_line.split(';')
for cmd in cmds:
self.onecmd(cmd)
else:
self.cmdloop()
def do(self, function, *args, **kwargs):
"""
Call Weboob.do(), passing count and selected fields given by user.
"""
if kwargs.pop('backends', None) is None:
kwargs['backends'] = self.enabled_backends
return self.weboob.do(self._complete, self.options.count, self.selected_fields, function, *args, **kwargs)
# options related methods
def _handle_options(self):
if self.options.formatter:
formatter_name = self.options.formatter
else:
formatter_name = 'multiline'
self.formatter = load_formatter(formatter_name)
if self.options.no_header:
self.formatter.display_header = False
if self.options.no_keys:
self.formatter.display_keys = False
if self.options.select:
self.selected_fields = self.options.select.split(',')
else:
self.selected_fields = 'direct'
if self.options.condition:
self.condition = ResultsCondition(self.options.condition)
else:
self.condition = None
if self.options.count == 0:
self._parser.error('Count must be at least 1, or negative for infinite')
elif self.options.count < 0:
# infinite search
self.options.count = None
# default REPL commands
def do_quit(self, arg):
"""
Quit the application.
"""
return True
def do_EOF(self, arg):
"""
Quit the command line interpreter when ^D is pressed.
"""
# print empty line for the next shell prompt to appear on the first column of the terminal
print
return self.do_quit(arg)
def get_command_help(self, command, short=False):
try:
doc = getattr(self, 'do_' + command).__doc__
except AttributeError:
return None
if doc:
doc = '\n'.join(line.strip() for line in doc.strip().split('\n'))
if short:
doc = doc.split('\n')[0]
if not doc.startswith(command):
doc = command
return doc
def do_help(self, arg=None, return_only=False):
if return_only:
stringio = StringIO()
old_stdout = self.stdout
self.stdout = stringio
if arg:
cmd_names = set(name[3:] for name in self.get_names() if name.startswith('do_'))
if arg in cmd_names:
command_help = self.get_command_help(arg)
if command_help is None:
logging.warning(u'Command "%s" is undocumented' % arg)
else:
self.stdout.write('%s\n' % command_help)
else:
logging.error(u'Unknown command: "%s"' % arg)
else:
names = set(name for name in self.get_names() if name.startswith('do_'))
application_cmds_doc = []
weboob_cmds_doc = []
cmds_undoc = []
for name in sorted(names):
cmd = name[3:]
if cmd in self.hidden_commands.union(['help']):
continue
elif getattr(self, name).__doc__:
short_help = ' %s' % self.get_command_help(cmd, short=True)
if cmd in self.weboob_commands:
weboob_cmds_doc.append(short_help)
else:
application_cmds_doc.append(short_help)
else:
cmds_undoc.append(cmd)
application_cmds_header = '%s commands' % self.APPNAME.capitalize()
self.stdout.write('%s\n%s\n' % (application_cmds_header, '-' * len(application_cmds_header)))
self.stdout.write('\n'.join(application_cmds_doc) + '\n\n')
weboob_cmds_header = 'Generic Weboob commands'
self.stdout.write('%s\n%s\n' % (weboob_cmds_header, '-' * len(weboob_cmds_header)))
self.stdout.write('\n'.join(weboob_cmds_doc) + '\n\n')
self.print_topics(self.undoc_header, cmds_undoc, 15,80)
self.stdout.write('Type "help <command>" for more info about a command.\n')
if return_only:
self.stdout = old_stdout
return stringio.getvalue()
def emptyline(self):
"""
By default, an emptyline repeats the previous command.
Overriding this function disables this behaviour.
"""
pass
def default(self, line):
logging.error(u'Unknown command: "%s"' % line)
def completenames(self, text, *ignored):
return ['%s ' % name for name in cmd.Cmd.completenames(self, text, *ignored) if name not in self.hidden_commands]
def completion_helper(self, text, choices):
if text:
return [x for x in choices if x.startswith(text)]
else:
return choices
def do_backends(self, line):
"""
backends [ACTION] [BACKEND_NAME]...
Select used backends.
ACTION is one of the following (default: list):
* enable enable given backends
* disable disable given backends
* only enable given backends and disable the others
* list display enabled and available backends
"""
if not line:
args = ['list']
else:
args = line.split()
action = args[0]
given_backend_names = args[1:]
if action in ('enable', 'disable', 'only'):
if not given_backend_names:
logging.error(u'Please give at least a backend name.')
given_backends = set(backend for backend in self.weboob.iter_backends() if backend.name in given_backend_names)
if action == 'enable':
action_func = self.enabled_backends.add
for backend in given_backends:
try:
action_func(backend)
except KeyError, e:
logging.error(e)
elif action == 'disable':
action_func = self.enabled_backends.remove
for backend in given_backends:
try:
action_func(backend)
except KeyError, e:
logging.info('%s is not enabled' % e)
elif action == 'only':
self.enabled_backends = set()
action_func = self.enabled_backends.add
for backend in given_backends:
try:
action_func(backend)
except KeyError, e:
logging.error(e)
elif action == 'list':
print 'Available: %s' % ', '.join(sorted(backend.name for backend in self.weboob.iter_backends()))
print 'Enabled: %s' % ', '.join(sorted(backend.name for backend in self.enabled_backends))
else:
logging.error(u'Unknown action: "%s"' % action)
return False
def complete_backends(self, text, line, begidx, endidx):
choices = None
commands = ['enable', 'disable', 'only', 'list']
available_backends_names = set(backend.name for backend in self.weboob.iter_backends())
enabled_backends_names = set(backend.name for backend in self.enabled_backends)
args = line.split()
if len(args) == 1 or len(args) == 2 and args[1] not in commands:
choices = commands
elif len(args) == 2 and args[1] in commands or \
len(args) == 3 and args[1] in ('enable', 'only') and args[2] not in available_backends_names or \
len(args) == 3 and args[1] == 'disable' and args[2] not in enabled_backends_names:
if args[1] in ('enable', 'only'):
choices = sorted(available_backends_names - enabled_backends_names)
elif args[1] == 'disable':
choices = sorted(enabled_backends_names)
if choices is not None:
return ['%s ' % choice for choice in choices if choice.startswith(text)] if text else choices
def do_count(self, line):
"""
count [NUMBER]
If an argument is given, set the maximum number of results fetched.
Otherwise, display the current setting.
Count must be at least 1, or negative for infinite.
"""
if line:
try:
self.options.count = int(line)
except ValueError, e:
print e
if self.options.count == 0:
print 'count must be at least 1, or negative for infinite'
elif self.options.count < 0:
self.options.count = None
else:
print self.options.count
def do_select(self, line):
"""
select [FIELD_NAME]... | "direct" | "full"
"""
print self.selected_fields
# user interaction related methods
def ask(self, question, default=None, masked=False, regexp=None, choices=None):
"""
Ask a question to user.
@param question text displayed (str)
@param default optional default value (str)
@param masked if True, do not show typed text (bool)
@param regexp text must match this regexp (str)
@return entered text by user (str)
"""
is_bool = False
if choices:
question = u'%s (%s)' % (question, '/'.join(
[s for s in (choices.iterkeys() if isinstance(choices, dict) else choices)]))
if default is not None:
if isinstance(default, bool):
question = u'%s (%s/%s)' % (question, 'Y' if default else 'y', 'n' if default else 'N')
choices = ('y', 'n', 'Y', 'N')
default = 'y' if default else 'n'
is_bool = True
else:
question = u'%s [%s]' % (question, default)
if masked:
question = u'(input chars are hidden) %s' % question
question += ': '
correct = False
while not correct:
line = getpass.getpass(question) if masked else raw_input(question)
if not line and default is not None:
line = default
correct = (not regexp or re.match(regexp, unicode(line))) and \
(not choices or unicode(line) in
[unicode(s) for s in (choices.iterkeys() if isinstance(choices, dict) else choices)])
if is_bool:
return line.lower() == 'y'
else:
return line
# formatting related methods
def set_default_formatter(self, name):
if not self.options.formatter:
try:
self.formatter = load_formatter(name)
except ImportError:
default_name = 'multiline'
logging.error('Could not load default formatter "%s" for this command. Falling back to "%s".' % (
name, default_name))
self.formatter = load_formatter(default_name)
def set_formatter_header(self, string):
self.formatter.set_header(string)
def format(self, result):
try:
self.formatter.format(obj=result, selected_fields=self.selected_fields, condition=self.condition)
except FieldNotFound, e:
logging.error(e)
except ResultsConditionException, e:
logging.error(e)
def parse_id(self, _id):
try:
_id, backend_name = _id.rsplit('@', 1)
except ValueError:
backend_name = None
return _id, backend_name