new backend 'phpbb'

This commit is contained in:
Romain Bignon 2011-08-24 16:25:17 +02:00
commit 2184951c4a
8 changed files with 612 additions and 0 deletions

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .browser import PhpBB
from .backend import PhpBBBackend
__all__ = ['PhpBB', 'PhpBBBackend']

View file

@ -0,0 +1,186 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.newsfeed import Newsfeed
from weboob.tools.value import Value, ValueInt, ValueBackendPassword
from weboob.tools.misc import limit
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Message, Thread, CantSendMessage
from .browser import PhpBB
from .tools import rssid, url2id
__all__ = ['PhpBBBackend']
class PhpBBBackend(BaseBackend, ICapMessages):
    """
    Messages backend for phpBB forums.

    Forums are walked recursively to list topics as threads, and the feed
    advertised by the board is used to discover unread messages. For each
    topic, the id of the newest message marked as read is kept in the
    'seen' section of the backend storage.
    """
    NAME = 'phpbb'
    MAINTAINER = 'Romain Bignon'
    EMAIL = 'romain@weboob.org'
    VERSION = '0.9'
    LICENSE = 'AGPLv3+'
    DESCRIPTION = "phpBB forum"
    CONFIG = BackendConfig(Value('url', label='URL of forum', regexp='https?://.*'),
                           Value('username', label='Username'),
                           ValueBackendPassword('password', label='Password'),
                           ValueInt('thread_unread_messages', label='Limit number of unread messages to retrieve for a thread', default=500)
                          )
    STORAGE = {'seen': {}}
    BROWSER = PhpBB

    def create_default_browser(self):
        """Build the browser from the configured url, username and password."""
        return self.create_browser(self.config['url'].get(),
                                   self.config['username'].get(),
                                   self.config['password'].get())

    #### Internal helpers ##########################################

    @staticmethod
    def _topic_key(thread_id):
        """
        Return the key used for a thread in the 'seen' storage section.

        Thread ids reach the backend as integers, as 'topic' strings or as
        'topic.post' strings. They are all reduced to the integer topic id
        so that reads and writes of the storage agree on the key. An id
        which cannot be reduced that way is returned unchanged.
        """
        try:
            return int(('%s' % (thread_id,)).split('.')[0])
        except ValueError:
            return thread_id

    def _last_seen_id(self, thread_id):
        """Return the id of the last message seen in a thread, or 0."""
        try:
            return self.storage.get('seen', default={})[self._topic_key(thread_id)]
        except KeyError:
            return 0

    def _post2message(self, thread, post, flags, parent=None):
        """Build a Message attached to *thread* from a scraped post."""
        return Message(thread=thread,
                       id=post.id,
                       title=post.title,
                       sender=post.author,
                       receivers=None,
                       date=post.date,
                       parent=parent,
                       content=post.content,
                       signature=post.signature,
                       children=[],
                       flags=flags)

    #### ICapMessages ##############################################

    def _iter_threads(self, root_link=None):
        """
        Yield a Thread for every topic found below *root_link*.

        :param root_link: forum link to start from, or None to start from
                          the home page of the board
        """
        # The links are fetched in one go so that the browser is not held
        # while sub-forums are visited recursively.
        with self.browser:
            links = list(self.browser.iter_links(root_link.url if root_link else None))

        for link in links:
            if link.type == link.FORUM:
                # Prefix the sub-forum with the titles of its ancestors.
                link.title = '%s[%s]' % (root_link.title if root_link else '', link.title)
                for thread in self._iter_threads(link):
                    yield thread
            elif link.type == link.TOPIC:
                thread = Thread(url2id(link.url))
                thread.title = ('%s ' % root_link.title if root_link else '') + link.title
                thread.date = link.date
                thread.nb_messages = link.nb_messages
                thread.flags = thread.IS_DISCUSSION
                yield thread

    def iter_threads(self):
        """Yield the threads of every forum of the board."""
        return self._iter_threads()

    def get_thread(self, id):
        """
        Fetch a thread with all its messages, chained from oldest to newest.

        :param id: a Thread object to fill, a thread id or a topic URL
        :returns: the Thread, or None when no Thread object was given and
                  the topic yielded no post
        """
        thread = None
        parent = None

        if isinstance(id, Thread):
            thread = id
            id = thread.id

        # Threads created from feed entries carry integer ids, whereas
        # url2id() and the browser both expect text.
        id = '%s' % (id,)
        thread_id = url2id(id) or id
        last_seen_id = self._last_seen_id(thread_id)

        with self.browser:
            for post in self.browser.iter_posts(id):
                if not thread:
                    thread = Thread(thread_id)
                    thread.title = post.title

                flags = Message.IS_HTML
                if last_seen_id < post.id:
                    flags |= Message.IS_UNREAD

                m = self._post2message(thread, post, flags, parent)
                if parent:
                    parent.children = [m]
                else:
                    thread.root = m
                parent = m

        return thread

    def iter_unread_messages(self, thread=None):
        """
        Yield the unread messages of the topics listed in the board feed.

        For every feed entry, posts are read backwards until the last seen
        message of the topic is reached, or until the configured
        'thread_unread_messages' limit is hit.

        :param thread: accepted for interface compatibility; it is not used
                       to filter the result
        """
        with self.browser:
            url = self.browser.get_root_feed_url()
            for article in Newsfeed(url, rssid).iter_entries():
                id = url2id(article.link)
                if id is None:
                    # The entry does not point to a topic: nothing to read.
                    continue

                # The link may or may not carry a post id; the thread is
                # identified by its topic part only.
                entry_thread = Thread(int(id.split('.')[0]))
                last_seen_id = self._last_seen_id(entry_thread.id)

                iterator = self.browser.riter_posts(id, last_seen_id)
                max_messages = self.config['thread_unread_messages'].get()
                if max_messages > 0:
                    iterator = limit(iterator, max_messages)

                for post in iterator:
                    message = self._post2message(entry_thread, post,
                                                 Message.IS_UNREAD|Message.IS_HTML)
                    if post.parent:
                        # Only the id of the parent is known at this point.
                        message.parent = Message(thread=entry_thread,
                                                 id=post.parent)
                    else:
                        entry_thread.root = message
                    yield message

    def set_message_read(self, message):
        """Record *message* as seen if it is newer than the stored marker."""
        if message.id > self._last_seen_id(message.thread.id):
            self.storage.set('seen', self._topic_key(message.thread.id), message.id)
            self.storage.save()

    def fill_thread(self, thread, fields):
        """Fill *thread* by fetching it entirely; *fields* is not used."""
        return self.get_thread(thread)

    #### ICapMessagesReply #########################################
    #def post_message(self, message):
    #    assert message.thread
    #    with self.browser:
    #        return self.browser.post_comment(message.thread.id,
    #                                         message.parent.id,
    #                                         message.title,
    #                                         message.content)

    OBJECTS = {Thread: fill_thread}

