[arte] adapt to browser2

* use of browser 2
* choose between video versions (i.e. with subtitles or not)
* support of cinema.arte.tv
This commit is contained in:
Bezleputh 2015-03-10 14:19:13 +01:00
commit cdc4410170
5 changed files with 485 additions and 369 deletions

View file

@ -18,56 +18,70 @@
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
import datetime
import time
import urllib
from weboob.capabilities.collection import Collection
from weboob.capabilities.base import UserError
from weboob.capabilities import NotAvailable
from weboob.capabilities.image import BaseImage
from weboob.tools.json import json as simplejson
from weboob.deprecated.browser import Browser
from weboob.deprecated.browser.decorators import id2url
from .pages import ArteLivePage, ArteLiveVideoPage
from .video import ArteVideo, ArteLiveVideo
from weboob.browser import PagesBrowser, URL
from .pages import VideosListPage, VideoPage, ArteJsonPage
from .video import VERSION_VIDEO, LANG, QUALITY, FORMATS, SITE
__all__ = ['ArteBrowser']
class ArteBrowser(Browser):
DOMAIN = u'videos.arte.tv'
ENCODING = None
PAGES = {r'http://concert.arte.tv/\w+': ArteLivePage,
r'http://concert.arte.tv/(?P<id>.+)': ArteLiveVideoPage,
}
class ArteBrowser(PagesBrowser):
BASEURL = 'http://arte.tv/'
LIVE_LANG = {'F': 'fr',
'D': 'de'
}
webservice = URL('papi/tvguide/(?P<class_name>.*)/(?P<method_name>.*)/(?P<parameters>.*).json',
'http://(?P<__site>.*).arte.tv/(?P<_lang>\w{2})/player/(?P<_id>.*)',
'https://api.arte.tv/api/player/v1/config/(?P<__lang>\w{2})/(?P<vid>.*)\?vector=(?P<___site>.*)',
ArteJsonPage)
videos_list = URL('http://(?P<site>.*).arte.tv/(?P<lang>\w{2})/?(?P<cat>.*?)', VideosListPage)
video_page = URL('http://(?P<_site>.*).arte.tv/(?P<id>.+)', VideoPage)
API_URL = 'http://arte.tv/papi/tvguide'
def __init__(self, lang, quality, order, *args, **kwargs):
self.lang = lang
self.quality = quality
def __init__(self, lang, quality, order, format, version, *args, **kwargs):
self.order = order
Browser.__init__(self, *args, **kwargs)
self.lang = (value for key, value in LANG.items if key == lang).next()
self.version = (value for key, value in VERSION_VIDEO.items
if self.lang.get('label') in value.keys() and version == key).next()
self.quality = (value for key, value in QUALITY.items if key == quality).next()
self.format = format
@id2url(ArteVideo.id2url)
def get_video(self, url, video=None):
response = self.openurl('%s/ALL.json' % url)
result = simplejson.loads(response.read(), self.ENCODING)
if self.lang.get('label') not in self.version.keys():
raise UserError('%s is not available for %s' % (self.lang.get('label'), version))
if video is None:
video = self.create_video(result['video'])
try:
video.url = self.get_m3u8_link(result['video']['VSR'][0]['VUR'])
video.ext = u'm3u8'
except:
video.url, video.ext = NotAvailable, NotAvailable
PagesBrowser.__init__(self, *args, **kwargs)
def search_videos(self, pattern):
    """Search the Arte +7 catalogue for ``pattern``.

    Builds the ``parameters`` path of the ``webservice`` URL from the
    configured language, the UTF-8 encoded pattern and the configured sort
    order, opens that URL and returns the page's ``iter_videos()`` result.
    """
    segments = [
        self.lang.get('webservice'),
        'L1',
        pattern.encode('utf-8'),
        'ALL',
        'ALL',
        '-1',
        self.order,
        '10',
        '0',
    ]
    page = self.webservice.go(class_name='videos/plus7',
                              method_name='search',
                              parameters='/'.join(segments))
    return page.iter_videos()
def get_video(self, id, video=None):
    """Fetch the ``stream/player`` description of video ``id``.

    The JSON page fills ``video`` (or creates a new object when ``video`` is
    None); the extension and URL are then taken from ``self.get_url()``.
    """
    path = '/'.join([self.lang.get('webservice'), id, 'ALL', 'ALL'])
    page = self.webservice.go(class_name='videos',
                              method_name='stream/player',
                              parameters=path)
    video = page.get_video(obj=video)
    video.ext, video.url = self.get_url()
    return video
def get_url(self):
    """Return ``(ext, url)`` for the video described by the current page.

    The URL is asked from ``self.page.get_video_url`` using the configured
    quality, format, version code for the configured language, and the
    language's default version.
    """
    version_code = self.version.get(self.lang.get('label'))
    stream_url = self.page.get_video_url(self.quality, self.format, version_code,
                                         self.lang.get('version'))

    # NOTE(review): ``format`` below is the Python builtin, not ``self.format``,
    # so this test never succeeds and the HLS branch is dead code. Before
    # switching to ``self.format``, confirm what ``self.format`` holds (enum
    # name vs. ``FORMATS`` value) and that ``get_m3u8_link`` works with this
    # browser class.
    if format == FORMATS.HLS:
        return u'm3u8', self.get_m3u8_link(stream_url)
    return u'mp4', stream_url
