[grooveshark] update to match Album and Playlist management in radioob

This commit is contained in:
Bezleputh 2014-02-18 23:05:11 +01:00 committed by Florent
commit 2d25289a68
3 changed files with 176 additions and 167 deletions

View file

@ -19,14 +19,22 @@
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.capabilities.audio import ICapAudio, BaseAudio
from weboob.capabilities.collection import ICapCollection, Collection, CollectionNotFound
from weboob.capabilities.audio import ICapAudio, BaseAudio, Album, Playlist, decode_id
from weboob.capabilities.collection import ICapCollection, CollectionNotFound
from .browser import GroovesharkBrowser
from weboob.tools.value import ValueBackendPassword, Value
__all__ = ['GroovesharkBackend']
def cmp_id(p1, p2):
if p1.id == p2.id:
return 0
if p1.id > p2.id:
return 1
return -1
class GroovesharkBackend(BaseBackend, ICapAudio, ICapCollection):
NAME = 'grooveshark'
DESCRIPTION = u'Grooveshark music streaming website'
@ -49,7 +57,8 @@ class GroovesharkBackend(BaseBackend, ICapAudio, ICapCollection):
def fill_audio(self, audio, fields):
if 'url' in fields:
with self.browser:
audio.url = unicode(self.browser.get_stream_url_from_song_id(audio.id))
_id = BaseAudio.decode_id(audio.id)
audio.url = unicode(self.browser.get_stream_url_from_song_id(_id))
if 'thumbnail' in fields and audio.thumbnail:
with self.browser:
audio.thumbnail.data = self.browser.readurl(audio.thumbnail.url)
@ -58,60 +67,67 @@ class GroovesharkBackend(BaseBackend, ICapAudio, ICapCollection):
with self.browser:
return self.browser.search_audio(pattern)
@decode_id(BaseAudio.decode_id)
def get_audio(self, _id):
with self.browser:
return self.browser.get_audio_from_song_id(_id)
def fill_album(self, album, fields):
_id = Album.decode_id(album.id)
album.tracks_list = []
for song in self.browser.get_all_songs_from_album(_id):
album.tracks_list.append(song)
def search_album(self, pattern, sortby=ICapAudio.SEARCH_RELEVANCE):
with self.browser:
return self.browser.search_albums(pattern)
@decode_id(Album.decode_id)
def get_album(self, _id):
with self.browser:
album = self.browser.get_album_by_id(_id)
album.tracks_list = []
for song in self.browser.get_all_songs_from_album(_id):
album.tracks_list.append(song)
album.tracks_list.sort(cmp=cmp_id)
return album
def fill_playlist(self, playlist, fields):
playlist.tracks_list = []
_id = Playlist.decode_id(playlist.id)
for song in self.browser.get_all_songs_from_playlist(_id):
playlist.tracks_list.append(song)
def search_playlist(self, pattern, sortby=ICapAudio.SEARCH_RELEVANCE):
with self.browser:
lower_pattern = pattern.lower()
for playlist in self.browser.get_all_user_playlists():
if lower_pattern in playlist.title.lower():
yield playlist
@decode_id(Playlist.decode_id)
def get_playlist(self, _id):
with self.browser:
playlist = Playlist(_id)
playlist.tracks_list = []
for song in self.browser.get_all_songs_from_playlist(_id):
playlist.tracks_list.append(song)
return playlist
def iter_resources(self, objs, split_path):
with self.browser:
if BaseAudio in objs:
collection = self.get_collection(objs, split_path)
if collection.path_level == 0:
yield Collection([u'albums'], u'Search for Albums')
if self.browser.is_logged():
yield Collection([u'playlists'], u'Grooveshark Playlists')
if collection.path_level == 1:
if collection.split_path[0] == u'playlists':
for item in self.browser.get_all_user_playlists(collection.split_path):
yield item
elif collection.split_path[0] == u'albums':
print u'Enter cd [%s\'s name] then ls to launch search' % collection.split_path[0]
if collection.path_level == 2:
if collection.split_path[0] == u'albums':
for item in self.browser.search_albums(collection.split_path):
yield item
if collection.split_path[0] == u'playlists':
for audio in self.browser.get_all_songs_from_playlist(collection.split_path[1]):
yield audio
if collection.path_level == 3 and collection.split_path[0] == u'albums':
for audio in self.browser.get_all_songs_from_album(collection.split_path[2]):
yield audio
if Playlist in objs:
self._restrict_level(split_path)
if self.browser.is_logged():
for item in self.browser.get_all_user_playlists():
yield item
def validate_collection(self, objs, collection):
if collection.path_level == 0:
return
if BaseAudio in objs and (collection.split_path == [u'albums'] or collection.split_path == [u'playlists']):
return
if BaseAudio in objs and collection.path_level == 2 and \
(collection.split_path[0] == u'albums' or collection.split_path[0] == u'playlists'):
if collection.split_path[0] == u'playlists':
try:
int(collection.split_path[1])
except ValueError:
raise CollectionNotFound(collection.split_path)
return
if BaseAudio in objs and collection.path_level == 3 and \
(collection.split_path[0] == u'albums'):
try:
int(collection.split_path[2])
except ValueError:
raise CollectionNotFound(collection.split_path)
return
raise CollectionNotFound(collection.split_path)
OBJECTS = {BaseAudio: fill_audio}
OBJECTS = {BaseAudio: fill_audio, Album: fill_album, Playlist: fill_playlist}

