new messages module: ovs

This commit is contained in:
Vincent A 2013-08-27 22:58:52 +02:00 committed by Florent
commit 46f61b3516
7 changed files with 556 additions and 0 deletions

24
modules/ovs/__init__.py Normal file
View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from .backend import OvsBackend
__all__ = ['OvsBackend']

104
modules/ovs/backend.py Normal file
View file

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.backend import BaseBackend, BackendConfig
from weboob.tools.browser import BrowserForbidden
from weboob.tools.value import Value, ValueBackendPassword
from weboob.capabilities.messages import ICapMessages, ICapMessagesPost, Message
from .browser import OvsBrowser
__all__ = ['OvsBackend']
class OvsBackend(BaseBackend, ICapMessages, ICapMessagesPost):
    """Backend for the OnVaSortir website, restricted to private messages.

    The read/unread state is tracked locally: a message is considered read
    once its full id has been recorded under the ``seen`` storage key.
    """

    NAME = 'ovs'
    DESCRIPTION = u'OnVaSortir website. Handles private messages only'
    MAINTAINER = u'Vincent A'
    EMAIL = 'dev@indigo.re'
    VERSION = '0.h'

    CONFIG = BackendConfig(
        Value('username', label='Username', default=''),
        ValueBackendPassword('password', label='Password', default=''),
        Value('city', label='City (subdomain)', default='paris'))
    # TODO keep list of possible cities

    BROWSER = OvsBrowser
    STORAGE = {'seen': {}}

    def create_default_browser(self):
        """Create the browser from the configured city and credentials."""
        city = self.config['city'].get()
        username = self.config['username'].get()
        password = self.config['password'].get()
        return self.create_browser(city, username, password, parser='raw')

    def iter_threads(self):
        """Yield every thread reported by the browser's thread listing."""
        with self.browser:
            for thread in self.browser.iter_threads_list():
                yield thread

    def get_thread(self, id):
        """Return the thread ``id`` with locally-unseen messages flagged unread."""
        with self.browser:
            thread = self.browser.get_thread(id)
            root = thread.root

            for message in [root] + root.children:
                already_seen = self.storage.get('seen', message.full_id, default=False)
                if already_seen:
                    continue
                message.flags |= Message.IS_UNREAD

            return thread

    def iter_unread_messages(self):
        """Yield each message, across all threads, carrying the unread flag."""
        with self.browser:
            for listed in self.iter_threads():
                # TODO reuse thread object?
                root = self.get_thread(listed.id).root
                for message in [root] + root.children:
                    if not (message.flags & Message.IS_UNREAD):
                        continue
                    yield message

        # TODO implement more efficiently by having a "last weboob seen" for
        # a thread and query a thread only if "last activity" returned by web
        # is later than "last weboob seen"

    def set_message_read(self, message):
        """Record ``message`` as seen in the backend storage and persist it."""
        self.storage.set('seen', message.full_id, True)
        self.storage.save()

    def post_message(self, message):
        """Send ``message`` as a reply or as the first message of a new thread.

        Raises BrowserForbidden when the browser has no username configured.
        """
        if not self.browser.username:
            raise BrowserForbidden()

        with self.browser:
            thread = message.thread

            if message.parent:
                # reply to an existing thread: ovs.<threadid>@*
                send = self.browser.post_to_thread
            else:
                # brand new thread: ovs.<recipient>@*
                send = self.browser.create_thread

            send(thread.id, message.title, message.content)
# FIXME known bug: parsing is done in "boosted mode", which is automatically disabled after some time; the "boosted mode" should be re-toggled often
# TODO support outing comments, forum messages
# TODO make an ICapOuting?

102
modules/ovs/browser.py Normal file
View file