def get_m3u8_link(self, url):
r = self.openurl(url)
baseurl = url.rpartition('/')[0]
@ -84,189 +98,93 @@ class ArteBrowser(Browser):
return links_by_quality[0]
return NotAvailable
@id2url(ArteLiveVideo.id2url)
def get_live_video(self, url, video=None):
self.location(url)
assert self.is_on_page(ArteLiveVideoPage)
json_url, video = self.page.get_video(video)
return self.fill_live_video(video, json_url)
def fill_live_video(self, video, json_url):
response = self.openurl(json_url)
result = simplejson.loads(response.read(), self.ENCODING)
quality = None
if 'VTI' in result['videoJsonPlayer']:
video.title = u'%s' % result['videoJsonPlayer']['VTI']
if 'VSR' in result['videoJsonPlayer']:
for item in result['videoJsonPlayer']['VSR']:
if self.quality[0] in item:
quality = item
break
if not quality:
url = result['videoJsonPlayer']['VSR'][0]['url']
ext = result['videoJsonPlayer']['VSR'][0]['mediaType']
else:
url = result['videoJsonPlayer']['VSR'][quality]['url']
ext = result['videoJsonPlayer']['VSR'][quality]['mediaType']
video.url = u'%s' % url
video.ext = u'%s' % ext
if 'VDA' in result['videoJsonPlayer']:
date_string = result['videoJsonPlayer']['VDA'][:-6]
try:
video.date = datetime.datetime.strptime(date_string, '%d/%m/%Y %H:%M:%S')
except TypeError:
video.date = datetime.datetime(*(time.strptime(date_string, '%d/%m/%Y %H:%M:%S')[0:6]))
if 'VDU' in result['videoJsonPlayer'].keys():
video.duration = int(result['videoJsonPlayer']['VDU'])
if 'IUR' in result['videoJsonPlayer']['VTU'].keys():
video.thumbnail = BaseImage(result['videoJsonPlayer']['VTU']['IUR'])
video.thumbnail.url = video.thumbnail.id
return video
def home(self):
self.location('http://videos.arte.tv/%s/videos/toutesLesVideos' % self.lang)
def get_video_from_program_id(self, _id):
class_name = 'epg'
method_name = 'program'
level = 'L2'
url = self.API_URL \
+ '/' + class_name \
+ '/' + method_name \
+ '/' + self.lang \
+ '/' + level \
+ '/' + _id \
+ '.json'
response = self.openurl(url)
result = simplejson.loads(response.read(), self.ENCODING)
if 'VDO' in result['abstractProgram'].keys():
video = self.create_video(result['abstractProgram']['VDO'])
return self.get_video(video.id, video)
def search_videos(self, pattern):
class_name = 'videos/plus7'
method_name = 'search'
level = 'L1'
cluster = 'ALL'
channel = 'ALL'
limit = '10'
offset = '0'
url = self.create_url_plus7(class_name, method_name, level, cluster, channel, limit, offset, pattern)
response = self.openurl(url)
result = simplejson.loads(response.read(), self.ENCODING)
return self.create_video_from_plus7(result['videoList'])
def create_video_from_plus7(self, result):
for item in result:
yield self.create_video(item)
def create_video(self, item):
video = ArteVideo(item['VID'])
if 'VSU' in item:
video.title = u'%s : %s' % (item['VTI'], item['VSU'])
else:
video.title = u'%s' % (item['VTI'])
video.rating = int(item['VRT'])
if 'programImage' in item:
url = u'%s' % item['programImage']
video.thumbnail = BaseImage(url)
video.thumbnail.url = video.thumbnail.id
video.duration = datetime.timedelta(seconds=int(item['videoDurationSeconds']))
video.set_empty_fields(NotAvailable, ('url',))
if 'VDE' in item:
video.description = u'%s' % item['VDE']
if 'VDA' in item:
m = re.match('(\d{2})\s(\d{2})\s(\d{4})(.*?)', item['VDA'])
if m:
dd = int(m.group(1))
mm = int(m.group(2))
yyyy = int(m.group(3))
video.date = datetime.date(yyyy, mm, dd)
return video
def create_url_plus7(self, class_name, method_name, level, cluster, channel, limit, offset, pattern=None):
url = self.API_URL \
+ '/' + class_name \
+ '/' + method_name \
+ '/' + self.lang \
+ '/' + level
if pattern:
url += '/' + urllib.quote(pattern.encode('utf-8'))
url += '/' + channel \
+ '/' + cluster \
+ '/' + '-1' \
+ '/' + self.order \
+ '/' + limit \
+ '/' + offset \
+ '.json'
return url
def get_arte_programs(self):
class_name = 'epg'
method_name = 'clusters'
url = self.API_URL \
+ '/' + class_name \
+ '/' + method_name \
+ '/' + self.lang \
+ '/0/ALL.json'
response = self.openurl(url)
result = simplejson.loads(response.read(), self.ENCODING)
return result['configClusterList']
def program_videos(self, program):
class_name = 'epg'
method_name = 'cluster'
url = self.API_URL \
+ '/' + class_name \
+ '/' + method_name \
+ '/' + self.lang \
+ '/' + program \
+ '.json'
response = self.openurl(url)
result = simplejson.loads(response.read(), self.ENCODING)
for item in result['clusterWrapper']['broadcasts']:
if 'VDS' in item.keys() and len(item['VDS']) > 0:
video = self.get_video_from_program_id(item['programId'])
if video:
yield video
parameters = '/'.join([self.lang.get('webservice'), 'L2', _id])
video = self.webservice.go(class_name=class_name, method_name=method_name,
parameters=parameters).get_program_video()
return self.get_video(video.id, video)
def latest_videos(self):
class_name = 'videos'
method_name = 'plus7'
level = 'L1'
cluster = 'ALL'
channel = 'ALL'
limit = '10'
offset = '0'
parameters = '/'.join([self.lang.get('webservice'), 'L1', 'ALL', 'ALL', '-1', self.order, '10', '0'])
return self.webservice.go(class_name=class_name, method_name=method_name, parameters=parameters).iter_videos()
url = self.create_url_plus7(class_name, method_name, level, cluster, channel, limit, offset)
response = self.openurl(url)
result = simplejson.loads(response.read(), self.ENCODING)
return self.create_video_from_plus7(result['videoList'])
def get_arte_programs(self):
    """List program clusters through the ``epg/clusters`` web service.

    Returns the page's ``iter_programs`` result, passing the configured
    language's ``title`` entry so the page knows which title field to read.
    """
    path = '/'.join([self.lang.get('webservice'), '0', 'ALL'])
    page = self.webservice.go(class_name='epg',
                              method_name='clusters',
                              parameters=path)
    return page.iter_programs(title=self.lang.get('title'))
