weboob-devel/modules/youtube/backend.py
2013-04-20 17:00:48 +02:00

184 lines
6.5 KiB
Python

# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Christophe Benz, Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
try:
import gdata.youtube.service
except ImportError:
raise ImportError("Please install python-gdata")
import datetime
import re
import urllib
from weboob.capabilities.base import NotAvailable
from weboob.capabilities.video import ICapVideo, BaseVideo
from weboob.capabilities.collection import ICapCollection, CollectionNotFound
from weboob.tools.capabilities.thumbnail import Thumbnail
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.misc import to_unicode
from weboob.tools.value import ValueBackendPassword, Value
from .browser import YoutubeBrowser
from .video import YoutubeVideo
__all__ = ['YoutubeBackend']
class YoutubeBackend(BaseBackend, ICapVideo, ICapCollection):
NAME = 'youtube'
MAINTAINER = u'Christophe Benz'
EMAIL = 'christophe.benz@gmail.com'
VERSION = '0.g'
DESCRIPTION = 'YouTube video streaming website'
LICENSE = 'AGPLv3+'
BROWSER = YoutubeBrowser
CONFIG = BackendConfig(Value('username', label='Email address', default=''),
ValueBackendPassword('password', label='Password', default=''))
URL_RE = re.compile(r'^https?://(?:\w*\.?youtube(?:|-nocookie)\.com/(?:watch\?v=|v/)|youtu\.be\/|\w*\.?youtube\.com\/user\/\w+#p\/u\/\d+\/)([^\?&]+)')
def create_default_browser(self):
password = None
username = self.config['username'].get()
if len(username) > 0:
password = self.config['password'].get()
return self.create_browser(username, password)
def _entry2video(self, entry):
"""
Parse an entry returned by gdata and return a Video object.
"""
video = YoutubeVideo(to_unicode(entry.id.text.split('/')[-1].strip()))
video.title = to_unicode(entry.media.title.text.strip())
video.duration = datetime.timedelta(seconds=int(entry.media.duration.seconds.strip()))
video.thumbnail = Thumbnail(to_unicode(entry.media.thumbnail[0].url.strip()))
if entry.author[0].name.text:
video.author = to_unicode(entry.author[0].name.text.strip())
if entry.media.name:
video.author = to_unicode(entry.media.name.text.strip())
return video
def _set_video_url(self, video):
"""
In the case of a download, if the user-chosen format is not
available, the next available format will be used.
Much of the code for this method is borrowed from youtubeservice.py of Cutetube
http://maemo.org/packages/view/cutetube/.
"""
if video.url:
return
player_url = YoutubeVideo.id2url(video.id)
with self.browser:
url, ext = self.browser.get_video_url(player_url)
video.url = unicode(url)
video.ext = unicode(ext)
def get_video(self, _id):
m = self.URL_RE.match(_id)
if m:
_id = m.group(1)
yt_service = gdata.youtube.service.YouTubeService()
yt_service.ssl = True
try:
entry = yt_service.GetYouTubeVideoEntry(video_id=_id)
except gdata.service.Error, e:
if e.args[0]['status'] == 400:
return None
raise
video = self._entry2video(entry)
self._set_video_url(video)
video.set_empty_fields(NotAvailable)
return video
def search_videos(self, pattern, sortby=ICapVideo.SEARCH_RELEVANCE, nsfw=False, max_results=None):
YOUTUBE_MAX_RESULTS = 50
YOUTUBE_MAX_START_INDEX = 1000
yt_service = gdata.youtube.service.YouTubeService()
yt_service.ssl = True
start_index = 1
nb_yielded = 0
while True:
query = gdata.youtube.service.YouTubeVideoQuery()
if pattern is not None:
if isinstance(pattern, unicode):
pattern = pattern.encode('utf-8')
query.vq = pattern
query.orderby = ('relevance', 'rating', 'viewCount', 'published')[sortby]
query.racy = 'include' if nsfw else 'exclude'
if max_results is None or max_results > YOUTUBE_MAX_RESULTS:
query_max_results = YOUTUBE_MAX_RESULTS
else:
query_max_results = max_results
query.max_results = query_max_results
if start_index > YOUTUBE_MAX_START_INDEX:
return
query.start_index = start_index
start_index += query_max_results
feed = yt_service.YouTubeQuery(query)
for entry in feed.entry:
yield self._entry2video(entry)
nb_yielded += 1
if nb_yielded == max_results:
return
if nb_yielded < query_max_results:
return
def latest_videos(self):
return self.search_videos(None, ICapVideo.SEARCH_DATE)
def fill_video(self, video, fields):
if 'thumbnail' in fields:
video.thumbnail.data = urllib.urlopen(video.thumbnail.url).read()
if 'url' in fields:
self._set_video_url(video)
return video
def iter_resources(self, objs, split_path):
if BaseVideo in objs:
collection = self.get_collection(objs, split_path)
if collection.path_level == 0:
yield self.get_collection(objs, [u'latest'])
if collection.split_path == [u'latest']:
for video in self.latest_videos():
yield video
def validate_collection(self, objs, collection):
if collection.path_level == 0:
return
if BaseVideo in objs and collection.split_path == [u'latest']:
collection.title = u'Latest YouTube videos'
return
raise CollectionNotFound(collection.split_path)
OBJECTS = {YoutubeVideo: fill_video}