View file

@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import urllib
from urlparse import urlsplit
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from weboob.capabilities.messages import CantSendMessage
from .pages.index import LoginPage
from .pages.forum import ForumPage, TopicPage
from .tools import id2url, url2id
# Browser
class PhpBB(BaseBrowser):
    """
    Browser for a phpBB board.

    The protocol, domain and base path are derived from the board URL given
    to the constructor.
    """
    PAGES = {'https?://.*/index.php':               ForumPage,
             'https?://.*/viewforum.php\?f=(\d+)':  ForumPage,
             'https?://.*/viewtopic.php\?.*':       TopicPage,
             'https?://.*/ucp.php\?mode=login.*':   LoginPage,
            }

    last_board_msg_id = None

    def __init__(self, url, *args, **kwargs):
        """
        :param url: URL of the board; everything up to the last '/' of its
                    path is used as base path for the other pages
        """
        self.url = url
        parts = urlsplit(url)
        self.PROTOCOL = parts.scheme
        self.DOMAIN = parts.netloc
        last_slash = parts.path.rfind('/')
        self.BASEPATH = parts.path[:last_slash]
        BaseBrowser.__init__(self, *args, **kwargs)

    def home(self):
        """Go to the board URL."""
        self.location(self.url)

    def is_logged(self):
        """Tell whether the session is logged in (True when no page is loaded)."""
        if not self.page:
            return True
        return self.page.is_logged()

    def login(self):
        """
        Submit the credentials to the login page.

        :raises BrowserIncorrectPassword: when the resulting page is not
                                          logged in
        """
        form = urllib.urlencode({'login':    'Connexion',
                                 'username': self.username,
                                 'password': self.password,
                                })
        self.location('%s/ucp.php?mode=login' % self.BASEPATH, form, no_login=True)

        assert self.is_on_page(LoginPage)

        if not self.page.is_logged():
            raise BrowserIncorrectPassword(self.page.get_error_message())

    def get_root_feed_url(self):
        """Return the feed URL advertised by the home page."""
        self.home()
        return self.page.get_feed_url()

    def iter_links(self, url):
        """
        Iterate over the links of a forum page.

        :param url: URL of the forum page, or a false value for the home page
        """
        if not url:
            self.home()
        else:
            self.location(url)

        assert self.is_on_page(ForumPage)
        return self.page.iter_links()

    def _go_to(self, id):
        """Open a topic given either as a full URL or as an id."""
        if id.startswith('http'):
            target = id
        else:
            target = '%s/%s' % (self.BASEPATH, id2url(id))
        self.location(target)

    def iter_posts(self, id, stop_id=None):
        """
        Yield the posts of a topic in page order, following next pages.

        Each post gets the id of the post yielded before it as parent (0 for
        the first one). Iteration ends before the post whose id is *stop_id*.
        """
        self._go_to(id)
        assert self.is_on_page(TopicPage)

        previous_id = 0
        while True:
            for post in self.page.iter_posts():
                if post.id == stop_id:
                    return

                post.parent = previous_id
                yield post
                previous_id = post.id

            if self.page.cur_page == self.page.tot_pages:
                return
            self.location(self.page.next_page_url())

    def riter_posts(self, id, stop_id=None):
        """
        Yield the posts of a topic in reverse page order, following previous pages.

        A post is held back until the one preceding it on the page has been
        read, so that its parent id can be set before it is yielded. The post
        whose id is *stop_id* ends the iteration and is not yielded.
        """
        self._go_to(id)
        assert self.is_on_page(TopicPage)

        pending = None
        while True:
            for post in self.page.riter_posts():
                if pending:
                    pending.parent = post.id
                    yield pending

                if post.id == stop_id:
                    return

                pending = post

            if self.page.cur_page == 1:
                # First page reached: the held post keeps its current parent.
                if pending:
                    yield pending
                return
            self.location(self.page.prev_page_url())

    def get_post(self, id):
        """
        Return a single post given as a URL or as an id, or None if not found.

        When the post is the first of a page other than the first one, its
        parent is looked up as the last post of the previous page.
        """
        self._go_to(id)
        if id.startswith('http'):
            id = url2id(id)
        assert self.is_on_page(TopicPage)

        wanted = int(id.split('.')[-1])
        post = self.page.get_post(wanted)
        if not post:
            return None

        if post.parent == 0 and self.page.cur_page > 1:
            self.location(self.page.prev_page_url())
            post.parent = self.page.get_last_post_id()

        return post