def get_arte_live_categories(self):
self.location('http://concert.arte.tv/%s' % self.LIVE_LANG[self.lang])
assert self.is_on_page(ArteLivePage)
return self.page.iter_resources()
def get_arte_program_videos(self, program):
    """Yield the videos of one program cluster.

    ``program`` is a path-like sequence; its last element is used as the
    cluster identifier. Each entry listed by the ``epg/cluster`` web service
    is resolved through ``get_video_from_program_id``.
    """
    path = '/'.join([self.lang.get('webservice'), program[-1]])
    page = self.webservice.go(class_name='epg',
                              method_name='cluster',
                              parameters=path)
    for entry in page.iter_program_videos():
        yield self.get_video_from_program_id(entry.id)
def live_videos(self, cat):
self.location('http://concert.arte.tv/%s' % self.LIVE_LANG[self.lang])
assert self.is_on_page(ArteLivePage)
return self.page.iter_videos(cat, lang=self.LIVE_LANG[self.lang])
def get_arte_concert_categories(self):
    """Open the Arte Concert listing page and return its categories."""
    listing = self.videos_list.go(site=SITE.CONCERT.get('id'),
                                  lang=self.lang.get('site'),
                                  cat='')
    return listing.iter_arte_concert_categories()
def get_arte_concert_videos(self, cat):
    """Return the Arte Concert videos of a category.

    ``cat`` is a path-like sequence whose last element selects the category
    on the listing page.
    """
    listing = self.videos_list.go(site=SITE.CONCERT.get('id'),
                                  lang=self.lang.get('site'),
                                  cat='')
    return listing.iter_arte_concert_videos(cat=cat[-1])
def get_arte_concert_video(self, id, video=None):
json_url = self.video_page.go(_site=SITE.CONCERT.get('id'), id=id).get_json_url()
m = re.search('http://(?P<__site>.*).arte.tv/(?P<_lang>\w{2})/player/(?P<_id>.*)', json_url)
if m:
video = self.webservice.go(__site=m.group('__site'), _lang=m.group('_lang'),
_id=m.group('_id')).get_arte_concert_video(obj=video)
video.ext, video.url = self.get_url()
return video
def get_arte_cinema_categories(self, cat=[]):
    """List the Arte Cinema sub-categories located under ``cat``.

    The menu links of the cinema listing page are turned into a nested dict:
    every path component after the first two becomes a level, and the last
    component of each link is stored in that level's ``"end"`` list.

    ``cat`` is a collection path; its first element is dropped before walking
    the tree. When the reached level has an ``"end"`` list, the categories
    are read from the page itself; otherwise one ``Collection`` per child
    level is returned.
    """
    menu = self.videos_list.go(site=SITE.CINEMA.get('id'), lang=self.lang.get('site'),
                               cat='').get_arte_cinema_menu()

    tree = {}
    for parts in [href.split("/")[2:] for href in menu]:
        level = tree
        for name in parts[:-1]:
            level = level.setdefault(name, {})
        level.setdefault("end", []).append(parts[-1])

    if cat:
        cat = cat[1:]

    node = tree
    for name in cat:
        node = node.get(name)

    if "end" in node.keys():
        return self.page.iter_arte_cinema_categories(cat='/'.join(cat))

    return [Collection([SITE.CINEMA.get('id'), unicode(child)], unicode(child))
            for child in node.keys()]