@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.browser import BaseBrowser, BrowserIncorrectPassword
from weboob.tools.parsers.iparser import IParser
import BeautifulSoup
from .pages import PagePrivateThreadsList, PagePrivateThread, PageLogin, PageIndex, DummyPage, PageUserProfile
__all__ = ['OvsBrowser']
class SoupParser(IParser):
    """Parser that turns a raw response into a BeautifulSoup document."""

    def parse(self, data, encoding=None):
        """Read ``data``, decode it and return the parsed BeautifulSoup tree.

        ``data`` is a file-like object. Its content is decoded with
        ``encoding``, or UTF-8 when no encoding is given, and parsed with
        ``convertEntities`` set to ``ALL_ENTITIES``.
        """
        charset = encoding or 'utf-8'
        markup = data.read().decode(charset)
        entities = BeautifulSoup.BeautifulStoneSoup.ALL_ENTITIES
        return BeautifulSoup.BeautifulSoup(markup, convertEntities=entities)
class OvsBrowser(BaseBrowser):
    """Browser for one city subdomain of onvasortir.com (private messages)."""

    PROTOCOL = 'http'
    DOMAIN = 'paris.onvasortir.com'
    ENCODING = 'cp1252'

    def __init__(self, city, username, password, *a, **kw):
        """Bind the browser to the ``city`` subdomain and build its page map.

        Any ``parser`` passed by the caller is replaced by a SoupParser.
        """
        self.DOMAIN = '%s.onvasortir.com' % city

        # every URL pattern is anchored on the same protocol/domain pair
        site = (self.PROTOCOL, self.DOMAIN)
        self.PAGES = {
            '%s://%s/' % site: PageIndex,
            r'%s://%s/message_read.php\?Id=.+' % site: PagePrivateThread,
            '%s://%s/vue_messages_recus.php' % site: PagePrivateThreadsList,
            '%s://%s/vue_messages_envoyes.php' % site: PagePrivateThreadsList,
            '%s://%s/page_action_connect.php' % site: PageLogin,
            r'%s://%s/\?Langue=EN' % site: DummyPage,
            '%s://%s/page_action_boost.php' % site: DummyPage,
            # NOTE(review): the doubled ".php.php" looks like a typo -- confirm
            '%s://%s/vue_profil_all.php.php' % site: DummyPage,
            r'%s://%s/message_msg_envoi_ok.php\?.*' % site: DummyPage,
            '%s://%s/message_action_envoi.php' % site: DummyPage,
            r'%s://%s/profil_read.php\?.+' % site: PageUserProfile,
        }

        kw['parser'] = SoupParser()
        BaseBrowser.__init__(self, username, password, *a, **kw)

    def iter_threads_list(self):
        """Yield the threads of the received box, then those of the sent box."""
        for mailbox in ('/vue_messages_recus.php', '/vue_messages_envoyes.php'):
            self.location(mailbox)
            assert self.is_on_page(PagePrivateThreadsList)
            for thread in self.page.iter_threads_list():
                yield thread

    def get_thread(self, _id):
        """Open the thread ``_id`` with all its messages and return it parsed."""
        url = '/message_read.php?Id=%s&AffMsg=all' % _id
        self.location(url)
        assert self.is_on_page(PagePrivateThread)
        return self.page.get_thread(_id)

    def login(self):
        """Submit the login form of the current page.

        Raises BrowserIncorrectPassword when the browser is still not logged
        in after submitting the credentials.
        """
        assert not self.is_logged()
        self.page.login(self.username, self.password)
        if not self.is_logged():
            raise BrowserIncorrectPassword()

        # English language, then the "boost" action, then back to the index
        for path in ('/?Langue=EN', '/page_action_boost.php', '/'):
            self.location(path)

    def is_logged(self):
        """Tell whether the session is logged in.

        Being on a DummyPage counts as logged in; otherwise the current page
        is asked.
        """
        on_dummy = self.is_on_page(DummyPage)
        if on_dummy:
            return on_dummy
        return self.page.is_logged()

    def post_to_thread(self, thread_id, subject, body):
        """Open the thread ``thread_id`` and post a reply to it."""
        # FIXME the id is only encoded, not URL-quoted
        query = thread_id.encode(self.ENCODING)
        self.location('/message_read.php?Id=%s' % query)
        assert self.is_on_page(PagePrivateThread)
        self.page.post_to_thread(thread_id, subject, body)

    def create_thread(self, recipient, subject, body):
        """Open the profile of ``recipient`` and start a new thread with them."""
        # FIXME the recipient is only encoded, not URL-quoted
        query = recipient.encode(self.ENCODING)
        self.location('/profil_read.php?%s' % query)
        assert self.is_on_page(PageUserProfile)
        self.page.create_thread(recipient, subject, body)

