[allocine] handle videos from allocine

This commit is contained in:
Bezleputh 2014-12-24 17:00:22 +01:00 committed by Romain Bignon
commit a6e221f56d
3 changed files with 223 additions and 7 deletions

View file

@ -17,14 +17,16 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>. # along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.collection import Collection
from weboob.capabilities.base import NotAvailable, NotLoaded from weboob.capabilities.video import BaseVideo
from weboob.capabilities.image import BaseImage
from weboob.capabilities.base import NotAvailable, NotLoaded, find_object
from weboob.capabilities.cinema import Movie, Person from weboob.capabilities.cinema import Movie, Person
from weboob.deprecated.browser import Browser from weboob.deprecated.browser import Browser
from weboob.tools.json import json from weboob.tools.json import json
import base64 import base64
import hashlib import hashlib
from datetime import datetime from datetime import datetime, date, timedelta
import time import time
import urllib import urllib
@ -190,7 +192,7 @@ class AllocineBrowser(Browser):
for cast in jres['castMember']: for cast in jres['castMember']:
if cast['activity']['$'] not in roles: if cast['activity']['$'] not in roles:
roles[cast['activity']['$']] = [] roles[cast['activity']['$']] = []
person_to_append = (u'%s'%cast['person']['code'], cast['person']['name']) person_to_append = (u'%s' % cast['person']['code'], cast['person']['name'])
roles[cast['activity']['$']].append(person_to_append) roles[cast['activity']['$']].append(person_to_append)
movie = Movie(id, title) movie = Movie(id, title)
@ -281,7 +283,6 @@ class AllocineBrowser(Browser):
movie_to_append = (u'%s' % (m['movie']['code']), u'(%s) %s' % (pyear, m['movie']['originalTitle'])) movie_to_append = (u'%s' % (m['movie']['code']), u'(%s) %s' % (pyear, m['movie']['originalTitle']))
roles[m['activity']['$']].append(movie_to_append) roles[m['activity']['$']].append(movie_to_append)
person = Person(id, name) person = Person(id, name)
person.real_name = real_name person.real_name = real_name
person.birth_date = birth_date person.birth_date = birth_date
@ -443,3 +444,120 @@ class AllocineBrowser(Browser):
biography = unicode(jres['biography']) biography = unicode(jres['biography'])
return biography return biography
def get_categories_movies(self, category):
params = [('partner', self.PARTNER_KEY),
('format', 'json'),
('mediafmt', 'mp4'),
('filter', category)
]
res = self.__do_request('movielist', params)
if res is None:
return
result = json.loads(res)
for movie in result['feed']['movie']:
if 'trailer' not in movie or 'productionYear' not in movie:
continue
yield self.parse_movie(movie)
def get_categories_videos(self, category):
params = [('partner', self.PARTNER_KEY),
('format', 'json'),
('mediafmt', 'mp4'),
('filter', category)
]
res = self.__do_request('videolist', params)
if res is None:
return
result = json.loads(res)
for episode in result['feed']['media']:
if 'title' in episode:
yield self.parse_video(episode, category)
def parse_video(self, _video, category):
video = BaseVideo(u'%s#%s' % (_video['code'], category))
video.title = unicode(_video['title'])
video._video_code = unicode(_video['code'])
video.ext = u'mp4'
if 'runtime' in _video:
video.duration = timedelta(seconds=int(_video['runtime']))
if 'description' in _video:
video.description = unicode(_video['description'])
renditions = sorted(_video['rendition'], key=lambda x: 'bandwidth' in x and x['bandwidth']['code'], reverse=True)
video.url = unicode(max(renditions, key=lambda x: 'bandwidth' in x)['href'])
return video
def parse_movie(self, movie):
video = BaseVideo(u'%s#%s' % (movie['code'], 'movie'))
video.title = unicode(movie['trailer']['name'])
video._video_code = unicode(movie['trailer']['code'])
video.ext = u'mp4'
video.thumbnail = BaseImage(movie['poster']['href'])
video.thumbnail.url = unicode(movie['poster']['href'])
tdate = movie['release']['releaseDate'].split('-')
day = 1
month = 1
year = 1901
if len(tdate) > 2:
year = int(tdate[0])
month = int(tdate[1])
day = int(tdate[2])
video.date = date(year, month, day)
if 'userRating' in movie['statistics']:
video.rating = movie['statistics']['userRating']
elif 'pressRating' in movie['statistics']:
video.rating = movie['statistics']['pressRating']*2
video.rating_max = 5
if 'synopsis' in movie:
video.description = unicode(movie['synopsis'].replace('<p>', '').replace('</p>', ''))
elif 'synopsisShort' in movie:
video.description = unicode(movie['synopsisShort'].replace('<p>', '').replace('</p>', ''))
if 'castingShort' in movie:
if 'directors' in movie['castingShort']:
video.author = unicode(movie['castingShort']['directors'])
if 'runtime' in movie:
video.duration = timedelta(seconds=int(movie['runtime']))
return video
def get_movie_from_id(self, _id):
params = [('partner', self.PARTNER_KEY),
('format', 'json'),
('mediafmt', 'mp4'),
('filter', 'movie'),
('code', _id),
]
res = self.__do_request('movie', params)
if res is None:
return
result = json.loads(res)
return self.parse_video(result['movie'])
def get_video_from_id(self, _id, category):
return find_object(self.get_categories_videos(category), id=u'%s#%s' % (_id, category))
def get_video_url(self, code):
params = [('partner', self.PARTNER_KEY),
('format', 'json'),
('mediafmt', 'mp4'),
('code', code),
('profile', 'large'),
]
res = self.__do_request('media', params)
if res is None:
return
result = json.loads(res)
renditions = sorted(result['media']['rendition'], key=lambda x: 'bandwidth' in x and x['bandwidth']['code'], reverse=True)
return max(renditions, key=lambda x: 'bandwidth' in x)['href']
def get_emissions(self, basename):
params = [('partner', self.PARTNER_KEY),
('format', 'json'),
('filter', 'acshow'),
]
res = self.__do_request('termlist', params)
if res is None:
return
result = json.loads(res)
for emission in result['feed']['term']:
yield Collection([basename, unicode(emission['nameShort'])], unicode(emission['$']))