def get_arte_cinema_videos(self, cat):
    """Return the Arte Cinema videos listed under the category path ``cat``.

    The first element of ``cat`` is dropped; the remaining elements are
    joined with ``/`` to form the category part of the listing URL.
    """
    sub_path = '/'.join(cat[1:])
    listing = self.videos_list.go(site=SITE.CINEMA.get('id'),
                                  lang=self.lang.get('site'),
                                  cat='/%s' % sub_path)
    return listing.get_arte_cinema_videos()
def get_arte_cinema_video(self, id, video=None):
json_url = self.video_page.go(_site=SITE.CINEMA.get('id'), id=id).get_json_url()
m = re.search('https://api.arte.tv/api/player/v1/config/(\w{2})/(.*)\?vector=(.*)\&.*', json_url)
if m:
video = self.webservice.go(__lang=m.group(1),
vid=m.group(2), ___site=m.group(3)).get_arte_cinema_video(obj=video)
video.ext, video.url = self.get_url()
video.id = id
return video

View file

@ -19,14 +19,14 @@
import re
from weboob.tools.ordereddict import OrderedDict
from weboob.capabilities.video import CapVideo, BaseVideo
from weboob.capabilities.collection import CapCollection, CollectionNotFound, Collection
from weboob.tools.backend import Module, BackendConfig
from weboob.tools.value import Value
from .browser import ArteBrowser
from .video import ArteVideo, ArteLiveVideo
from .video import ArteVideo, ArteSiteVideo, VERSION_VIDEO, FORMATS, LANG, QUALITY, SITE
__all__ = ['ArteModule']
@ -46,26 +46,25 @@ class ArteModule(Module, CapVideo, CapCollection):
'LAST_CHANCE': 'Last chance'
}
CONFIG = BackendConfig(Value('lang', label='Lang of videos',
choices={'fr': 'French', 'de': 'Deutsch', 'en': 'English'}, default='fr'),
Value('order', label='Sort order', choices=order, default='AIRDATE_DESC'),
Value('quality', label='Quality of videos', choices=['hd', 'sd', 'md', 'ed'], default='hd'))
versions_choice = OrderedDict([(k, u'%s' % (v.get('label'))) for k, v in VERSION_VIDEO.items])
format_choice = OrderedDict([(k, u'%s' % (v)) for k, v in FORMATS.items])
lang_choice = OrderedDict([(k, u'%s' % (v.get('label'))) for k, v in LANG.items])
quality_choice = [u'%s' % (k) for k, v in QUALITY.items]
TRANSLATION = {'fr': 'F',
'en': 'F',
'de': 'D',
'hd': ['HQ', -1],
'md': ['MQ', 2],
'sd': ['SQ', 0],
'ed': ['EQ', 1]
}
CONFIG = BackendConfig(Value('lang', label='Lang of videos', choices=lang_choice, default=LANG.FRENCH),
Value('order', label='Sort order', choices=order, default='AIRDATE_DESC'),
Value('quality', label='Quality of videos', choices=quality_choice, default=QUALITY.HD),
Value('format', label='Format of videos', choices=format_choice, default=FORMATS.HTTP_MP4),
Value('version', label='Version of videos', choices=versions_choice))
BROWSER = ArteBrowser
def create_default_browser(self):
return self.create_browser(lang=self.TRANSLATION[self.config['lang'].get()],
quality=self.TRANSLATION[self.config['quality'].get()],
order=self.config['order'].get())
return self.create_browser(lang=self.config['lang'].get(),
quality=self.config['quality'].get(),
order=self.config['order'].get(),
format=self.config['format'].get(),
version=self.config['version'].get())
def parse_id(self, _id):
m = re.match('^(\w+)\.(.*)', _id)
@ -74,96 +73,81 @@ class ArteModule(Module, CapVideo, CapCollection):
m = re.match('https?://www.arte.tv/guide/\w+/(?P<id>.+)/(.*)', _id)
if m:
return 'program', m.group(1)
return SITE.PROGRAM.get('id'), m.group(1)
m = re.match('https?://concert.arte.tv/(\w+)/(.*)', _id)
m = re.match('https?://(%s).arte.tv/(\w+)/(.*)' % ('|'.join(value.get('id') for value in SITE.values)), _id)
if m:
return 'live', '/%s/%s' % (m.group(1), m.group(2))
return m.group(1), '/%s/%s' % (m.group(2), m.group(3))
return 'videos', _id
def get_video(self, _id):
with self.browser:
site, _id = self.parse_id(_id)
site, _id = self.parse_id(_id)
if site == 'live':
return self.browser.get_live_video(_id)
if site in [value.get('id') for value in SITE.values]:
_site = (value for value in SITE.values if value.get('id') == site).next()
return getattr(self.browser, _site.get('video'))(_id)
elif site == 'program':
return self.browser.get_video_from_program_id(_id)
else:
return self.browser.get_video(_id)
else:
return self.browser.get_video(_id)
def search_videos(self, pattern, sortby=CapVideo.SEARCH_RELEVANCE, nsfw=False):
with self.browser:
return self.browser.search_videos(pattern)
return self.browser.search_videos(pattern)
def fill_video(self, video, fields):
def fill_arte_video(self, video, fields):
if fields != ['thumbnail']:
# if we don't want only the thumbnail, we probably want also every fields
with self.browser:
site, _id = self.parse_id(video.id)
video = self.browser.get_video(video.id, video)
if isinstance(video, ArteVideo):
video = self.browser.get_video(_id, video)
if isinstance(video, ArteLiveVideo):
video = self.browser.get_live_video(_id, video)
if 'thumbnail' in fields and video and video.thumbnail:
with self.browser:
video.thumbnail.data = self.browser.readurl(video.thumbnail.url)
video.thumbnail.data = self.browser.open(video.thumbnail.url).content
return video
def fill_site_video(self, video, fields):
    """Fill a site video (ids of the form ``<site id>.<path>``).

    Unless only the thumbnail is requested, the video is re-fetched through
    the browser method named by the ``video`` entry of the first ``SITE``
    whose id prefixes ``video.id``. The thumbnail bytes are downloaded when
    ``'thumbnail'`` is among ``fields`` and the video has a thumbnail.
    """
    if fields != ['thumbnail']:
        for site in SITE.values:
            found = re.match('%s\.(.*)' % site.get('id'), video.id)
            if not found:
                continue
            fetch = getattr(self.browser, site.get('video'))
            video = fetch(found.group(1), video)
            break

    wants_thumbnail = 'thumbnail' in fields
    if wants_thumbnail and video and video.thumbnail:
        response = self.browser.open(video.thumbnail.url)
        video.thumbnail.data = response.content
    return video