View file

@ -19,13 +19,11 @@
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from weboob.tools.json import json as simplejson
from weboob.capabilities.audio import BaseAudio
from weboob.capabilities.audio import BaseAudio, Album, Playlist
from weboob.capabilities.image import BaseImage
from weboob.capabilities import NotAvailable
from weboob.capabilities.collection import Collection
import hashlib
import copy
import uuid
import string
import random
@ -66,8 +64,6 @@ class GroovesharkBrowser(BaseBrowser):
GROOVESHARK_CONSTANTS = ('mobileshark', '20120830', 'gooeyFlubber')
COMMUNICATION_TOKEN = None
AUDIOS_FROM_SONG_RESULTS = None
user_id = None
def home(self):
@ -91,112 +87,81 @@ class GroovesharkBrowser(BaseBrowser):
if not self.is_logged:
raise BrowserIncorrectPassword()
def get_all_user_playlists(self, split_path):
def get_all_user_playlists(self):
if self.is_logged():
method = 'userGetPlaylists'
parameters = {}
parameters['userID'] = self.user_id
response = self.API_post(method, parameters, self.create_token(method))
return self.create_collection_from_playlists_result(response['result']['Playlists'], split_path)
return self.create_playlists_from_result(response['result']['Playlists'])
return []
def create_search_parameter(self, _type, pattern):
parameters = {}
parameters['query'] = pattern.encode(self.ENCODING)
parameters['type'] = [_type]
parameters['guts'] = 0
parameters['ppOverr'] = ''
return parameters
def search_audio(self, pattern):
method = 'getResultsFromSearch'
parameters = {}
parameters['query'] = pattern.encode(self.ENCODING)
parameters['type'] = ['Songs']
parameters['guts'] = 0
parameters['ppOverr'] = ''
response = self.API_post(method, parameters, self.create_token(method))
songs = self.create_audio_from_songs_result(response['result']['result']['Songs'])
return songs
def search_albums(self, split_path):
pattern = split_path[1]
method = 'getResultsFromSearch'
parameters = {}
parameters['query'] = pattern.encode(self.ENCODING)
parameters['type'] = ['Albums']
parameters['guts'] = 0
parameters['ppOverr'] = ''
response = self.API_post(method, parameters, self.create_token(method))
return self.create_collection_from_albums_result(response['result']['result']['Albums'], split_path)
response = self.API_post(method, self.create_search_parameter('Songs', pattern), self.create_token(method))
return self.create_audio_from_songs_result(response['result']['result']['Songs'])
def create_audio_from_songs_result(self, songs):
self.AUDIOS_FROM_SONG_RESULTS = []
for song in songs:
audio = GroovesharkAudio(song['SongID'])
audio.title = u'Song - %s' % song['SongName'].encode('ascii', 'replace')
audio.author = u'%s' % song['ArtistName'].encode('ascii', 'replace')
audio.description = u'%s - %s - %s' % (audio.author, song['AlbumName'].encode('ascii', 'replace'), song['Year'].encode('ascii', 'replace'))
audio.thumbnail = BaseImage(u'http://images.gs-cdn.net/static/albums/40_' + song['CoverArtFilename'])
audio.thumbnail.url = audio.thumbnail.id
audio.duration = datetime.timedelta(seconds=int(float(song['EstimateDuration'])))
try:
audio.date = datetime.date(year=int(song['Year']), month=1, day=1)
except ValueError:
audio.date = NotAvailable
self.AUDIOS_FROM_SONG_RESULTS.append(audio)
yield audio
yield self.create_audio(song)
def create_audio_from_album_result(self, songs):
self.AUDIOS_FROM_SONG_RESULTS = []
audios = list()
for song in songs:
audio = self.create_audio(song)
if audio:
self.AUDIOS_FROM_SONG_RESULTS.append(audio)
audios.append(audio)
return audios
def get_audio_from_song_id(self, _id):
audio = GroovesharkAudio(_id)
audio.url = self.get_stream_url_from_song_id(_id)
return audio
def create_audio(self, song):
if song['EstimateDuration']:
audio = GroovesharkAudio(song['SongID'])
audio.title = u'Song - %s' % song['Name'].encode('ascii', 'replace')
audio.author = u'%s' % song['ArtistName'].encode('ascii', 'replace')
audio.description = u'%s - %s' % (audio.author, song['AlbumName'].encode('ascii', 'replace'))
if song['CoverArtFilename']:
audio.thumbnail = BaseImage(u'http://images.gs-cdn.net/static/albums/40_' + song['CoverArtFilename'])
audio.thumbnail.url = audio.thumbnail.id
audio.duration = datetime.timedelta(seconds=int(float(song['EstimateDuration'])))
audio.date = NotAvailable
return audio
def create_collection_from_playlists_result(self, playlists, split_path):
items = list()
for playlist in playlists:
path = copy.deepcopy(split_path)
path.append(u'%s' % playlist['PlaylistID'])
items.append(Collection(path, u'%s' % (playlist['Name'])))
return items
def get_all_songs_from_playlist(self, playlistID):
method = 'getPlaylistByID'
def get_stream_url_from_song_id(self, _id):
method = 'getStreamKeyFromSongIDEx'
parameters = {}
parameters['playlistID'] = playlistID
parameters['prefetch'] = False
parameters['mobile'] = True
parameters['songID'] = int(_id)
parameters['country'] = self.HEADER['country']
response = self.API_post(method, parameters, self.create_token(method))
return self.create_audio_from_album_result(response['result']['Songs'])
def create_collection_from_albums_result(self, albums, split_path):
items = list()
for album in albums:
path = copy.deepcopy(split_path)
path.append(u'%s' % album['AlbumID'])
items.append(Collection(path, u'%s - %s' % (album['AlbumName'], album['ArtistName'])))
return items
self.mark_song_downloaded_ex(response['result'])
return u'http://%s/stream.php?streamKey=%s' % (response['result']['ip'], response['result']['streamKey'])
def search_albums(self, pattern):
method = 'getResultsFromSearch'
response = self.API_post(method, self.create_search_parameter('Albums', pattern), self.create_token(method))
return self.create_albums_from_result(response['result']['result']['Albums'])
def get_album_by_id(self, _id):
method = 'getAlbumByID'
parameters = {}
parameters['albumID'] = _id
response = self.API_post(method, parameters, self.create_token(method))
return self.create_album(response['result'])
def create_albums_from_result(self, albums):
for _album in albums:
yield self.create_album(_album)
def create_album(self, _album):
album = Album(_album['AlbumID'])
try:
album.title = u'%s' % _album['AlbumName']
except:
album.title = u'%s' % _album['Name']
album.author = u'%s' % _album['ArtistName']
album.year = int(_album['Year'])
if _album['CoverArtFilename']:
album.thumbnail = BaseImage(u'http://images.gs-cdn.net/static/albums/80_' + _album['CoverArtFilename'])
album.thumbnail.url = album.thumbnail.id
return album
def get_all_songs_from_album(self, album_id):
method = 'albumGetAllSongs'
@ -210,6 +175,52 @@ class GroovesharkBrowser(BaseBrowser):
response = self.API_post(method, parameters, self.create_token(method))
return self.create_audio_from_album_result(response['result'])
def create_audio_from_album_result(self, songs):
for song in songs:
audio = self.create_audio(song)
if audio:
yield audio
def create_audio(self, song):
audio = GroovesharkAudio(song['SongID'])
try:
audio.title = u'%s' % song['SongName'].encode('ascii', 'replace')
except:
audio.title = u'%s' % song['Name'].encode('ascii', 'replace')
audio.author = u'%s' % song['ArtistName'].encode('ascii', 'replace')
audio.description = u'%s - %s' % (audio.author, song['AlbumName'].encode('ascii', 'replace'))
if song['CoverArtFilename']:
audio.thumbnail = BaseImage(u'http://images.gs-cdn.net/static/albums/40_' + song['CoverArtFilename'])
audio.thumbnail.url = audio.thumbnail.id
if song['EstimateDuration']:
audio.duration = datetime.timedelta(seconds=int(float(song['EstimateDuration'])))
try:
if 'Year' in song.keys() and song['Year']:
audio.date = datetime.date(year=int(song['Year']), month=1, day=1)
except ValueError:
audio.date = NotAvailable
return audio
def create_playlists_from_result(self, playlists):
for _playlist in playlists:
playlist = Playlist(_playlist['PlaylistID'])
playlist.title = u'%s' % (_playlist['Name'])
yield playlist
def get_all_songs_from_playlist(self, playlistID):
method = 'getPlaylistByID'
parameters = {}
parameters['playlistID'] = playlistID
response = self.API_post(method, parameters, self.create_token(method))
return self.create_audio_from_album_result(response['result']['Songs'])
def get_communication_token(self):
parameters = {'secretKey': hashlib.md5(self.HEADER["session"]).hexdigest()}
result = self.API_post('getCommunicationToken', parameters)
@ -220,29 +231,10 @@ class GroovesharkBrowser(BaseBrowser):
self.get_communication_token()
rnd = (''.join(random.choice(string.hexdigits) for x in range(6)))
return rnd + hashlib.sha1('%s:%s:%s:%s' % (method, self.COMMUNICATION_TOKEN, self.GROOVESHARK_CONSTANTS[2], rnd)).hexdigest()
def get_audio_from_song_id(self, song_id):
if self.AUDIOS_FROM_SONG_RESULTS:
for audio in self.AUDIOS_FROM_SONG_RESULTS:
if audio.id == song_id:
audio.url = self.get_stream_url_from_song_id(song_id)
return audio
def get_stream_url_from_song_id(self, song_id):
method = 'getStreamKeyFromSongIDEx'
parameters = {}
parameters['prefetch'] = False
parameters['mobile'] = True
parameters['songID'] = int(song_id)
parameters['country'] = self.HEADER['country']
response = self.API_post(method, parameters, self.create_token(method))
self.mark_song_downloaded_ex(response['result'])
return u'http://%s/stream.php?streamKey=%s' % (response['result']['ip'], response['result']['streamKey'])
return rnd + hashlib.sha1('%s:%s:%s:%s' % (method,
self.COMMUNICATION_TOKEN,
self.GROOVESHARK_CONSTANTS[2],
rnd)).hexdigest()
# in order to simulate a real browser
def mark_song_downloaded_ex(self, response):