BIN
modules/ovs/favicon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.4 KiB

121
modules/ovs/ovsparse.py Normal file
View file

@ -0,0 +1,121 @@
#!/usr/bin/env python
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import BeautifulSoup
import datetime
def nearest_parent(node, expected):
    """Return the closest ancestor of ``node`` whose tag name is ``expected``.

    This simply delegates to the node's ``findParent`` method and returns
    whatever it returns. The hand-written parent walk that used to follow the
    ``return`` statement could never run and has been removed.
    """
    return node.findParent(expected)
def all_text_recursive(node):
    """Concatenate every text piece returned by ``node.findAll(text=True)``."""
    pieces = node.findAll(text=True)
    return ''.join(pieces)
def all_next_siblings(node):
    """Return ``node`` followed by its next siblings, as a list.

    The walk follows ``nextSibling`` and stops at the first falsy node, so a
    falsy starting node gives an empty list.
    """
    def walk(current):
        while current:
            yield current
            current = current.nextSibling

    return list(walk(node))
def parse_date_from_site(sitedate):
    """Convert a site date such as ``5 August 2013 - 12:34:56`` to a datetime.

    The month must be an English month name (any letter case).
    """
    fields = sitedate.split()  # [d, month name, Y, '-', 'H:M:S']

    # zero-pad a single-digit day
    day = fields[0]
    if len(day) == 1:
        fields[0] = '0%s' % day

    month_numbers = {
        'january': '01',
        'february': '02',
        'march': '03',
        'april': '04',
        'may': '05',
        'june': '06',
        'july': '07',
        'august': '08',
        'september': '09',
        'october': '10',
        'november': '11',
        'december': '12',
    }
    fields[1] = month_numbers[fields[1].lower()]

    # drop the '-' separating the date from the time
    fields.pop(3)

    return datetime.datetime.strptime(' '.join(fields), '%d %m %Y %H:%M:%S')
def image_to_text(src):
    """Return the text smiley matching the image path ``src``.

    Returns None when ``src`` is not one of the known smiley images.
    """
    emoticons = {
        'chat/e/grin.gif': ':D',
        'chat/e/unsure.gif': ':s',
        'chat/e/smile.gif': ':)',
        'chat/e/shocked.gif': ':|',
        'chat/e/sad.gif': ':(',
        'chat/e/huh.gif': ':h',
        'chat/e/suprised.gif': ':o',
        'chat/e/cool.gif': 'B)',
        'chat/e/redface.gif': ':red',
        'chat/e/confused.gif': ':=',
        'chat/e/razz.gif': ':p',
        'chat/e/wink.gif': ';)',
        'chat/e/mad.gif': ':x',
        'chat/e/rolleyes.gif': ':b',
        'chat/e/lol.gif': ':lol',
        'chat/e/wub.gif': ':$',
        'chat/e/bouche.gif': ':K',
        'chat/e/sick.gif': '+o(',
    }
    try:
        return emoticons[src]
    except KeyError:
        return None
def html_message_to_text(nodes):
    """Render the HTML nodes of a message body as plain text.

    Text nodes are kept without their carriage returns, ``<img>`` becomes the
    matching text smiley, ``<a>`` becomes its ``href`` and ``<br>`` becomes a
    newline.

    Raises AssertionError for any other tag.
    """
    parts = []
    for node in nodes:
        if isinstance(node, BeautifulSoup.NavigableString):
            parts.append(unicode(node).replace('\r', ''))
        elif node.name == 'img':
            src = node['src']
            smiley = image_to_text(src)
            # image_to_text() returns None for images it does not know, and a
            # None in parts would make the final join raise TypeError; fall
            # back to the image path, as links fall back to their href
            parts.append(src if smiley is None else smiley)
        elif node.name == 'a':
            parts.append(node['href'])
        elif node.name == 'br':
            parts.append('\n')
        else:
            # explicit raise: carries the message and is not stripped by -O
            raise AssertionError('%s not supported' % node.name)

    return ''.join(parts)
