factorize ls and cd in ReplApplication

This commit is contained in:
Romain Bignon 2011-04-21 11:01:50 +02:00
commit c28d5a99c1
5 changed files with 133 additions and 222 deletions

View file

@ -27,9 +27,6 @@ from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.formatters.iformatter import IFormatter
from weboob.tools.misc import html2text
from weboob.capabilities.collection import Collection, ICapCollection, CollectionNotFound
from weboob.tools.path import Path
__all__ = ['Boobmsg']
@ -181,10 +178,6 @@ class Boobmsg(ReplApplication):
'export_all': 'msg',
'ls': 'msglist',
}
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
self.working_path = Path()
def add_application_options(self, group):
group.add_option('-e', '--skip-empty', action='store_true',
@ -365,74 +358,3 @@ class Boobmsg(ReplApplication):
print 'Oops, you need to be in interactive mode to read messages'
else:
print 'Message not found'
def do_ls(self, line):
#~ self.videos = []
path = self.working_path.get()
if len(path) == 0:
for name in [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]:
print name
return 0
def do(backend):
return backend.iter_resources(path[1:])
for backend, rep in self.do(do, backends=path[0]):
if isinstance(rep, Thread):
#~ self.videos.append(rep)
self.format(rep)
else:
print rep
self.flush()
def do_cd(self, line):
line = line.encode('utf-8')
self.working_path.extend(line)
req_path = self.working_path.get()
if len(req_path) == 0:
self.prompt = '%s> ' % self.APPNAME
return 0
working_backend = req_path[0]
path = req_path[1:]
if working_backend in [b.NAME for b in self.enabled_backends]:
if working_backend in [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]:
backend = [b for b in self.enabled_backends if b.NAME == working_backend][0]
else:
print >>sys.stderr, "Error backend %s not implement Collection" % working_backend
return 1
else:
print >>sys.stderr, "Error backend %s unknow" % working_backend
return 1
try:
path = backend.change_working_collection(path)
except NotImplementedError:
print >>sys.stderr, "Error backend %s not implement collection" % working_backend
self.working_path.restore()
return 1
except CollectionNotFound:
print >>sys.stderr, "Path: %s not found" % self.working_path.tostring()
self.working_path.restore()
return 1
self.prompt = '%s:%s> ' % (self.APPNAME, self.working_path.tostring() )
def complete_cd(self, text, line, begidx, endidx):
mline = line.partition(' ')[2]
offs = len(mline) - len(text)
path = self.working_path.get()
if len(path) == 0:
tmp = [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]
else:
backend = [b for b in self.enabled_backends if b.NAME == path[0]][0]
tmp = [rep for rep in backend.iter_resources(path[1:])]
return [s[offs:] for s in tmp if s.startswith(mline)]

View file

@ -23,16 +23,12 @@ import subprocess
import sys
import os
from weboob.capabilities.video import ICapVideo, BaseVideo
from weboob.capabilities.video import ICapVideo
from weboob.capabilities.base import NotLoaded
from weboob.tools.application.repl import ReplApplication
from weboob.tools.application.media_player import InvalidMediaPlayer, MediaPlayer, MediaPlayerNotFound
from weboob.tools.application.formatters.iformatter import IFormatter
from weboob.capabilities.collection import Collection, ICapCollection, CollectionNotFound
from weboob.tools.path import Path
__all__ = ['Videoob']
@ -71,41 +67,19 @@ class Videoob(ReplApplication):
COMMANDS_FORMATTERS = {'search': 'video_list', 'ls': 'video_list'}
nsfw = True
videos = []
def __init__(self, *args, **kwargs):
ReplApplication.__init__(self, *args, **kwargs)
self.player = MediaPlayer(self.logger)
self.working_path = Path()
def main(self, argv):
self.load_config()
return ReplApplication.main(self, argv)
def _get_video(self, _id, fields=None):
if self.interactive:
try:
video = self.videos[int(_id) - 1]
except (IndexError,ValueError):
pass
else:
for backend, video in self.do('fillobj', video, fields, backends=[video.backend]):
if video:
return video
_id, backend_name = self.parse_id(_id)
backend_names = (backend_name,) if backend_name is not None else self.enabled_backends
for backend, video in self.do('get_video', _id, backends=backend_names):
if video:
return video
def _complete_id(self):
return ['%s@%s' % (video.id, video.backend) for video in self.videos]
def complete_download(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_id()
return self._complete_object()
elif len(args) >= 3:
return self.path_completer(args[2])
@ -116,11 +90,11 @@ class Videoob(ReplApplication):
Download a video
"""
_id, dest = self.parse_command_args(line, 2, 1)
video = self._get_video(_id, ['url'])
video = self.get_object(_id, 'get_video', ['url'])
if not video:
print 'Video not found: %s' % _id
return 1
def check_exec(executable):
with open('/dev/null', 'w') as devnull:
process = subprocess.Popen(['which', executable], stdout=devnull)
@ -128,14 +102,14 @@ class Videoob(ReplApplication):
print >>sys.stderr, 'Please install "%s"' % executable
return False
return True
if dest is None:
ext = video.ext
if not ext:
ext = 'avi'
dest = '%s.%s' % (video.id, ext)
if video.url.find('rtmp') == 0:
if check_exec('rtmpdump'):
cmd = "rtmpdump -r " + video.url + " -o " + dest
@ -146,13 +120,13 @@ class Videoob(ReplApplication):
cmd = 'wget "%s" -O "%s"' % (video.url, dest)
else:
return 1
os.system(cmd)
def complete_play(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_id()
return self._complete_object()
def do_play(self, _id):
"""
@ -164,7 +138,7 @@ class Videoob(ReplApplication):
print 'This command takes an argument: %s' % self.get_command_help('play', short=True)
return
video = self._get_video(_id, ['url'])
video = self.get_object(_id, 'get_video', ['url'])
if not video:
print 'Video not found: %s' % _id
return
@ -180,7 +154,7 @@ class Videoob(ReplApplication):
def complete_info(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_id()
return self._complete_object()
def do_info(self, _id):
"""
@ -192,7 +166,7 @@ class Videoob(ReplApplication):
print 'This command takes an argument: %s' % self.get_command_help('info', short=True)
return
video = self._get_video(_id)
video = self.get_object(_id, 'get_video')
if not video:
print >>sys.stderr, 'Video not found: %s' % _id
return
@ -237,80 +211,9 @@ class Videoob(ReplApplication):
return 1
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest videos')
self.videos = []
self.change_path('/search')
for backend, video in self.do('iter_search_results', pattern=pattern, nsfw=self.nsfw,
max_results=self.options.count):
self.videos.append(video)
self.add_object(video)
self.format(video)
self.flush()
def do_ls(self, line):
self.videos = []
path = self.working_path.get()
if len(path) == 0:
for name in [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]:
print name
return 0
def do(backend):
return backend.iter_resources(path[1:])
for backend, rep in self.do(do, backends=path[0]):
if isinstance(rep, BaseVideo):
self.videos.append(rep)
self.format(rep)
else:
print rep
self.flush()
def do_cd(self, line):
line = line.encode('utf-8')
self.working_path.extend(line)
req_path = self.working_path.get()
if len(req_path) == 0:
self.prompt = '%s> ' % self.APPNAME
return 0
working_backend = req_path[0]
path = req_path[1:]
if working_backend in [b.NAME for b in self.enabled_backends]:
if working_backend in [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]:
backend = [b for b in self.enabled_backends if b.NAME == working_backend][0]
else:
print >>sys.stderr, "Error backend %s not implement Collection" % working_backend
return 1
else:
print >>sys.stderr, "Error backend %s unknow" % working_backend
return 1
try:
path = backend.change_working_collection(path)
except NotImplementedError:
print >>sys.stderr, "Error backend %s not implement collection" % working_backend
self.working_path.restore()
return 1
except CollectionNotFound:
print >>sys.stderr, "Path: %s not found" % self.working_path.tostring()
self.working_path.restore()
return 1
self.prompt = '%s:%s> ' % (self.APPNAME, self.working_path.tostring() )
def complete_cd(self, text, line, begidx, endidx):
mline = line.partition(' ')[2]
offs = len(mline) - len(text)
path = self.working_path.get()
if len(path) == 0:
tmp = [b.NAME for b in self.weboob.iter_backends(caps=ICapCollection)]
else:
backend = [b for b in self.enabled_backends if b.NAME == path[0]][0]
tmp = [rep for rep in backend.iter_resources(path[1:])]
return [s[offs:] for s in tmp if s.startswith(mline)]

View file

@ -92,25 +92,10 @@ class Weboorrents(ReplApplication):
'info': 'torrent_info',
}
torrents = []
def _complete_id(self):
return ['%s@%s' % (torrent.id, torrent.backend) for torrent in self.torrents]
def complete_info(self, text, line, *ignored):
args = line.split(' ')
if len(args) == 2:
return self._complete_id()
def parse_id(self, id):
if self.interactive:
try:
torrent = self.torrents[int(id) - 1]
except (IndexError,ValueError):
pass
else:
id = '%s@%s' % (torrent.id, torrent.backend)
return ReplApplication.parse_id(self, id)
return self._complete_object()
def do_info(self, id):
"""
@ -134,7 +119,7 @@ class Weboorrents(ReplApplication):
def complete_getfile(self, text, line, *ignored):
args = line.split(' ', 2)
if len(args) == 2:
return self._complete_id()
return self._complete_object()
elif len(args) >= 3:
return self.path_completer(args[2])
@ -171,11 +156,11 @@ class Weboorrents(ReplApplication):
Search torrents.
"""
self.torrents = []
self.change_path('/search')
if not pattern:
pattern = None
self.set_formatter_header(u'Search pattern: %s' % pattern if pattern else u'Latest torrents')
for backend, torrent in self.do('iter_torrents', pattern=pattern):
self.torrents.append(torrent)
self.add_object(torrent)
self.format(torrent)
self.flush()

View file

@ -25,12 +25,14 @@ from optparse import OptionGroup, OptionParser, IndentedHelpFormatter
import os
import sys
from weboob.capabilities.base import FieldNotFound
from weboob.capabilities.base import FieldNotFound, CapBaseObject
from weboob.core import CallErrors
from weboob.core.modules import ModuleLoadError
from weboob.tools.application.formatters.iformatter import MandatoryFieldsNotFound
from weboob.tools.misc import to_unicode
from weboob.tools.path import Path
from weboob.tools.ordereddict import OrderedDict
from weboob.capabilities.collection import Collection, ICapCollection, CollectionNotFound
from .console import BackendNotGiven, ConsoleApplication
from .formatters.load import FormattersLoader, FormatterLoadError
@ -128,11 +130,54 @@ class ReplApplication(Cmd, ConsoleApplication):
self._parser.add_option_group(formatting_options)
self._interactive = False
self.objects = []
self.working_path = Path()
@property
def interactive(self):
return self._interactive
def change_path(self, path):
if len(path) > 0 and path != '/':
self.prompt = '%s:%s> ' % (self.APPNAME, path)
else:
self.prompt = '%s> ' % (self.APPNAME)
self.objects = []
def add_object(self, obj):
self.objects.append(obj)
def _complete_object(self):
return ['%s@%s' % (obj.id, obj.backend) for obj in self.objects]
def parse_id(self, id):
if self.interactive:
try:
obj = self.objects[int(id) - 1]
except (IndexError,ValueError):
pass
else:
if isinstance(obj, CapBaseObject):
id = '%s@%s' % (obj.id, obj.backend)
return ConsoleApplication.parse_id(self, id)
def get_object(self, _id, method, fields=None):
if self.interactive:
try:
obj = self.objects[int(_id) - 1]
except (IndexError,ValueError):
pass
else:
if isinstance(obj, CapBaseObject):
for backend, obj in self.do('fillobj', obj, fields, backends=[obj.backend]):
if obj:
return obj
_id, backend_name = self.parse_id(_id)
backend_names = (backend_name,) if backend_name is not None else self.enabled_backends
for backend, obj in self.do(method, _id, backends=backend_names):
if obj:
return obj
def main(self, argv):
cmd_args = argv[1:]
if cmd_args:
@ -741,6 +786,62 @@ class ReplApplication(Cmd, ConsoleApplication):
page = Page(core=browser, data=data, uri=browser._response.geturl())
browser = Browser(view=page.view)
def do_ls(self, line):
if len(self.objects) == 0:
self.objects = self._fetch_objects()
for obj in self.objects:
if isinstance(obj, CapBaseObject):
self.format(obj)
else:
print obj.title
self.flush()
def do_cd(self, line):
line = line.encode('utf-8')
self.working_path.extend(line)
objects = self._fetch_objects()
if len(objects) == 0:
print >>sys.stderr, "Path: %s not found" % self.working_path.tostring()
self.working_path.restore()
return 1
self.objects = objects
self.change_path(self.working_path.tostring())
def _fetch_objects(self):
objects = []
path = self.working_path.get()
try:
for backend, res in self.do('iter_resources', path, caps=ICapCollection):
objects.append(res)
except CallErrors, errors:
for backend, error, backtrace in errors.errors:
if isinstance(error, CollectionNotFound):
pass
else:
self.bcall_error_handler(backend, error, backtrace)
return objects
def complete_cd(self, text, line, begidx, endidx):
directories = ['..']
mline = line.partition(' ')[2]
offs = len(mline) - len(text)
if len(self.objects) == 0:
self.objects = self._fetch_objects()
for obj in self.objects:
if isinstance(obj, Collection):
directories.append(obj.title)
return [s[offs:] for s in directories if s.startswith(mline)]
# -- formatting related methods -------------
def set_formatter(self, name):
"""

View file

@ -24,37 +24,37 @@ class Path(object):
def __init__(self):
self._working_path = []
self._previous = self._working_path
def extend(self, user_input):
user_input = urllib.quote_plus(user_input)
user_input = posixpath.normpath(user_input)
escape = lambda s: s.replace('/', '%2F')
current_path = map(escape, self._working_path)
abspath = posixpath.normpath(posixpath.join('/' + '/'.join(current_path), user_input))
abspath = abspath.split('/')[1:]
while len(abspath) > 0 and abspath[0] == u'': del abspath[0]
final_parse = map(urllib.unquote_plus, abspath)
self._previous = self._working_path
if len(final_parse) == 0:
self._working_path = []
self._working_path = final_parse
def restore(self):
self._working_path = self._previous
def get(self):
return copy.copy(self._working_path)
def tostring(self):
escape = lambda s: s.replace('/', '\/')
path = map(escape, self._working_path)