support repositories to manage backends (closes #747)

This commit is contained in:
Romain Bignon 2012-01-03 12:10:21 +01:00
commit 14a7a1d362
410 changed files with 1079 additions and 297 deletions

24
modules/dlfp/__init__.py Normal file
View file

@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .browser import DLFP
from .backend import DLFPBackend
__all__ = ['DLFP', 'DLFPBackend']

213
modules/dlfp/backend.py Normal file
View file

@@ -0,0 +1,213 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
from datetime import datetime, timedelta
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.newsfeed import Newsfeed
from weboob.tools.value import Value, ValueBool, ValueBackendPassword
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Message, Thread, CantSendMessage
from weboob.capabilities.content import ICapContent, Content
from .browser import DLFP
from .tools import rssid, id2url
__all__ = ['DLFPBackend']
class DLFPBackend(BaseBackend, ICapMessages, ICapMessagesPost, ICapContent):
    """Weboob backend for DLFP (Da Linux French Page, linuxfr.org).

    Implements:
      * ICapMessages / ICapMessagesPost -- the site's Atom feeds exposed as
        message threads, and comment posting through the browser;
      * ICapContent -- reading, previewing and editing wiki pages.
    """
    NAME = 'dlfp'
    MAINTAINER = 'Romain Bignon'
    EMAIL = 'romain@weboob.org'
    VERSION = '0.a'
    LICENSE = 'AGPLv3+'
    DESCRIPTION = "Da Linux French Page"
    CONFIG = BackendConfig(Value('username', label='Username', regexp='.+'),
                           ValueBackendPassword('password', label='Password'),
                           ValueBool('get_news', label='Get newspapers', default=True),
                           ValueBool('get_diaries', label='Get diaries', default=False),
                           ValueBool('get_polls', label='Get polls', default=False),
                           ValueBool('get_board', label='Get board', default=False),
                           ValueBool('get_wiki', label='Get wiki', default=False),
                           ValueBool('get_tracker', label='Get tracker', default=False))
    # Persistent storage: 'seen' maps a thread id to the list of read comment ids.
    STORAGE = {'seen': {}}
    BROWSER = DLFP

    # Each boolean config option above enables exactly one Atom feed.
    FEEDS = {'get_news': "https://linuxfr.org/news.atom",
             'get_diaries': "https://linuxfr.org/journaux.atom",
             'get_polls': "https://linuxfr.org/sondages.atom",
             'get_board': "https://linuxfr.org/forums.atom",
             'get_wiki': "https://linuxfr.org/wiki.atom",
             'get_tracker': "https://linuxfr.org/suivi.atom",
            }

    def create_default_browser(self):
        # Build the DLFP browser with the configured credentials.
        return self.create_browser(self.config['username'].get(), self.config['password'].get())

    def deinit(self):
        # don't need to logout if the browser hasn't been used.
        if not self._browser:
            return

        with self.browser:
            self.browser.close_session()

    #### ICapMessages ##############################################
    def iter_threads(self):
        """Yield a Thread for each entry of every enabled feed.

        Entries older than 60 days are skipped.
        """
        whats = set()
        for param, url in self.FEEDS.iteritems():
            if self.config[param].get():
                whats.add(url)

        for what in whats:
            for article in Newsfeed(what, rssid).iter_entries():
                # NOTE(review): datetime.now() is naive -- this assumes the
                # feed dates parsed by Newsfeed are naive too; confirm.
                if article.datetime and (datetime.now() - article.datetime) > timedelta(days=60):
                    continue
                thread = Thread(article.id)
                thread.title = article.title
                if article.datetime:
                    thread.date = article.datetime
                yield thread

    def get_thread(self, id):
        """Return the full Thread (root message plus comment tree) for `id`.

        `id` may be a Thread object or a thread id string.  Returns None
        when the content cannot be fetched.
        """
        if isinstance(id, Thread):
            thread = id
            id = thread.id
        else:
            thread = None

        with self.browser:
            content = self.browser.get_content(id)

        if not content:
            return None

        if not thread:
            thread = Thread(content.id)

        flags = Message.IS_HTML
        # A thread that was never recorded in 'seen' is entirely unread.
        if not thread.id in self.storage.get('seen', default={}):
            flags |= Message.IS_UNREAD

        thread.title = content.title
        if not thread.date:
            thread.date = content.date

        thread.root = Message(thread=thread,
                              id=0, # root message
                              title=content.title,
                              sender=content.author or u'',
                              receivers=None,
                              date=thread.date,
                              parent=None,
                              content=content.body,
                              signature='URL: %s' % self.browser.absurl(id2url(content.id)),
                              children=[],
                              flags=flags)

        for com in content.comments:
            self._insert_comment(com, thread.root)

        return thread

    def _insert_comment(self, com, parent):
        """
        Insert 'com' comment and its children in the parent message.
        """
        flags = Message.IS_HTML
        if not com.id in self.storage.get('seen', parent.thread.id, 'comments', default=[]):
            flags |= Message.IS_UNREAD

        message = Message(thread=parent.thread,
                          id=com.id,
                          title=com.title,
                          sender=com.author or u'',
                          receivers=None,
                          date=com.date,
                          parent=parent,
                          content=com.body,
                          signature=com.signature + \
                                    '<br />'.join(['Score: %d' % com.score,
                                                   'URL: %s' % com.url]),
                          children=[],
                          flags=flags)
        parent.children.append(message)

        # Recurse into nested replies.
        for sub in com.comments:
            self._insert_comment(sub, message)

    def iter_unread_messages(self, thread=None):
        # NOTE(review): the `thread` parameter is never used -- the loop
        # variable below shadows it; confirm whether per-thread filtering
        # was intended.
        for thread in self.iter_threads():
            self.fill_thread(thread, 'root')
            for m in thread.iter_all_messages():
                if m.flags & m.IS_UNREAD:
                    yield m

    def set_message_read(self, message):
        """Append `message`'s comment id to the thread's 'seen' list and persist."""
        self.storage.set('seen', message.thread.id, 'comments',
            self.storage.get('seen', message.thread.id, 'comments', default=[]) + [message.id])
        self.storage.save()

    def fill_thread(self, thread, fields):
        # `fields` is ignored: the whole thread is always (re)fetched.
        return self.get_thread(thread)

    #### ICapMessagesReply #########################################
    def post_message(self, message):
        """Post `message` as a comment; replying is the only supported mode."""
        if not message.parent:
            raise CantSendMessage('Posting news and diaries on DLFP is not supported yet')

        assert message.thread

        with self.browser:
            return self.browser.post_comment(message.thread.id,
                                             message.parent.id,
                                             message.title,
                                             message.content)

    #### ICapContent ###############################################
    def get_content(self, id):
        """Return a Content object holding the wiki source for `id`.

        `id` may be a Content object or an id string; returns None when
        the page name is invalid.
        """
        if isinstance(id, basestring):
            content = Content(id)
        else:
            content = id
            id = content.id

        with self.browser:
            data = self.browser.get_wiki_content(id)

        if data is None:
            return None

        content.content = data
        return content

    def push_content(self, content, message=None, minor=False):
        # `minor` is accepted for interface compatibility but not forwarded.
        with self.browser:
            return self.browser.set_wiki_content(content.id, content.content, message)

    def get_content_preview(self, content):
        """Return the rendered HTML preview of `content` without saving it."""
        with self.browser:
            return self.browser.get_wiki_preview(content.id, content.content)

    OBJECTS = {Thread: fill_thread}

251
modules/dlfp/browser.py Normal file
View file

@@ -0,0 +1,251 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import urllib
import re
from weboob.tools.browser import BaseBrowser, BrowserHTTPNotFound, BrowserHTTPError, BrowserIncorrectPassword
from weboob.capabilities.messages import CantSendMessage
from .pages.index import IndexPage, LoginPage
from .pages.news import ContentPage, NewCommentPage, NodePage, CommentPage, NewTagPage
from .pages.board import BoardIndexPage
from .pages.wiki import WikiEditPage
from .tools import id2url, url2id
# Browser
class DLFP(BaseBrowser):
    """Browser for linuxfr.org (HTTPS), mapping URLs to page classes."""
    DOMAIN = 'linuxfr.org'
    PROTOCOL = 'https'
    PAGES = {'https?://.*linuxfr.org/?': IndexPage,
             'https?://.*linuxfr.org/compte/connexion': LoginPage,
             'https?://.*linuxfr.org/news/[^\.]+': ContentPage,
             'https?://.*linuxfr.org/wiki/(?!nouveau)[^/]+': ContentPage,
             'https?://.*linuxfr.org/wiki': WikiEditPage,
             'https?://.*linuxfr.org/wiki/nouveau': WikiEditPage,
             'https?://.*linuxfr.org/wiki/[^\.]+/modifier': WikiEditPage,
             'https?://.*linuxfr.org/suivi/[^\.]+': ContentPage,
             'https?://.*linuxfr.org/sondages/[^\.]+': ContentPage,
             'https?://.*linuxfr.org/users/[^\./]+/journaux/[^\.]+': ContentPage,
             'https?://.*linuxfr.org/forums/[^\./]+/posts/[^\.]+': ContentPage,
             'https?://.*linuxfr.org/nodes/(\d+)/comments/(\d+)': CommentPage,
             'https?://.*linuxfr.org/nodes/(\d+)/comments/nouveau': NewCommentPage,
             'https?://.*linuxfr.org/nodes/(\d+)/comments': NodePage,
             'https?://.*linuxfr.org/nodes/(\d+)/tags/nouveau': NewTagPage,
             'https?://.*linuxfr.org/board/index.xml': BoardIndexPage,
            }

    # Id of the newest board message seen by iter_new_board_messages().
    last_board_msg_id = None

    def parse_id(self, _id):
        """Resolve `_id` (a weboob id or a site URL) into a (url, id) pair.

        Returns (None, None) when `_id` is neither a known id nor a
        recognizable URL.
        """
        # A full comment URL is used directly and carries no weboob id.
        if re.match('^https?://.*linuxfr.org/nodes/\d+/comments/\d+$', _id):
            return _id, None

        url = id2url(_id)
        if url is None:
            # `_id` may already be a URL: derive the weboob id from it.
            if url2id(_id) is not None:
                url = _id
                _id = url2id(url)
            else:
                return None, None

        return url, _id

    def get_wiki_content(self, _id):
        """Return the wiki source of page `_id`.

        Returns '' when the page does not exist yet, or None for an
        invalid page name.
        """
        url, _id = self.parse_id('W.%s' % _id)
        if url is None:
            return None

        try:
            self.location('%s/modifier' % url)
        except BrowserHTTPNotFound:
            return ''

        assert self.is_on_page(WikiEditPage)

        return self.page.get_body()

    def _go_on_wiki_edit_page(self, name):
        """
        Go on the wiki page named 'name'.

        Return True if this is a new page, or False if
        the page already exists.
        Return None if it isn't a right wiki page name.
        """
        url, _id = self.parse_id('W.%s' % name)
        if url is None:
            return None

        try:
            self.location('%s/modifier' % url)
        except BrowserHTTPNotFound:
            # Page does not exist yet: switch to the creation form.
            self.location('/wiki/nouveau')
            new = True
        else:
            new = False

        assert self.is_on_page(WikiEditPage)

        return new

    def set_wiki_content(self, name, content, message):
        """Create or update the wiki page `name` with `content`.

        `message` is the optional edit summary.  Returns None for an
        invalid page name.
        """
        new = self._go_on_wiki_edit_page(name)
        if new is None:
            return None

        if new:
            # A freshly created page takes its title from its name.
            title = name.replace('-', ' ')
        else:
            title = None

        self.page.post_content(title, content, message)

    def get_wiki_preview(self, name, content):
        """Return the rendered HTML preview of `content` for page `name`."""
        if self._go_on_wiki_edit_page(name) is None:
            return None

        self.page.post_preview(content)
        if self.is_on_page(WikiEditPage):
            return self.page.get_preview_html()
        elif self.is_on_page(ContentPage):
            return self.page.get_article().body

    def get_content(self, _id):
        """Fetch the article or comment designated by `_id` (id or URL)."""
        url, _id = self.parse_id(_id)
        if url is None:
            return None

        self.location(url)
        self.page.url = self.absurl(url)

        # NOTE(review): assumes the URL always lands on CommentPage or
        # ContentPage; any other page would leave `content` unbound.
        if self.is_on_page(CommentPage):
            content = self.page.get_comment()
        elif self.is_on_page(ContentPage):
            # A "#comment-<n>" fragment selects a single comment of the page.
            m = re.match('.*#comment-(\d+)$', url)
            if m:
                content = self.page.get_comment(int(m.group(1)))
            else:
                content = self.page.get_article()

        if _id is not None:
            content.id = _id
        return content

    def _is_comment_submit_form(self, form):
        # Predicate for select_form(): the comment form posts to .../comment_new.
        return 'comment_new' in form.action

    def post_comment(self, thread, reply_id, title, message):
        """Post `message` as a comment on `thread`, replying to `reply_id` (> 0).

        Raises CantSendMessage on an invalid thread id or submission failure.
        """
        url = id2url(thread)
        if url is None:
            raise CantSendMessage('%s is not a right ID' % thread)

        self.location(url)
        assert self.is_on_page(ContentPage)

        self.location(self.page.get_post_comment_url())
        assert self.is_on_page(NewCommentPage)
        self.select_form(predicate=self._is_comment_submit_form)
        self.set_all_readonly(False)

        if title is not None:
            self['comment[title]'] = title.encode('utf-8')
        self['comment[wiki_body]'] = message.encode('utf-8')

        if int(reply_id) > 0:
            self['comment[parent_id]'] = str(reply_id)

        self['commit'] = 'Poster le commentaire'

        try:
            self.submit()
        except BrowserHTTPError, e:
            raise CantSendMessage('Unable to send message to %s.%s: %s' % (thread, reply_id, e))

        # The site redisplays the node with an error list on failure.
        if self.is_on_page(NodePage):
            errors = self.page.get_errors()
            if len(errors) > 0:
                raise CantSendMessage('Unable to send message: %s' % ', '.join(errors))

        return None

    def login(self):
        """Authenticate on /compte/connexion; raises BrowserIncorrectPassword."""
        # not usefull for the moment
        #self.location('/', no_login=True)

        data = {'account[login]': self.username,
                'account[password]': self.password,
                'account[remember_me]': 1,
                #'authenticity_token': self.page.get_login_token(),
               }
        self.location('/compte/connexion', urllib.urlencode(data), no_login=True)
        if not self.is_logged():
            raise BrowserIncorrectPassword()

    def is_logged(self):
        # Delegate to the current page; False when no page is loaded.
        return (self.page and self.page.is_logged())

    def close_session(self):
        # Explicit logout so the "remember me" session does not linger.
        self.openurl('/compte/deconnexion', {})

    def plusse(self, url):
        # Vote the comment at `url` up.
        return self.relevance(url, 'for')

    def moinse(self, url):
        # Vote the comment at `url` down.
        return self.relevance(url, 'against')

    def relevance(self, url, what):
        """Cast a relevance vote ('for' or 'against') on the comment at `url`.

        Returns False when the comment carries no vote token (e.g. when
        voting is not allowed), raises ValueError when `url` is not a
        comment.
        """
        comment = self.get_content(url)

        if comment is None:
            raise ValueError('The given URL isn\'t a comment.')

        if comment.relevance_token is None:
            return False

        res = self.readurl('%s%s' % (comment.relevance_url, what),
                           urllib.urlencode({'authenticity_token': comment.relevance_token}))

        return res

    def iter_new_board_messages(self):
        """Yield board messages newer than the previous call, oldest first."""
        self.location('/board/index.xml')
        assert self.is_on_page(BoardIndexPage)

        msgs = self.page.get_messages(self.last_board_msg_id)
        # get_messages() returns newest first; replay them in order and
        # remember the newest id for the next call.
        for msg in reversed(msgs):
            self.last_board_msg_id = msg.id
            yield msg

    def board_post(self, msg):
        """Post `msg` on the board; the site requires a Referer header."""
        request = self.request_class(self.absurl('/board/'),
                                     urllib.urlencode({'board[message]': msg}),
                                     {'Referer': self.absurl('/')})
        self.readurl(request)

    def add_tag(self, _id, tag):
        """Add `tag` to the content designated by `_id` (id or URL)."""
        url, _id = self.parse_id(_id)
        if url is None:
            return None

        self.location(url)
        assert self.is_on_page(ContentPage)

        self.location(self.page.get_tag_url())
        assert self.is_on_page(NewTagPage)

        self.page.tag(tag)

BIN
modules/dlfp/favicon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5 KiB

View file

View file

@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
from logging import warning
from weboob.tools.browser import BasePage
class Message(object):
    """A single message from the DLFP board ("tribune").

    The 14-digit timestamp (YYYYMMDDhhmmss) is parsed into ``norloge``,
    the hh:mm:ss form used to reference board messages; when parsing
    fails, the raw timestamp is kept and a warning is logged.
    """
    TIMESTAMP_REGEXP = re.compile(r'(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})')

    def __init__(self, id, timestamp, login, message, is_me):
        self.id = id
        self.timestamp = timestamp
        self.login = login
        self.message = message
        self.is_me = is_me
        # Default to the raw timestamp; replaced below if it parses.
        self.norloge = timestamp
        match = self.TIMESTAMP_REGEXP.match(timestamp)
        if match is None:
            warning('Unable to parse timestamp "%s"' % timestamp)
        else:
            hour, minute, second = [int(match.group(i)) for i in (4, 5, 6)]
            self.norloge = '%02d:%02d:%02d' % (hour, minute, second)
class BoardIndexPage(BasePage):
    """XML index of the board: a flat list of <post> elements."""

    def is_logged(self):
        # The board XML carries no login information; assume logged in.
        return True

    def get_messages(self, last=None):
        """Return Message objects newer than *last* (a message id), newest first."""
        messages = []
        for post in self.parser.select(self.document.getroot(), 'post'):
            login = post.find('login').text
            msg = Message(int(post.attrib['id']),
                          post.attrib['time'],
                          login,
                          post.find('message').text,
                          login.lower() == self.browser.username.lower())
            # Stop as soon as the already-seen message is reached.
            if last is not None and last == msg.id:
                break
            messages.append(msg)
        return messages

View file

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.browser import BasePage
class DLFPPage(BasePage):
    """Base class for every linuxfr.org page."""

    def is_logged(self):
        # The sign-up form (id "new_account_sidebar") is only rendered
        # for anonymous visitors.
        anonymous = any(form.attrib.get('id', None) == 'new_account_sidebar'
                        for form in self.document.getiterator('form'))
        return not anonymous
class IndexPage(DLFPPage):
    """linuxfr.org front page."""

    def get_login_token(self):
        """Return the CSRF token of the sidebar login form, or None if absent."""
        form = self.parser.select(self.document.getroot(), 'form#new_account_sidebar', 1)
        tokens = (node.attrib['value']
                  for node in form.find('div').getiterator('input')
                  if node.attrib['name'] == 'authenticity_token')
        return next(tokens, None)
class LoginPage(DLFPPage):
    """Page served by /compte/connexion; no specific parsing is needed."""
    pass

211
modules/dlfp/pages/news.py Normal file
View file

@@ -0,0 +1,211 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from weboob.tools.browser import BrokenPageError
from weboob.tools.misc import local2utc
from dlfp.tools import url2id
from .index import DLFPPage
class Content(object):
    """Base holder for a piece of DLFP content (article or comment)."""

    # Whether this kind of content supports tagging.
    TAGGABLE = False

    def __init__(self, browser):
        self.browser = browser
        # Text fields default to empty unicode strings.
        for field in ('url', 'id', 'title', 'author', 'username', 'body'):
            setattr(self, field, u'')
        self.date = None
        self.score = 0
        self.comments = []
        self.relevance_url = None
        self.relevance_token = None

    def is_taggable(self):
        # NOTE(review): always False, even for subclasses that set
        # TAGGABLE = True (e.g. Article) -- looks inconsistent; confirm
        # before relying on it.
        return False
class Comment(Content):
    """A comment parsed from its <li id="comment-<id>"> element.

    `reply_id` is the id of the comment this one replies to (0 for a
    top-level comment); nested replies are parsed recursively.
    """
    def __init__(self, article, div, reply_id):
        Content.__init__(self, article.browser)
        self.reply_id = reply_id
        self.signature = u''
        # The element id is "comment-<number>".
        self.id = div.attrib['id'].split('-')[1]
        self.url = '%s#%s' % (article.url, div.attrib['id'])
        self.title = unicode(self.browser.parser.select(div.find('h2'), 'a.title', 1).text)
        try:
            a = self.browser.parser.select(div.find('p'), 'a[rel=author]', 1)
        except BrokenPageError:
            # No author link: the comment was posted anonymously.
            self.author = 'Anonyme'
            self.username = None
        else:
            self.author = unicode(a.text)
            self.username = unicode(a.attrib['href'].split('/')[2])
        # Drop the "+HH:MM" timezone suffix, then convert local time to UTC.
        self.date = datetime.strptime(self.browser.parser.select(div.find('p'), 'time', 1).attrib['datetime'].split('+')[0],
                                      '%Y-%m-%dT%H:%M:%S')
        self.date = local2utc(self.date)

        # The signature, when present, is extracted out of the body.
        content = div.find('div')
        try:
            signature = self.browser.parser.select(content, 'p.signature', 1)
        except BrokenPageError:
            # No signature.
            pass
        else:
            content.remove(signature)
            self.signature = self.browser.parser.tostring(signature)
        self.body = self.browser.parser.tostring(content)

        self.score = int(self.browser.parser.select(div.find('p'), 'span.score', 1).text)
        forms = self.browser.parser.select(div.find('footer'), 'form.button_to')
        if len(forms) > 0:
            # NOTE(review): rstrip() strips a *character set*, not a suffix --
            # this happens to work because the action URL ends with "/for" or
            # "/against", but it is fragile; confirm before reusing.
            self.relevance_url = forms[0].attrib['action'].rstrip('for').rstrip('against')
            self.relevance_token = self.browser.parser.select(forms[0], 'input[name=authenticity_token]', 1).attrib['value']

        # Nested replies are <li> children of a sibling <ul>.
        subs = div.find('ul')
        if subs is not None:
            for sub in subs.findall('li'):
                comment = Comment(article, sub, self.id)
                self.comments.append(comment)

    def iter_all_comments(self):
        """Yield this comment's replies, depth-first."""
        for comment in self.comments:
            yield comment
            for c in comment.iter_all_comments():
                yield c

    def __repr__(self):
        return u"<Comment id=%r author=%r title=%r>" % (self.id, self.author, self.title)
class Article(Content):
    """An article (news, diary, poll, tracker entry...) parsed from its
    <article> element.

    When `tree` is None only `url` and `id` are filled; such a stub is
    used as the parent of standalone comments.
    """
    TAGGABLE = True

    def __init__(self, browser, url, tree):
        Content.__init__(self, browser)
        self.url = url
        self.id = url2id(self.url)

        if tree is None:
            return

        header = tree.find('header')
        # The title may be split across several <a> elements.
        self.title = u''.join([a.text for a in header.find('h1').findall('a')])
        try:
            a = self.browser.parser.select(header, 'a[rel=author]', 1)
        except BrokenPageError:
            # No author link: posted anonymously.
            self.author = 'Anonyme'
            self.username = None
        else:
            self.author = unicode(a.text)
            self.username = unicode(a.attrib['href'].split('/')[2])
        self.body = self.browser.parser.tostring(self.browser.parser.select(tree, 'div.content', 1))
        try:
            # Drop the "+HH:MM" timezone suffix, then convert local time to UTC.
            self.date = datetime.strptime(self.browser.parser.select(header, 'time', 1).attrib['datetime'].split('+')[0],
                                          '%Y-%m-%dT%H:%M:%S')
            self.date = local2utc(self.date)
        except BrokenPageError:
            pass

        for form in self.browser.parser.select(tree.find('footer'), 'form.button_to'):
            if form.attrib['action'].endswith('/for'):
                # NOTE(review): rstrip() strips a character set, not a suffix --
                # works only because the URL ends exactly with "/for"; fragile.
                self.relevance_url = form.attrib['action'].rstrip('for').rstrip('against')
                self.relevance_token = self.browser.parser.select(form, 'input[name=authenticity_token]', 1).attrib['value']

        self.score = int(self.browser.parser.select(tree, 'div.figures figure.score', 1).text)

    def append_comment(self, comment):
        """Attach a top-level comment to this article."""
        self.comments.append(comment)

    def iter_all_comments(self):
        """Yield every comment of the article, depth-first."""
        for comment in self.comments:
            yield comment
            for c in comment.iter_all_comments():
                yield c
class CommentPage(DLFPPage):
    """Page displaying a single comment (/nodes/<id>/comments/<id>)."""

    def get_comment(self):
        """Parse the page's lone comment into a Comment object."""
        # A stub article (tree=None) serves as the comment's parent.
        stub_article = Article(self.browser, self.url, None)
        node = self.parser.select(self.document.getroot(), 'li.comment', 1)
        return Comment(stub_article, node, 0)
class ContentPage(DLFPPage):
    """Page displaying an article (news, diary, poll...) and its comments."""

    def on_loaded(self):
        # The parsed article is cached lazily by get_article().
        self.article = None

    def is_taggable(self):
        return True

    def get_comment(self, id):
        """Return the Comment with the given numeric id, or None if absent."""
        article = Article(self.browser, self.url, None)
        try:
            li = self.parser.select(self.document.getroot(), 'li#comment-%s' % id, 1)
        except BrokenPageError:
            return None
        else:
            return Comment(article, li, 0)

    def get_article(self):
        """Parse (once) and return the page's Article with its comment tree.

        The result is cached on the page, so repeated calls return the
        same object.  Comments are attached only when the article is
        first built: previously they were re-appended on every call,
        which duplicated the whole comment tree.
        """
        if not self.article:
            self.article = Article(self.browser,
                                   self.url,
                                   self.parser.select(self.document.getroot(), 'div#contents article', 1))
            try:
                threads = self.parser.select(self.document.getroot(), 'ul.threads', 1)
            except BrokenPageError:
                pass  # no comments
            else:
                for comment in threads.findall('li'):
                    self.article.append_comment(Comment(self.article, comment, 0))
        return self.article

    def get_post_comment_url(self):
        """URL of the "post a comment" form for this article."""
        return self.parser.select(self.document.getroot(), 'p#send-comment', 1).find('a').attrib['href']

    def get_tag_url(self):
        """URL of the tag-editing form for this article."""
        return self.parser.select(self.document.getroot(), 'div.tag_in_place', 1).find('a').attrib['href']
class NewCommentPage(DLFPPage):
    """Comment submission form page; the form itself is driven by the browser."""
    pass
class NewTagPage(DLFPPage):
    """Tag submission form page (/nodes/<id>/tags/nouveau)."""

    def _is_tag_form(self, form):
        # Predicate for select_form(): the tag form posts to an URL
        # ending in "/tags".
        return form.action.endswith('/tags')

    def tag(self, tag):
        """Submit *tag* through the page's tag form."""
        browser = self.browser
        browser.select_form(predicate=self._is_tag_form)
        browser['tags'] = tag
        browser.submit()
class NodePage(DLFPPage):
    """Page shown after posting to a node; may carry an error list."""

    def get_errors(self):
        """Return the error strings displayed on the page ([] when none)."""
        try:
            div = self.parser.select(self.document.getroot(), 'div.errors', 1)
        except BrokenPageError:
            return []
        return [li.text for li in div.find('ul').findall('li')]

View file

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.browser import BrokenPageError
from .index import DLFPPage
class WikiEditPage(DLFPPage):
    """Wiki creation/edition form (/wiki/nouveau, /wiki/<name>/modifier)."""

    def get_body(self):
        """Return the current wiki source, or '' when the textarea is absent."""
        try:
            return self.parser.select(self.document.getroot(), 'textarea#wiki_page_wiki_body', 1).text
        except BrokenPageError:
            return ''

    def _is_wiki_form(self, form):
        # Predicate for select_form(): matches both the creation and the
        # edition form.
        return form.attrs.get('class', '') in ('new_wiki_page', 'edit_wiki_page')

    def post_content(self, title, body, message):
        """Submit the wiki form.

        `title` is only set when creating a new page (None when updating);
        the commit button label tells the site which action is performed.
        `message` is the optional edit summary.
        """
        self.browser.select_form(predicate=self._is_wiki_form)
        self.browser.set_all_readonly(False)
        if title is not None:
            self.browser['wiki_page[title]'] = title.encode('utf-8')
            self.browser['commit'] = 'Créer'
        else:
            self.browser['commit'] = 'Mettre à jour'
        self.browser['wiki_page[wiki_body]'] = body.encode('utf-8')
        if message is not None:
            self.browser['wiki_page[message]'] = message.encode('utf-8')
        self.browser.submit()

    def post_preview(self, body):
        """Submit the form in preview mode (nothing is saved).

        NOTE(review): unlike post_content(), `body` is not utf-8 encoded
        here -- confirm whether callers always pass already-safe text.
        """
        self.browser.select_form(predicate=self._is_wiki_form)
        self.browser['wiki_page[wiki_body]'] = body
        self.browser.submit()

    def get_preview_html(self):
        """Return the rendered HTML of the previewed page."""
        body = self.parser.select(self.document.getroot(), 'article.wikipage div.content', 1)
        return self.parser.tostring(body)

56
modules/dlfp/test.py Normal file
View file

@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime
from weboob.tools.test import BackendTest
from dlfp.browser import DLFP
__all__ = ['DLFPTest']
class DLFPTest(BackendTest):
    """Integration tests for the dlfp backend, run against the alpha
    (test) instance of linuxfr.org."""
    BACKEND = 'dlfp'

    def __init__(self, *args, **kwargs):
        # Point the browser at the test instance before the backend is built.
        DLFP.DOMAIN = 'alpha.linuxfr.org'
        BackendTest.__init__(self, *args, **kwargs)

    def test_new_messages(self):
        # Rewrite the feed URLs so they target the alpha instance as well.
        feeds = {}
        for name, feed in self.backend.FEEDS.iteritems():
            feeds[name] = feed.replace('//linuxfr.org', '//alpha.linuxfr.org')
        self.backend.FEEDS = feeds

        # Just exercise the iteration; content is not asserted.
        for message in self.backend.iter_unread_messages():
            pass

    def test_get_content(self):
        # Fetch a known test wiki page.
        self.backend.get_content(u"Ceci-est-un-test")

    def test_push_content(self):
        # Round-trip: read the test page, modify it, push it back.
        content = self.backend.get_content(u"Ceci-est-un-test")
        content.content = "test "+str(datetime.now())
        self.backend.push_content(content, message="test weboob", minor=True)

    def test_content_preview(self):
        content = self.backend.get_content(u"Ceci-est-un-test")
        self.backend.get_content_preview(content)

81
modules/dlfp/tools.py Normal file
View file

@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2010-2011 Romain Bignon
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import re
# Matches Atom entry ids of the form "tag:<domain>,<year>:<Kind>/<number>".
RSSID_RE = re.compile('tag:.*:(\w+)/(\d+)')
# Splits a weboob content id into (kind letter, optional middle part, last part).
ID2URL_RE = re.compile('^(\w)(.*)\.([^ \.]+)$')
# Maps a site URL pattern (with %s placeholders) to the matching weboob id
# pattern.  The first letter of the id encodes the content kind:
# D=diary, N=news, W=wiki, T=tracker ("suivi"), P=poll, B=board/forum.
REGEXPS = {'/users/%s/journaux/%s': 'D%s.%s',
           '/news/%s': 'N.%s',
           '/wiki/%s': 'W.%s',
           '/suivi/%s': 'T.%s',
           '/sondages/%s': 'P.%s',
           '/forums/%s/posts/%s': 'B%s.%s',
          }
def f2re(f):
    """Turn a '%s'-based URL format into a regexp matching any URL suffix."""
    pattern = f.replace('%s', '([^ /]+)')
    return '.*%s' % pattern
def rssid(entry):
    """Derive a weboob content id from an Atom feed entry.

    Returns None when the entry id is not a recognizable tag URI, or
    (for two-part ids) when the entry link does not match the expected
    URL pattern.
    """
    tag_match = RSSID_RE.match(entry.id)
    if tag_match is None:
        return None
    # Kind letter; "Post" entries map to the board ("B...") ids.
    kind = tag_match.group(1).replace('Post', 'Board')[0]
    for url_fmt, id_fmt in REGEXPS.items():
        if id_fmt[0] != kind:
            continue
        if id_fmt.count('%s') != 2:
            return '%s.%s' % (kind, tag_match.group(2))
        link_match = re.match(f2re(url_fmt), entry.link)
        if link_match is None:
            return
        return '%s%s.%s' % (kind, link_match.group(1), tag_match.group(2))
def id2url(id):
    """Translate a weboob content id (e.g. 'N.123') into a site URL path.

    Returns None when the id is malformed or its kind letter is unknown.
    """
    parts = ID2URL_RE.match(id)
    if parts is None:
        return None
    kind, middle, last = parts.group(1), parts.group(2), parts.group(3)
    for url_fmt, id_fmt in REGEXPS.items():
        if id_fmt[0] != kind:
            continue
        if id_fmt.count('%s') == 2:
            return url_fmt % (middle, last)
        return url_fmt % last
def url2id(url):
    """Translate a site URL into a weboob content id, or None if unrecognized."""
    for url_fmt, id_fmt in REGEXPS.items():
        match = re.match(f2re(url_fmt), url)
        if match is not None:
            return id_fmt % match.groups()
def id2threadid(id):
    """Return the final part of a content id (the thread id), or None."""
    m = ID2URL_RE.match(id)
    return m.group(3) if m else None