View file

View file

@ -0,0 +1,134 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from urlparse import urlsplit, parse_qs
from datetime import datetime
from .index import PhpBBPage
from ..tools import parse_date
__all__ = ['Link', 'ForumPage', 'TopicPage']
class Link(object):
    """Entry of a forum listing: either a sub-forum or a topic."""
    # Kinds of link.
    FORUM = 0
    TOPIC = 1

    def __init__(self, type, url):
        """
        :param type: Link.FORUM or Link.TOPIC
        :param url: target of the link
        """
        self.url = url
        self.type = type
        # Filled by the page which creates the link.
        self.nb_messages = 0
        self.date = None
        self.title = u''
class ForumPage(PhpBBPage):
    """Page listing sub-forums and/or topics."""

    def iter_links(self):
        """Yield a Link for every listed sub-forum, then for every listed topic."""
        for row in self.parser.select(self.document.getroot(), 'ul.forums li.row'):
            anchor = row.cssselect('a.forumtitle')[0]
            forum = Link(Link.FORUM, anchor.attrib['href'])
            forum.title = anchor.text.strip()
            yield forum

        for row in self.parser.select(self.document.getroot(), 'ul.topics li.row'):
            anchor = row.cssselect('a.topictitle')[0]
            topic = Link(Link.TOPIC, anchor.attrib['href'])
            topic.title = anchor.text.strip()

            # The date is the text following the last link of the row header.
            last_anchor = row.find('dl').find('dt').findall('a')[-1]
            topic.date = parse_date(last_anchor.tail.strip(u'» \r\n'))

            # it only lists number of answers, so we add 1.
            answers = row.cssselect('dd.posts')[0].text.strip()
            topic.nb_messages = int(answers) + 1
            yield topic