def iter_resources(self, objs, split_path):
with self.browser:
if BaseVideo in objs:
collection = self.get_collection(objs, split_path)
if collection.path_level == 0:
yield Collection([u'arte-latest'], u'Latest Arte videos')
yield Collection([u'arte-live'], u'Arte Web Live videos')
yield Collection([u'arte-program'], u'Arte Programs')
if collection.path_level == 1:
if collection.split_path == [u'arte-latest']:
for video in self.browser.latest_videos():
yield video
if collection.split_path == [u'arte-live']:
for categorie in self.browser.get_arte_live_categories():
yield categorie
if collection.split_path == [u'arte-program']:
for item in self.browser.get_arte_programs():
lang = self.TRANSLATION[self.config['lang'].get()]
if BaseVideo in objs:
collection = self.get_collection(objs, split_path)
if collection.path_level == 0:
yield Collection([u'arte-latest'], u'Latest Arte videos')
for site in SITE.values:
yield Collection([site.get('id')], site.get('label'))
if collection.path_level == 1:
if collection.split_path == [u'arte-latest']:
for video in self.browser.latest_videos():
yield video
else:
for site in SITE.values:
if collection.split_path[0] == site.get('id') and collection.path_level in site.keys():
for item in getattr(self.browser, site.get(collection.path_level))():
yield item
if lang == 'F':
title = 'titleFR'
elif lang == 'D':
title = 'titleDE'
else:
title = 'name'
name = item['clusterId']
if title in item.keys():
name = item[title]
yield Collection([u'arte-program', item['clusterId']], u'%s' % name)
if collection.path_level == 2:
if collection.split_path[0] == u'arte-live':
for video in self.browser.live_videos(collection.basename):
yield video
if collection.split_path[0] == u'arte-program':
for video in self.browser.program_videos(collection.split_path[1]):
yield video
if collection.path_level >= 2:
for site in SITE.values:
if collection.split_path[0] == site.get('id') and collection.path_level in site.keys():
for item in getattr(self.browser, site.get(collection.path_level))(collection.split_path):
yield item
def validate_collection(self, objs, collection):
if collection.path_level == 0:
return
if BaseVideo in objs and (collection.split_path == [u'arte-latest'] or
collection.split_path == [u'arte-live'] or
collection.split_path == [u'arte-program']):
collection.split_path[0] in [value.get('id') for value in SITE.values]):
return
if BaseVideo in objs and collection.path_level == 2 and (collection.split_path[0] == u'arte-live' or
collection.split_path[0] == u'arte-program'):
if BaseVideo in objs and collection.path_level >= 2 and\
collection.split_path[0] in [value.get('id') for value in SITE.values]:
return
raise CollectionNotFound(collection.split_path)
OBJECTS = {ArteVideo: fill_video, ArteLiveVideo: fill_video}
OBJECTS = {ArteVideo: fill_arte_video, ArteSiteVideo: fill_site_video}

View file