View file

@ -17,6 +17,8 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>. # along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.video import CapVideo, BaseVideo
from weboob.capabilities.collection import CapCollection, CollectionNotFound, Collection
from weboob.capabilities.cinema import CapCinema, Person, Movie from weboob.capabilities.cinema import CapCinema, Person, Movie
from weboob.tools.backend import Module from weboob.tools.backend import Module
@ -25,7 +27,7 @@ from .browser import AllocineBrowser
__all__ = ['AllocineModule'] __all__ = ['AllocineModule']
class AllocineModule(Module, CapCinema): class AllocineModule(Module, CapCinema, CapVideo, CapCollection):
NAME = 'allocine' NAME = 'allocine'
MAINTAINER = u'Julien Veyssier' MAINTAINER = u'Julien Veyssier'
EMAIL = 'julien.veyssier@aiur.fr' EMAIL = 'julien.veyssier@aiur.fr'
@ -108,7 +110,72 @@ class AllocineModule(Module, CapCinema):
return movie return movie
def fill_video(self, video, fields):
if 'url' in fields:
with self.browser:
if not isinstance(video, BaseVideo):
video = self.get_video(self, video.id)
if hasattr(video, '_video_code'):
video.url = self.browser.get_video_url(video._video_code)
if 'thumbnail' in fields and video and video.thumbnail:
with self.browser:
video.thumbnail.data = self.browser.readurl(video.thumbnail.url)
return video
def get_video(self, _id):
with self.browser:
split_id = _id.split('#')
if split_id[-1] == 'movir':
return self.browser.get_movie_from_id(split_id[0])
return self.browser.get_video_from_id(split_id[0], split_id[-1])
def iter_resources(self, objs, split_path):
with self.browser:
if BaseVideo in objs:
collection = self.get_collection(objs, split_path)
if collection.path_level == 0:
yield Collection([u'comingsoon'], u'Films prochainement au cinéma')
yield Collection([u'nowshowing'], u'Films au cinéma')
yield Collection([u'acshow'], u'Émissions')
yield Collection([u'interview'], u'Interviews')
if collection.path_level == 1:
if collection.basename == u'acshow':
emissions = self.browser.get_emissions(collection.basename)
if emissions:
for emission in emissions:
yield emission
elif collection.basename == u'interview':
videos = self.browser.get_categories_videos(collection.basename)
if videos:
for video in videos:
yield video
else:
videos = self.browser.get_categories_movies(collection.basename)
if videos:
for video in videos:
yield video
if collection.path_level == 2:
videos = self.browser.get_categories_videos(':'.join(collection.split_path))
if videos:
for video in videos:
yield video
def validate_collection(self, objs, collection):
if collection.path_level == 0:
return
if collection.path_level == 1 and (collection.basename in
[u'comingsoon', u'nowshowing', u'acshow', u'interview']):
return
if collection.path_level == 2 and collection.parent_path == [u'acshow']:
return
raise CollectionNotFound(collection.split_path)
OBJECTS = { OBJECTS = {
Person: fill_person, Person: fill_person,
Movie: fill_movie Movie: fill_movie,
BaseVideo: fill_video
} }

View file

@ -18,6 +18,7 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>. # along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest from weboob.tools.test import BackendTest
from weboob.capabilities.video import BaseVideo
import re import re
@ -85,3 +86,33 @@ class AllocineTest(BackendTest):
assert len(persons_ids) > 0 assert len(persons_ids) > 0
for person_id in persons_ids: for person_id in persons_ids:
assert person_id assert person_id
def test_emissions(self):
l = list(self.backend.iter_resources([BaseVideo], [u'acshow']))
assert len(l)
l1 = list(self.backend.iter_resources([BaseVideo], l[0].split_path))
assert len(l1)
v = l1[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_interview(self):
l = list(self.backend.iter_resources([BaseVideo], [u'interview']))
assert len(l)
v = l[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_comingsoon(self):
l = list(self.backend.iter_resources([BaseVideo], [u'comingsoon']))
assert len(l)
v = l[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_nowshowing(self):
l = list(self.backend.iter_resources([BaseVideo], [u'nowshowing']))
assert len(l)
v = l[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))