class Post(object):
    """Plain container for the data of one post of a topic."""

    def __init__(self, topic, id):
        """
        :param topic: value identifying the topic the post belongs to
        :param id: id of the post; converted to an integer
        """
        self.topic = topic
        self.id = int(id)
        # Id of the preceding post, 0 when unknown or none.
        self.parent = 0
        self.date = None
        self.author = u''
        self.title = u''
        self.signature = u''
        self.content = u''
class TopicPage(PhpBBPage):
    """Page showing the posts of a topic, possibly one page among several."""

    # Forum titles longer than this are cut and marked with an ellipsis.
    FORUM_TITLE_MAX = 20

    def on_loaded(self):
        """
        Read the pagination, the topic id and the forum title from the page.

        Sets the cur_page, tot_pages, topic_id and forum_title attributes.
        NOTE(review): presumably called by the browser framework once the
        document is available -- confirm against BasePage.
        """
        root = self.document.getroot()

        # The first two <strong> of the pagination block hold the current
        # page number and the number of pages.
        div = root.cssselect('div.pagination')[0]
        strongs = div.cssselect('strong')
        self.cur_page = int(strongs[0].text.strip())
        self.tot_pages = int(strongs[1].text.strip())

        v = urlsplit(self.url)
        args = parse_qs(v.query)
        self.topic_id = int(args['t'][0])

        # The forum title is the last link of the navigation bar, the first
        # link being skipped.
        nav = self.parser.select(root, 'li.icon-home', 1)
        self.forum_title = u''
        links = nav.findall('a')[1:]
        if links:
            text = links[-1].text.strip()
            if len(text) > self.FORUM_TITLE_MAX:
                text = text[:self.FORUM_TITLE_MAX] + u'\u2026'
            self.forum_title = '[%s]' % text

    def next_page_url(self):
        """Return the URL of the next page of the topic."""
        return self.document.getroot().cssselect('a.right-box')[0].attrib['href']

    def prev_page_url(self):
        """Return the URL of the previous page of the topic."""
        return self.document.getroot().cssselect('a.left-box')[0].attrib['href']

    def iter_posts(self):
        """Yield the posts of this page in document order."""
        for div in self.parser.select(self.document.getroot(), 'div.post'):
            yield self._get_post(div)

    def riter_posts(self):
        """Yield the posts of this page in reverse document order."""
        for div in reversed(self.parser.select(self.document.getroot(), 'div.post')):
            yield self._get_post(div)

    def get_post(self, id):
        """
        Return the post with the given integer id, or None if it is not on
        this page.

        The parent of the returned post is the id of the post preceding it
        on this page, or 0 when it is the first one.
        """
        wanted = 'p%d' % id
        parent = 0
        for div in self.parser.select(self.document.getroot(), 'div.post'):
            if div.attrib['id'] == wanted:
                post = self._get_post(div)
                post.parent = parent
                return post
            parent = int(div.attrib['id'][1:])
        return None

    def _get_post(self, div):
        """Build a Post from the element of a post."""
        body = div.cssselect('div.postbody')[0]
        profile = div.cssselect('dl.postprofile')[0]

        # Element ids look like 'p<post id>'.
        id = div.attrib['id'][1:]
        post = Post(self.topic_id, id)

        subject = body.cssselect('h3 a')[0].text.strip()
        if self.forum_title:
            post.title = '%s %s' % (self.forum_title, subject)
        else:
            # No forum title: avoid a title starting with a space.
            post.title = subject

        post.author = profile.cssselect('dt a')[-1].text.strip()
        post.date = parse_date(body.cssselect('p.author')[0].find('strong').tail.strip(u'» \n\r'))
        post.content = self.parser.tostring(body.cssselect('div.content')[0])

        signature = body.cssselect('div.signature')
        if len(signature) > 0:
            post.signature = self.parser.tostring(signature[0])
        return post

    def get_last_post_id(self):
        """Return the id of the last post of this page, or 0 if there is none."""
        posts = self.parser.select(self.document.getroot(), 'div.post')
        if not posts:
            return 0
        return int(posts[-1].attrib['id'][1:])