@ -17,60 +17,249 @@
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import timedelta
from weboob.deprecated.browser import Page
from weboob.tools.html import html2text
from weboob.capabilities import NotAvailable
from weboob.capabilities.image import BaseImage
from weboob.capabilities.base import BaseObject, NotAvailable
from weboob.capabilities.collection import Collection
from .video import ArteLiveVideo
from weboob.browser.pages import HTMLPage, JsonPage
from weboob.browser.elements import DictElement, ItemElement, ListElement, method
from weboob.browser.filters.standard import Date, Format, Env, CleanText, Field, Regexp, Join
from weboob.browser.filters.json import Dict
from weboob.browser.filters.html import XPath
class ArteLiveVideoPage(Page):
def get_video(self, video=None):
if not video:
video = ArteLiveVideo('/%s' % self.group_dict['id'])
div = self.document.xpath('//div[@class="bloc-presentation"]')[0]
description = self.parser.select(div,
'div[@class="field field-name-body field-type-text-with-summary field-label-hidden bloc-rte"]',
1,
method='xpath')
video.description = html2text(self.parser.tostring(description))
json_url = self.document.xpath('//div[@class="video-container"]')[0].attrib['arte_vp_url']
return json_url, video
from .video import ArteVideo, ArteSiteVideo, SITE
class ArteLivePage(Page):
def iter_resources(self):
items = list()
for el in self.document.xpath('//ul[@class="filter-liste"]/li'):
_id = el.attrib['data-target'].replace('video_box_tab_', '')
text = self.parser.select(el, 'a/span', 1, method='xpath').text
item = Collection([u'arte-live', u'%s' % _id], u'%s' % (text))
items.append(item)
return items
class ArteItemElement(ItemElement):
def iter_videos(self, cat, lang='fr'):
articles = self.document.xpath('//div[@id="video_box_tab_%s"]/article' % cat)
videos = list()
for article in articles:
_id = article.attrib['about']
title = self.parser.select(article,
'div/div[@class="info-article "]/div/h3/a',
1,
method='xpath').text
thumbnail = self.parser.select(article,
'div/div/a/figure/span/span',
1,
method='xpath').attrib['data-src']
obj_id = Dict('VID')
video = ArteLiveVideo(_id)
video.title = u'%s' % title
video.thumbnail = BaseImage(thumbnail)
video.thumbnail.url = video.thumbnail.id
video.set_empty_fields(NotAvailable, ('url',))
videos.append(video)
return videos
def obj_title(self):
vti = Dict('VTI')(self)
vtu = Dict('VSU', default=None)(self)
if not vtu:
return vti
return '%s: %s' % (vti, vtu)
obj_rating = Dict('VRT', default=NotAvailable)
obj_rating_max = 10
obj_description = Dict('VDE', default=NotAvailable)
obj_date = Date(Dict('VDA'))
def obj_duration(self):
seconds = Dict('videoDurationSeconds')(self)
if isinstance(seconds, basestring):
seconds = int(seconds)
return timedelta(seconds=seconds)
def obj_thumbnail(self):
url = Dict('VTU/IUR')(self)
thumbnail = BaseImage(url)
thumbnail.url = thumbnail.id
return thumbnail
class VideosListPage(HTMLPage):
@method
class iter_arte_concert_categories(ListElement):
item_xpath = '//ul[@class="filter-liste"]/li'
class item(ItemElement):
klass = Collection
obj_title = CleanText('./a/span')
obj_id = CleanText('./@data-target', replace=[('video_box_tab_', '')])
def obj_split_path(self):
_id = CleanText('./@data-target', replace=[('video_box_tab_', '')])(self)
return [SITE.CONCERT.get('id'), u'%s' % _id]
@method
class iter_arte_concert_videos(ListElement):
def find_elements(self):
self.item_xpath = '//div[@id="video_box_tab_%s"]/article' % Env('cat')(self)
for el in self.el.xpath(self.item_xpath):
yield el
class item(ItemElement):
klass = ArteSiteVideo
obj__site = SITE.CONCERT.get('id')
obj_id = Format('%s.%s', Field('_site'), CleanText('./@about'))
obj_title = CleanText('div/div[@class="info-article "]/div/h3/a')
def obj_thumbnail(self):
url = CleanText('div/div/a/figure/span/span/@data-src')(self)
thumbnail = BaseImage(url)
thumbnail.url = thumbnail.id
return thumbnail
@method
class iter_arte_cinema_categories(ListElement):
item_xpath = '//li[has-class("leaf")]'
class item(ItemElement):
klass = Collection
def condition(self):
return Regexp(CleanText('./a/@href'), '^(/\w{2}/%s/.*)' % self.env['cat'], default=None)(self)
obj_title = CleanText('./a')
obj_id = CleanText('./a/@href')
def obj_split_path(self):
_id = Regexp(CleanText('./a/@href'), '/\w{2}/(.*)')(self)
return [SITE.CINEMA.get('id')] + _id.split('/')
def get_arte_cinema_menu(self):
return self.doc.xpath('//li[has-class("leaf")]/a[starts-with(@href,"/")]/@href')
@method
class get_arte_cinema_videos(ListElement):
item_xpath = '//article'
class item(ItemElement):
klass = ArteSiteVideo
def condition(self):
return len(XPath('.//div[@class="article-secondary "]')(self)) == 1 and\
len(XPath('.//article')(self)) == 0
obj__site = SITE.CINEMA.get('id')
obj_id = Format('%s.%s', Field('_site'), CleanText('./@about'))
obj_title = Join(u' - ',
'.//div[@class="article-secondary "]/div/div')
def obj_thumbnail(self):
url = CleanText('.//div[@class="article-primary "]/div[has-class("field-thumbnail")]/span/noscript/img/@src')(self)
thumbnail = BaseImage(url)
thumbnail.url = thumbnail.id
return thumbnail
class VideoPage(HTMLPage):
def get_json_url(self):
    """Return the ``arte_vp_url`` attribute of the first video-container div."""
    containers = self.doc.xpath('//div[@class="video-container"]')
    return containers[0].attrib['arte_vp_url']