def create_unique_id(proposed_id, used_ids):
    """Return an id based on ``proposed_id`` that is absent from ``used_ids``.

    ``proposed_id`` is returned unchanged when it is free; otherwise the
    first free ``<proposed_id>-<n>`` is returned, with n counting up from 1.
    ``used_ids`` is not modified.
    """
    candidate = proposed_id
    suffix = 0
    while candidate in used_ids:
        suffix += 1
        candidate = '%s-%s' % (proposed_id, suffix)
    return candidate
# public
def private_message_form_fields(document):
    """Collect the pre-filled inputs of the ``envoimail`` form.

    Returns a dict mapping each of ``Pere``, ``Sortie``, ``Dest`` and
    ``Titre`` to its ``value`` attribute; inputs missing from the form are
    left out.
    """
    form = document.find('form', attrs={'name': 'envoimail'})

    fields = {}
    for name in ('Pere', 'Sortie', 'Dest', 'Titre'):
        item = form.find('input', attrs={'name': name})
        if item:
            fields[name] = item['value']
    return fields
def is_logged(document):
    """Tell whether ``document`` lacks a form named ``connection``."""
    login_form = document.find('form', attrs={'name': 'connection'})
    return not login_form

177
modules/ovs/pages.py Normal file
View file

@ -0,0 +1,177 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
import datetime
import re
import urllib
from weboob.tools.browser import BasePage
from weboob.capabilities.messages import Message, Thread
import ovsparse
__all__ = ['PagePrivateThreadsList', 'PagePrivateThread', 'PageLogin', 'PageIndex', 'DummyPage', 'PagePostMessage', 'PageUserProfile']
class OvsPage(BasePage):
    """Base page offering the login-state check and the login action."""

    def is_logged(self):
        """Tell whether this page's document shows a logged-in session."""
        return ovsparse.is_logged(self.document)

    def login(self, username, password):
        """Fill in and submit the ``connection`` form with the credentials."""
        browser = self.browser
        encoding = browser.ENCODING

        browser.select_form(name='connection')
        browser['Pseudo'] = username.encode(encoding)
        browser['Password'] = password.encode(encoding)
        browser['Retenir'] = ['ok']
        browser.submit(nologin=True)
class PagePrivateThreadsList(OvsPage):
    """Listing page of private threads (received or sent box)."""

    def iter_threads_list(self):
        """Yield a Thread for each thread link, in reverse document order.

        Only the id, title and discussion flag are filled in.
        """
        links = self.document.findAll('a', href=re.compile(r'message_read.php\?'))

        # site is sorted from latest to oldest
        for link in reversed(links):
            ovs_id = re.search(r'Id=(\d+)', link["href"]).group(1)

            thread = Thread(ovs_id)
            thread.title = ovsparse.all_text_recursive(link)
            thread.flags = Thread.IS_DISCUSSION

            # not extracted yet:
            #~ parent_tr = message_a.findParent('tr')
            #~ username = all_text_recursive(parent_tr.find('a', href=re.compile(r'profil_read.php\?.*')))
            #~ notread_self = (parent_tr.get('class') == 'newmails')
            #~ notread_other = (parent_tr.find('span', **{'class': 'new_sortiepartenaire'}) is not None)

            yield thread