View file

@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.browser import BasePage
class PhpBBPage(BasePage):
    """Base class of the pages of a phpBB board."""

    def is_logged(self):
        """Tell whether the page is free of any 'li.icon-register' element."""
        register_items = self.document.getroot().cssselect('li.icon-register')
        return not register_items

    def get_feed_url(self):
        """Return the href of the last atom feed link of the page."""
        feeds = self.document.getroot().cssselect('link[type="application/atom+xml"]')
        last_feed = feeds[-1]
        return last_feed.attrib['href']
class LoginPage(PhpBBPage):
    """Page displayed after the login form has been submitted."""

    def get_error_message(self):
        """Return the texts of the 'div.error' elements, joined with ', '."""
        boxes = self.parser.select(self.document.getroot(), 'div.error')
        return ', '.join([box.text.strip() for box in boxes if box.text])

View file

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest
__all__ = ['PhpBBTest']
class PhpBBTest(BackendTest):
    """Test of the phpbb backend."""
    BACKEND = 'phpbb'

    def test_phpbb(self):
        """Walk through every thread; the threads themselves are not checked."""
        threads = self.backend.iter_threads()
        for _thread in threads:
            continue

View file

@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from dateutil.parser import parse as _parse_dt
from urlparse import urlsplit, parse_qs
from weboob.tools.misc import local2utc
def url2id(url):
    """
    Convert the URL of a topic into an id.

    :param url: URL (absolute or relative) of a 'viewtopic.php' page
    :returns: '<topic>' or, when the URL has a 'p' argument,
              '<topic>.<post>'; None when the URL is not a viewtopic page,
              has no 't' argument, or has non-numeric arguments
    """
    v = urlsplit(url)
    if v.path.split('/')[-1] != 'viewtopic.php':
        return None

    args = parse_qs(v.query)
    try:
        s = '%d' % int(args['t'][0])
        if 'p' in args:
            s += '.%d' % int(args['p'][0])
    except (KeyError, ValueError):
        # No topic argument, or an argument which is not a number: the URL
        # cannot be turned into an id.
        return None
    return s
def id2url(id):
    """
    Convert an id into the relative URL of its topic page.

    :param id: '<topic>' or '<topic>.<post>'; an integer topic id is
               accepted too
    :returns: the URL relative to the base path of the board, or None when
              the id has more than two components
    :raises ValueError: when a component is not a number
    """
    # Formatting first lets integer ids through; text ids are unchanged.
    v = ('%s' % (id,)).split('.')
    if len(v) == 1:
        return 'viewtopic.php?t=%d' % int(v[0])
    if len(v) == 2:
        return 'viewtopic.php?t=%d&p=%d#p%d' % (int(v[0]),
                                                int(v[1]),
                                                int(v[1]))
    return None
def rssid(id):
    """
    Return *id* unchanged.

    The backend passes this function as second argument to Newsfeed.
    """
    return id
def parse_date(s):
    """
    Parse a date as displayed by the board.

    French month abbreviations are first replaced by their English
    counterparts, then the text is parsed by dateutil and the result is
    passed through local2utc.
    """
    months = ((u'Fév', 'Feb'),
              (u'Avr', 'Apr'),
              (u'Mai', 'May'),
              (u'Juin', 'Jun'),
              (u'Juil', 'Jul'),
              (u'Aoû', 'Aug'),
              (u'Déc', 'Dec'),
             )
    for french, english in months:
        s = s.replace(french, english)

    parsed = _parse_dt(s)
    return local2utc(parsed)