class ArteJsonPage(JsonPage):
def get_video_url(self, quality, format, version, language_version):
    """Pick a stream from ``videoJsonPlayer/VSR`` and return its URL.

    Candidate keys are tried in order through ``find_url``:
    ``format_quality_version``, then ``format_quality_language_version``,
    then ``quality_language_version``. When none yields a stream, the first
    listed stream is used. If the stream has a ``streamer`` value it is
    prepended to the URL. Returns None when no stream is listed.
    """
    urls = Dict('videoJsonPlayer/VSR')(self.doc).keys()
    if not urls:
        return None

    candidates = (
        [format, quality, version],
        # Fall back to the default language version
        [format, quality, language_version],
        # Fall back to quality only
        [quality, language_version],
    )
    for parts in candidates:
        found = self.find_url('_'.join(parts), urls, version, quality)
        if found:
            break
    else:
        found = urls[0]

    streamer = Dict('videoJsonPlayer/VSR/%s/streamer' % (found), default=None)(self.doc)
    url = Dict('videoJsonPlayer/VSR/%s/url' % (found))(self.doc)
    if streamer:
        return '%s%s' % (streamer, url)
    return url
def find_url(self, key, urls, version, quality):
    """Return the stream key from ``urls`` that best matches the request.

    :param key: preferred key fragment (e.g. format, quality and version
                joined with ``_``)
    :param urls: iterable of available stream keys
    :param version: version code that must appear in a fallback key
    :param quality: quality code preferred among the fallback keys
    :returns: the first key containing ``key``; otherwise the first key
              containing both ``version`` and ``quality``; otherwise the
              first key containing ``version``; None when nothing matches.
    """
    self.logger.debug('available urls: %s', urls)
    self.logger.debug('search url matching : %s', key)

    # Best case: the key is matching
    matching = [s for s in urls if key in s]
    self.logger.debug('best case matching: %s', matching)
    if matching:
        return matching[0]

    # Second case: is the version available
    matching = [s for s in urls if version in s]
    self.logger.debug('is version available: %s', matching)
    if not matching:
        return None

    # Do the quality + version match
    matching_quality = [s for s in matching if quality in s]
    self.logger.debug('does quality + version match: %s', matching_quality)
    if matching_quality:
        # Return the quality match itself; the first version match may be
        # a stream of a different quality.
        return matching_quality[0]

    # Only the version matches
    return matching[0]
@method
class iter_videos(DictElement):
item_xpath = 'videoList'
class item(ArteItemElement):
klass = ArteVideo
@method
class iter_programs(DictElement):
item_xpath = 'configClusterList'
class item(ItemElement):
klass = Collection
obj_title = Dict(CleanText(Env('title')))
obj_id = Dict('clusterId')
def obj_split_path(self):
return [SITE.PROGRAM.get('id'), Dict('clusterId')(self)]
@method
class get_video(ArteItemElement):
def __init__(self, *args, **kwargs):
super(ArteItemElement, self).__init__(*args, **kwargs)
self.el = self.el.get('videoJsonPlayer')
klass = ArteVideo
@method
class get_arte_concert_video(ArteItemElement):
def __init__(self, *args, **kwargs):
super(ArteItemElement, self).__init__(*args, **kwargs)
self.el = self.el.get('videoJsonPlayer')
klass = ArteSiteVideo
obj__site = SITE.CONCERT.get('id')
obj_id = Format('%s.%s', Field('_site'), Regexp(Dict('VTR'), 'http://concert.arte.tv(.*)'))
@method
class get_arte_cinema_video(ArteItemElement):
def __init__(self, *args, **kwargs):
super(ArteItemElement, self).__init__(*args, **kwargs)
self.el = self.el.get('videoJsonPlayer')
klass = ArteSiteVideo
obj__site = SITE.CINEMA.get('id')
obj_date = Date(Dict('VRA'))
@method
class get_program_video(ArteItemElement):
def __init__(self, *args, **kwargs):
super(ArteItemElement, self).__init__(*args, **kwargs)
if 'VDO' in self.el['abstractProgram'].keys():
self.el = self.el['abstractProgram']['VDO']
klass = ArteVideo
@method
class iter_program_videos(DictElement):
item_xpath = 'clusterWrapper/broadcasts'
ignore_duplicate = True
class item(ItemElement):
klass = BaseObject
def condition(self):
return 'VDS' in self.el.keys() and len(self.el['VDS']) > 0
obj_id = Dict('programId')

View file