class PagePrivateThread(OvsPage):
    """Page showing one private thread together with its reply form."""

    def get_thread(self, _id):
        """Build the Thread ``_id`` from this page.

        The first message yielded by ``_get_messages`` becomes the root; every
        following one is attached as a direct child of the root.
        """
        thread = Thread(_id)
        title_div = self.document.find('div', 'PADtitreBlanc_txt')
        thread.title = title_div.find('center').string
        thread.flags = Thread.IS_DISCUSSION

        for position, message in enumerate(self._get_messages(thread)):
            message.children = []
            if position == 0:
                thread.root = message
                thread.date = message.date
                message.title = thread.title
            else:
                message.title = 'Re: %s' % thread.title
                message.parent = thread.root
                thread.root.children.append(message)

        return thread

    def _get_messages(self, thread):
        """Yield the messages of the page in reverse document order.

        Message ids are derived from the message date and made unique within
        the thread.
        """
        container = self.document.find(True, 'PADpost_txt')
        taken_ids = set()

        dest_input = self.document.find('input', attrs={'type': 'hidden', 'name': 'Dest'})
        rcpt = dest_input['value']
        # the two participants: whoever did not send a message received it
        sender_to_receiver = {rcpt: self.browser.username, self.browser.username: rcpt}

        profile_href = re.compile(r'profil_read.php\?.*')
        after_comma = re.compile(',.*')

        # site is sorted from latest to oldest message
        for table in reversed(container.findAll('table')):
            for cell in table.findAll('td'):
                author_a = cell.find('a', href=profile_href)
                if not author_a:
                    continue

                # the message body starts after two consecutive <br>
                first_br = cell.find('br')
                second_br = first_br.nextSibling
                assert second_br.name == 'br'
                body_nodes = ovsparse.all_next_siblings(second_br.nextSibling)  # TODO

                # date will be used as id
                raw_date = author_a.findParent('div').find(text=after_comma)
                when = ovsparse.parse_date_from_site(raw_date.replace(', ', ''))

                # but make it unique
                msg_id = ovsparse.create_unique_id(when.strftime('%Y%m%dT%H%M%S'), taken_ids)
                taken_ids.add(msg_id)

                message = Message(thread, msg_id)
                message.sender = re.search(r'\?(.+)', author_a['href']).group(1)
                message.receivers = [sender_to_receiver[message.sender]]
                message.date = when
                message.content = ovsparse.html_message_to_text(body_nodes)

                unread_by_self = bool(cell.find('span', 'ColorSurligne'))
                unread_by_other = bool(cell.find('span', 'new_sortiepartenaire'))
                if unread_by_other or unread_by_self:
                    message.flags |= Message.IS_NOT_RECEIVED
                else:
                    message.flags |= Message.IS_RECEIVED

                yield message

    def post_to_thread(self, thread_id, subject, body):
        """Post ``body`` as a reply, reusing the page's pre-filled form fields.

        ``thread_id`` and ``subject`` are not used here: the request carries
        only the fields already present in the page form plus the message.
        """
        encoding = self.browser.ENCODING

        fields = ovsparse.private_message_form_fields(self.document)
        recode_dict(fields, encoding)
        fields['Message'] = body.encode(encoding)

        # html code is so broken that mechanize won't parse the forms, so the
        # request is built by hand instead of going through select_form()
        self.browser.location('/message_action_envoi.php', urllib.urlencode(fields))
class PageLogin(BasePage):
    """Marker page without any behaviour of its own."""
class PageIndex(OvsPage):
    """Page adding nothing to what OvsPage already provides."""
class DummyPage(BasePage):
    """Placeholder page whose content is never parsed."""
class PagePostMessage(OvsPage):
    """Page adding nothing to what OvsPage already provides."""
class PageUserProfile(OvsPage):
    """Profile page of a user, from which a new thread can be started."""

    def create_thread(self, recipient, subject, body):
        """Send ``body`` titled ``subject`` using the page's message form fields.

        ``recipient`` is not used here: the request carries the fields already
        present in the page form plus the message and its title.
        """
        encoding = self.browser.ENCODING

        fields = ovsparse.private_message_form_fields(self.document)
        recode_dict(fields, encoding)
        fields['Message'] = body.encode(encoding)
        fields['Titre'] = subject.encode(encoding)

        # the request is built by hand rather than through select_form()
        self.browser.location('/message_action_envoi.php', urllib.urlencode(fields))
def recode_dict(dict_, encoding):
    """Encode every value of ``dict_`` with ``encoding``, in place."""
    for key, value in list(dict_.items()):
        dict_[key] = value.encode(encoding)

28
modules/ovs/test.py Normal file
View file

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# Copyright(C) 2013 Vincent A
#
# This file is part of weboob.
#
# weboob is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# weboob is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with weboob. If not, see <http://www.gnu.org/licenses/>.
from weboob.tools.test import BackendTest
class OvsTest(BackendTest):
    """Placeholder test case for the ``ovs`` backend."""

    BACKEND = 'ovs'

    def test_ovs(self):
        """No test is written yet: this always raises NotImplementedError."""
        raise NotImplementedError()