View file

@ -26,33 +26,34 @@ class GroovesharkTest(BackendTest):
BACKEND = 'grooveshark'
def test_grooveshark_audio_search(self):
result = list(self.backend.search_audio("Loic Lantoine"))
result = list(self.backend.search_audio("Gronibard"))
self.assertTrue(len(result) > 0)
v = result[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url is not None, 'URL for audio "%s" not found: %s' % (v.id, v.url))
def test_grooveshark_user_playlist_not_logged(self):
if self.backend.browser.is_logged():
raise SkipTest("User credentials defined")
l1 = list(self.backend.iter_resources([BaseAudio], [u'playlists']))
assert len(l1)==0
l1 = list(self.backend.iter_resources([BaseAudio], []))
assert len(l1) == 0
def test_grooveshark_user_playlist_logged(self):
if not self.backend.browser.is_logged():
raise SkipTest("User credentials not defined")
l1 = list(self.backend.iter_resources([BaseAudio], [u'playlists']))
l1 = list(self.backend.iter_resources([BaseAudio], []))
assert len(l1)
c = l1[0]
l2 = list(self.backend.iter_resources([BaseAudio], c.split_path))
assert len(l2)
v = l2[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url is not None, 'URL for audio "%s" not found: %s' % (v.id, v.url))
def test_grooveshark_album_search(self):
l1 = list(self.backend.iter_resources([BaseAudio], [u'albums', u'live']))
assert len(l1)
c = l1[0]
l2 = list(self.backend.iter_resources([BaseAudio], c.split_path))
assert len(l2)
v = l2[0]
self.backend.fillobj(v, ('url',))
self.assertTrue(v.url is not None, 'URL for audio "%s" not found: %s' % (v.id, v.url))
result = list(self.backend.search_album("Gronibard"))
self.assertTrue(len(result) > 0)
v = result[0]
self.backend.fillobj(v)
assert len(v.tracks_list)
def test_grooveshark_playlist_search(self):
result = list(self.backend.search_playlist("johann"))
self.assertTrue(len(result) > 0)
v = result[0]
self.backend.fillobj(v)
assert len(v.tracks_list)