@ -20,6 +20,7 @@
from weboob.tools.test import BackendTest
from weboob.capabilities.video import BaseVideo
from .video import SITE
class ArteTest(BackendTest):
@ -32,14 +33,23 @@ class ArteTest(BackendTest):
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_live(self):
l1 = list(self.backend.iter_resources([BaseVideo], [u'arte-live']))
assert len(l1)
l2 = list(self.backend.iter_resources([BaseVideo], l1[0].split_path))
assert len(l2)
v = l2[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_sites(self):
for site in SITE.values:
if site.get('id') == SITE.PROGRAM.get('id'):
continue
l1 = list(self.backend.iter_resources([BaseVideo], [site.get('id')]))
assert len(l1)
l1 = l1[0]
while not isinstance(l1, BaseVideo):
l1 = list(self.backend.iter_resources([BaseVideo], l1.split_path))
assert len(l1)
l1 = l1[0]
self.backend.fillobj(l1, ('url',))
self.assertTrue(l1.url, 'URL for video "%s" not found' % (l1.id))
def test_latest(self):
l = list(self.backend.iter_resources([BaseVideo], [u'arte-latest']))
@ -49,7 +59,7 @@ class ArteTest(BackendTest):
self.assertTrue(v.url, 'URL for video "%s" not found' % (v.id))
def test_program(self):
l1 = list(self.backend.iter_resources([BaseVideo], [u'arte-program']))
l1 = list(self.backend.iter_resources([BaseVideo], [u'program']))
assert len(l1)
# some categories may contain no available videos (during summer period for example)
for l in l1:

View file

@ -17,21 +17,36 @@
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.capabilities.base import enum
from weboob.capabilities.video import BaseVideo
# Stream formats offered as the backend's 'format' option; the browser's
# get_url() compares against FORMATS.HLS to decide on the m3u8 handling.
FORMATS = enum(HTTP_MP4=u'HBBTV', HLS=u'M3U8', RTMP=u'RTMP', HLS_MOBILE=u'MOBILE')
# Per-language settings read by the browser:
#   label      - key looked up in the VERSION_VIDEO entries below
#   webservice - first segment of the web-service 'parameters' path
#   site       - language segment of the concert/cinema site URLs
#   version    - default version code, passed to pages as 'language_version'
#   title      - name of the JSON field holding program titles
LANG = enum(FRENCH={u'label': u'French', u'webservice': u'F', u'site': u'fr', u'version': u'1', u'title': u'titleFR'},
GERMAN={u'label': u'German', u'webservice': u'D', u'site': u'de', u'version': u'1', u'title': u'titleDE'})
# Browsable sites:
#   id / label   - first path element and title of the root collection
#   1, 2, 3      - collection path level -> name of the browser method that
#                  lists the resources of that level
#   video        - name of the browser method that fetches a single video
SITE = enum(PROGRAM={u'id': u'program', u'label': u'Arte Programs', 1: 'get_arte_programs',
2: 'get_arte_program_videos', u'video': 'get_video_from_program_id'},
CONCERT={u'id': u'concert', u'label': u'Arte Concert videos', 1: 'get_arte_concert_categories',
2: 'get_arte_concert_videos', 'video': 'get_arte_concert_video'},
CINEMA={u'id': u'cinema', u'label': u'Arte Cinema', 1: 'get_arte_cinema_categories',
2: 'get_arte_cinema_categories', 3: 'get_arte_cinema_videos', 'video': 'get_arte_cinema_video'})
# Quality codes; the browser passes the selected value to the JSON page,
# which joins it into the stream key it searches for.
QUALITY = enum(HD=u'SQ', MD=u'EQ', SD=u'MQ', LD=u'LQ')
# Video versions. 'label' is the text shown for the config choice; every other
# key is a LANG label mapped to the version code to request for that language.
# A version can only be selected for a language that appears among its keys.
VERSION_VIDEO = enum(VOSTA={u'label': u'Original version subtitled (German)', LANG.GERMAN.get('label'): u'3'},
VOSTF={u'label': u'Original version subtitled (French)', LANG.FRENCH.get('label'): u'3'},
VASTA={u'label': u'Translated version (German)',
LANG.GERMAN.get('label'): u'1', LANG.FRENCH.get('label'): u'2'},
VFSTF={u'label': u'Translated version (French)',
LANG.FRENCH.get('label'): u'1', LANG.GERMAN.get('label'): u'2'},
VASTMA={u'label': u'Deaf version (German)', LANG.GERMAN.get('label'): u'8'},
VFSTMF={u'label': u'Deaf version (French)', LANG.FRENCH.get('label'): u'8'})
class ArteVideo(BaseVideo):
@classmethod
def id2url(cls, _id):
lang = _id[-1:]
return 'http://arte.tv/papi/tvguide/videos/stream/%s/%s/M3U8' % (lang, _id)
pass
class ArteLiveVideo(BaseVideo):
def __init__(self, _id, *args, **kwargs):
BaseVideo.__init__(self, 'live.%s' % _id, *args, **kwargs)
@classmethod
def id2url(cls, _id):
return 'http://concert.arte.tv%s' % _id
class ArteSiteVideo(BaseVideo):
pass